java框架---->RxJava的使用(一)

  RxJava是响应式程序设计的一种实现。在响应式程序设计中,当数据到达的时候,消费者做出响应。响应式编程可以将事件传递给注册了的observer。今天我们就来学习一下rxJava,并分析一下它源码感受一下它的观察者模式。

RxJava的简单使用

一、mavan的pom.xml中增加rxjava的依赖

这里我们用的是rxjava1.3.0,目前最新的已经更新到2了。

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.3.0</version>
</dependency>

二、测试的用类

import rx.Observable;
import rx.Subscriber;
/**
 * @author huhx
 */
public class RxJavaTest {
    public static void main(String[] args) {
        Observable<String> observable = Observable.just("One", "Two", "Three");
        observable.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable item) {
                System.out.println("onError");
            }

            @Override
            public void onNext(String item) {
                System.out.println("Item is " + item);
            }
        });
    }
}

三、打印的结果如下:

Item is One
Item is Two
Item is Three
onCompleted

RxJava源码的简单分析

根据上述的代码,我们简单分析一下程序的流程。

一、首先Observable.just("One", "Two", "Three")代码:

just是一个工厂方法,用心构建一个Observable对象。

public static <T> Observable<T> just(T t1, T t2, T t3) {
    return from((T[])new Object[] { t1, t2, t3 });
}

from方法的代码如下:

public static <T> Observable<T> from(T[] array) {
    int n = array.length;
    if (n == 0) {
        return empty();
    } else
    if (n == 1) {
        return just(array[0]);
    }
    return unsafeCreate(new OnSubscribeFromArray<T>(array));
}

二、我们重点是看unsafeCreate方法:

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
    return new Observable<T>(RxJavaHooks.onCreate(f));
}

对于RxJavaHooks.onCreate()方法,第一次会先执行RxJavaHooks静态块的代码。

static {
    init(); // 初始化了很多RxJavaHooks跟事件有着的变量
}

onCreate方法中,这个没有怎么看懂后续再会。

public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
    Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
    if (f != null) {
        return f.call(onSubscribe);
    }
    return onSubscribe;
}

三、对于observable.subscribe()代码

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

Observable的subscribe方法代码如下:

 1 static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
 2     if (subscriber == null) {
 3         throw new IllegalArgumentException("subscriber can not be null");
 4     }
 5     if (observable.onSubscribe == null) {
 6         throw new IllegalStateException("onSubscribe function can not be null.");
 7     }
 8
 9     subscriber.onStart();
10
11     if (!(subscriber instanceof SafeSubscriber)) {
12         subscriber = new SafeSubscriber<T>(subscriber);
13     }
14
15     try {
16         RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
17         return RxJavaHooks.onObservableReturn(subscriber);
18     } catch (Throwable e) {
19         Exceptions.throwIfFatal(e);
20         if (subscriber.isUnsubscribed()) {
21             RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
22         } else {
23             try {
24                 subscriber.onError(RxJavaHooks.onObservableError(e));
25             } catch (Throwable e2) {
26                 Exceptions.throwIfFatal(e2);
27                 RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
28                 RxJavaHooks.onObservableError(r);
29                 throw r; // NOPMD
30             }
31         }
32         return Subscriptions.unsubscribed();
33     }
34 }

四、整个的正常流程会走到16行的代码,这个是我们重点的分析地方。如果是异常情况,则只会执行onError方法

对于这个例子中,RxJavaHooks.onObservableStart(observable, observable.onSubscribe)得到的结果OnSubscribeFromArray这个类。调用它的call方法。

public void call(Subscriber<? super T> child) {
    child.setProducer(new FromArrayProducer<T>(child, array));
}

setProducer方法执行的代码如下:

 1 public void setProducer(Producer p) {
 2     long toRequest;
 3     boolean passToSubscriber = false;
 4     synchronized (this) {
 5         toRequest = requested;
 6         producer = p;
 7         if (subscriber != null) {
 8             // middle operator ... we pass through unless a request has been made
 9             if (toRequest == NOT_SET) {
10                 // we pass through to the next producer as nothing has been requested
11                 passToSubscriber = true;
12             }
13         }
14     }
15     // do after releasing lock
16     if (passToSubscriber) {
17         subscriber.setProducer(producer);
18     } else {
19         // we execute the request with whatever has been requested (or Long.MAX_VALUE)
20         if (toRequest == NOT_SET) {
21             producer.request(Long.MAX_VALUE);
22         } else {
23             producer.request(toRequest);
24         }
25     }
26 }

这里会走到21行的代码,跟进去FromArrayProducer的request方法:

public void request(long n) {
    if (n < 0) {
        throw new IllegalArgumentException("n >= 0 required but it was " + n);
    }
    if (n == Long.MAX_VALUE) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            fastPath();
        }
    } else
    if (n != 0) {
        if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
            slowPath(n);
        }
    }
}

对于fastPath()的代码如下:

void fastPath() {
    final Subscriber<? super T> child = this.child;

    for (T t : array) {
        if (child.isUnsubscribed()) {
            return;
        }
        child.onNext(t);
    }
    if (child.isUnsubscribed()) {
        return;
    }
    child.onCompleted();
}

执行onNext()方法,其中t就是Observable.just()方法的参数。child就是observable.subscribe的定义的参数。

友情链接

时间: 2024-08-03 14:06:55

java框架---->RxJava的使用(一)的相关文章

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现

Android异步框架RxJava 1.x系列(一) - 观察者模式及实现 前言 RxJava 是一款基于 Java VM 实现的响应式编程扩展库 - 基于观察者模式的异步和事件处理框架.RxJava 官方目前同时维护了两个版本,分别是 1.x 和 2.x,区别是它们使用不同的 group id 和 namespaces. 版本 group id namespaces v1.x io.reactivex io.reactivex v2.x io.reactivex.rxjava2 rx 本系列的

如何查看JDK以及JAVA框架的源码

如何查看JDK以及JAVA框架的源码 设置步骤如下: 1.点 “window”-> "Preferences" -> "Java" -> "Installed JRES" 2.此时"Installed JRES"右边是列表窗格,列出了系统中的 JRE 环境,选择你的JRE,然后点边上的 "Edit...", 会出现一个窗口(Edit JRE) 3.选中rt.jar文件的这一项:“c:\pr

JAVA框架整合(struct+spring+jpa)之utf8mb4

基于mysql数据库整合框架时出现的数据库字符编码的问题. 解决办法:数据库驱动的版本太高,换成低版本的,主要还是你的架包太不是最新版本,出现的兼容性问题. JAVA框架整合(struct+spring+jpa)之utf8mb4,布布扣,bubuko.com

最简单的Java框架

框架framework的目的是定义骨架式方案,处理各种相同的底层细节:而开发人员使用框架时,能够依照自己的需求实现自己的功能--仅仅须要填入自己的东西/flesh. 最简单的框架,类似于JUnit,它有一个main(String[] args)启动本框架.假设是applet或GUI框架,就有太多底层细节须要处理. package principle.callback.lower; /** * 最简单的Java框架 * * @author yqj2065 * @version 2014.10 */

Java框架服务

Java从诞生到现在,一路飙升,可以说红遍全球,红到发紫.随着Java的流行,促生了许多java框架:Spring.WebWork.Struts.HIbernate.JDiy.JFinal.Quartz.Welocity.IBATIS.Compiere ERP&CRM Spring Framework[Java开源J2EE框架] Spring是一个解决了许多在J2EE开发中常见的问题的强大框架.Spring提供了管理业务对象的一致方法并且鼓励了注入对接口编程而不是对类编程的良好习惯.Spring

将网页另存为图片的Java框架

首先要了解的是java在图像这一块非常弱.用java实现截图倒不难,原理吗就是把当前屏幕存成一个图,然后获取鼠标拉去的想去位置然后把截取的图保存到panel里边,再生成图片即可.那么这里要说什么呢?好吧下面就说几个将网页保存为图片的框架: 1.html2image 网上炒这个还不少呢.我说这个就是原声的java代码进行封装的一个jar包.效果非常差,代码就不贴了网上好多. 2.cobra 如果你不知道这个的话,你应该听说过lobobrowser,纯java实现的浏览器,测试了下,除了启动慢的要死

如何在Eclipse中查看JDK以及Java框架的源码

对于Java程序员来说,有时候是需要查看JDK或者一些Java框架的源码来分析问题的,而默认情况下,你按住Ctrl,再点击 Java本身的类库(例如ArrayList)是无法查看源码的,那么如何在Eclipse中查看JDK以及Java框架的源码呢?下面,跟着我 一起,一步步带你走进源码的世界. 方法一:快速简单 第一步: 打开你的Eclipse,然后随便找一个Java文件,随便找一个Java类库,比如String什么的,然后按住Ctrl,再点击它,你会发现跳到如下界面: 你会发现报错了:Sour

Java框架介绍-13个不容错过的框架项目

本文转自互联网,个人收藏所用. 下面,我们将一同分享各有趣且颇为实用的Java库,大家请任取所需.不用客气~ 1.极致精简的Java Bootique是一项用于构建无容器可运行Java应用的极简技术.该项目允许大家创建REST服务.Web应用.任务.数据库迁移等等,且一切都立足于模块实现.另外,大家也可以将其作为简单的命令进行使用. 该项目的目标在于将应用从Java容器中解放出来,允许开发者重新回归main()方法.另外其中还包含部分内置命令,因此就算各位需要处理的代码量不多或者并未向应用中导入

java框架——经典的Hibernate

在编程开发中,我们有很多框架,他们有些很方便,也很实用,今天我们一起来认识一个java经典的框架Hibernate,Hibernate英文名称为"冬眠",这是个很有意思的技术,同样这个名字也很巧妙!大家先看看这个技术的书籍在国外的封面: 简介: 大家知道冬眠的动物是静止的,在冻僵的边缘,而Hibernate的一个巨大作用就是对数据持久化的实现,这两者有什么联系? 简单介绍数据持久化: 数据持久化是解决程序与数据库之间交互访问的问题,使程序不直接访问数据库,而是直接访问Session会话