前言
ReactiveX,是一套用可观察流来完成异步编程的api,它的官网是:
http://reactivex.io/
而RxJava,则是ReactiveX为Java语言平台提供的api,目前最新版本为2.2.5,github地址为:
https://github.com/ReactiveX/RxJava
博主也在项目中使用了RxJava相关的api,惊叹于其神奇的链式调用,消除了复杂的异步编程层层嵌套导致的回调地狱,并把逻辑流程梳理清晰
最近刚好有空,在阅读了RxJava的源码后,在此写下一篇博客记录一下体会
博主阅读的是RxJava2版本的源码,与RxJava1的版本源码有较大出入
本文旨在分析RxJava2源码中流程相关的部分,基础的使用方式以及一些操作符说明请自行查询相关文档
例子引入
假设我们这里有这么一个场景:
有一系列的学生数据,我们需要把他们的所有id打印到logCat,然后再把其中的偶数id再次打印,如果熟悉RxJava的童鞋肯定会写出下面相似的代码:
1 | Observable.create(new ObservableOnSubscribe<Student>() { |
简单介绍下上面代码完成的事情:
- 我们通过Observable#create方法,创建了一个发射学生Student数据的数据源
- 通过map操作符,我们从Student中获取他们的id
- 通过observeOn操作符,我们定义下面的工作将在一个新线程中执行
- 通过doOnNext操作符,我们打印出所有学生的id
- 通过filter操作符,我们过滤出其中的偶数学生id
- 通过subscribeOn操作符,我们定义数据源的发射操作在io线程中执行
- 通过observeOn操作符,我们定义下面的工作将在Android主线程中执行
- 通过subscribe方法,我们打印出剩下的偶数学生id
源码分析
值得注意的是,博主在RxJava的github例子上发现了一段话,它告诉了我们RxJava的流程设计是怎么样子的,相信当我们浏览完源码后再回来看会有更深的体会
原文如下:
This style of chaining methods is called a fluent API which resembles the builder pattern. However, RxJava’s reactive types are immutable; each of the method calls returns a new Flowable with added behavior.
这段话告诉了我们RxJava中的响应类型是不变的,每次方法的调用都是通过生成一个新的对象来添加新的行为
这很容易就让人联想到了使用装饰器模式实现的javaIo,看完源码之后我们会发现两者确实有共通之处
接下来我们一起来阅读下其中的源码,我们分为三个阶段,来看下RxJava真正的流程是怎样的
创建阶段
首先我们来看下响应类型对象的创建流程
就例子而言,我们经过一系列链式调用后(除去subscribe方法)得到的是一个Observable对象
首先我们先来看下Observable#create方法
Observable#create:
1 | public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { |
其中,第2行的ObjectHelper#requireNonNull用于检测传入的数据是否为null,因为在RxJava的数据流中是不允许出现null的,后续类似的代码我们将忽略
第3行的RxJavaPlugins是一个全局的工具类,它允许我们设置一些监听器,在相应的hook点执行我们自己的操作,这里我们默认什么也不做,只返回原本的对象,后续类似的代码我们也将忽略
综上,Observable#create方法返回了一个ObservableCreate对象,并把我们写的ObservableOnSubscribe匿名内部类当做构造参数传入持有
接下来我们来看看Observable#map方法
Observable#map:
1 | public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { |
map方法也很简单,返回了一个ObservableMap对象,并把Observable#create返回的ObservableCreate对象,以及我们写的Function匿名内部类当做构造参数传入持有
值得注意的是,这里返回的Observable的泛型参数已经改变了
接下来我们来看看Observable#observeOn方法
Observable#observeOn:
1 | public final Observable<T> observeOn(Scheduler scheduler) { |
该方法返回了一个ObservableObserveOn对象,持有了map返回的ObservableMap对象,以及我们传入的调度器Schedulers.newThread()
接下来我们来看看Observable#doOnNext方法
Observable#doOnNext:
1 | public final Observable<T> doOnNext(Consumer<? super T> onNext) { |
该方法返回了一个ObservableDoOnEach对象,持有了observeOn返回的ObservableObserveOn对象,我们传入的Consumer对象以及一些我们并不关心的其他东西
接下来我们来看看Observable#filter方法
Observable#filter:
1 | public final Observable<T> filter(Predicate<? super T> predicate) { |
该方法返回了一个ObservableFilter对象,持有了doOnNext返回的ObservableDoOnEach对象,以及我们传入的Predicate对象
接下来我们来看看Observable#subscribeOn方法
Observable#subscribeOn:
1 | public final Observable<T> subscribeOn(Scheduler scheduler) { |
该方法返回了一个ObservableSubscribeOn对象,持有了filter返回的ObservableFilter对象,以及我们传入的调度器Schedulers.io()
接下来又调用了Observable#observeOn方法,返回了一个ObservableObserveOn对象,持有了subscribeOn返回的ObservableSubscribeOn对象,以及我们传入的调度器AndroidSchedulers.mainThread()
可见,在创建阶段,我们获得了一个“层层包裹”的ObservableObserveOn对象
就像是在使用java.io中的stream一样,通过装饰器模式“层层包裹”来增强相应的功能:
1 | new BufferedReader(new InputStreamReader(new ByteArrayInputStream())) |
创建阶段具体流程可用下图表示:
订阅阶段
接下来,我们对创建阶段得到的ObservableObserveOn对象调用了subscribe方法,这是在父类Observable中的一个final方法,我们一起来看下
Observable#subscribe:
1 | public final Disposable subscribe(Consumer<? super T> onNext) { |
在代码第11行,我们可以看到我们传入的监听器onNext被包装成为了一个LambdaObserver对象
接着在第24行,以该对象作为参数调用了ObservableObserveOn#subscribeActual
ObservableObserveOn#subscribeActual:
1 |
|
在subscribeActual方法中,我们传入的scheduler是AndroidSchedulers.mainThread(),它是一个HandlerScheduler,因此走的是分支语句的else逻辑
在这里,我们的LambdaObserver对象被包装成为一个ObserveOnObserver对象,并与scheduler创建的Worker绑定在了一起
接着调用了source的subscribe方法,这里的source即是我们构造ObservableObserveOn时传入的“上游”数据源————ObservableSubscribeOn对象
由于Observable#subscribe是一个final方法,无法被子类重写,并且它最终的逻辑都会调用子类的subscribeActual方法,这里我们直接看ObservableSubscribeOn#subscribeActual
ObservableSubscribeOn#subscribeActual:
1 |
|
首先在第3行,我们的ObserveOnObserver对象被包装成为了SubscribeOnObserver对象
接着在第5行,调用了ObserveOnObserver#onSubscribe
ObserveOnObserver#onSubscribe:
1 |
|
经过一系列检查后,最终会调用actual对象的onSubscribe方法,而actual对象即是我们构造ObserveOnObserver对象时传入的LambdaObserver对象
LambdaObserver#onSubscribe:
1 |
|
因为我们再例子中没有传入相应的监听器,所以这里onSubscribe是由Functions.emptyConsumer()创建的,什么也不会做
如果我们传入了相应的监听器,则会在当前线程收到开始订阅的回调通知(onSubscribe)
回到subscribeActual这里,接着第7行调用了传入的scheduler的scheduleDirect方法,执行了特定的任务SubscribeTask
这里我们不去深究具体的代码,由于我们传入的是Schedulers.io(),因此容易得知SubscribeTask#run是在io线程中执行的
在第18行,此时我们已经处于io线程中,同上,我们以SubscribeOnObserver对象作为参数,调用了“上游”数据源source————ObservableFilter的subscribe方法
ObservableFilter#subscribeActual:
1 |
|
逻辑比较简单,把传入的SubscribeOnObserver对象与构造时传入的Predicate对象绑定在一起,包装成FilterObserver对象后,调用了“上游”数据源source————ObservableDoOnEach的subscribe方法
ObservableDoOnEach#subscribeActual:
1 |
|
逻辑类似,把传入的FilterObserver对象与构造时传入的Consumer对象绑定在一起,包装成DoOnEachObserver对象后,调用了“上游”数据源source————ObservableObserveOn的subscribe方法
与上面提到的一样,ObservableObserveOn#subscribeActual会把DoOnEachObserver对象与scheduler(Schedulers.newThread())创建的Worker绑定在了一起,包装成ObserveOnObserver对象,交给“上游”数据源————ObservableMap
ObservableMap#subscribeActual:
1 |
|
逻辑类似,把传入的ObserveOnObserver对象与构造时传入的Function对象绑定在一起,包装成MapObserver对象后,交给了“上游”数据源source————ObservableCreate
ObservableCreate#subscribeActual:
1 |
|
在第3行,传入的MapObserver对象被包装成了CreateEmitter
然后在第4行调用了MapObserver#onSubscribe,不过由于我们前面已经回调了订阅事件,因此这里实际上并不会回调到我们的监听器中,有兴趣的童鞋可以进一步看下相关源码
在第7行,调用了“上游”数据源source————我们构造的ObservableOnSubscribe匿名内部类的subscribe方法
至此,订阅阶段结束,这次被“层层包裹”的是我们的监听器Observer
订阅阶段的流程图如下所示:
发射数据阶段
在订阅阶段的最后,我们调用了ObservableOnSubscribe匿名内部类的subscribe方法,进入了发射数据的阶段
首先,让我们一起来回顾下例子中的代码做了什么:
1 | Observable.create(new ObservableOnSubscribe<Student>() { |
在第4行,我们首先通过getStudentList方法获得了学生数据列表,然后在第6行调用了emitter对象的onNext方法把数据发送了出去
根据上文分析,这里的emitter对象实际类型为CreateEmitter,让我们来看下相关的代码
CreateEmitter#onNext
1 | final Observer<? super T> observer; |
逻辑比较简单,首先检查了发送的数据是否为null(因为在RxJava数据流中不允许任何null出现)
然后如果是还没Disposed的情况下,把数据传递给了observer对象
从构造函数可以看出,observer对象实际类型为订阅阶段传入的MapObserver对象,我们接着看下相关代码
MapObserver#onNext
1 | final Function<? super T, ? extends U> mapper; |
在第15行,调用了我们在订阅阶段时绑定的Function对象进行映射(map)转换
在进行了非空检查后,调用了actual对象,即ObserveOnObserver对象的onNext方法,继续数据的传递
ObserveOnObserver#onNext
1 |
|
在第6行,ObserveOnObserver首先把数据加入了一个队列中,然后再第8行调用了schedule方法开启线程调度
在第13行中,我们看到调用了worker对象的schedule方法,并传入了参数this
这里我们不去深究Scheduler调度器的具体实现,根据上下文的含义,不难推断出,由于我们传入的调度器是Schedulers.newThread(),ObserveOnObserver实现的run方法将会在一个新的线程中被调用
在run方法中,由于我们没有设置结果合并到一起输出,因此进入的是第22行的drainNormal逻辑
最终在第61行,我们会把从队列中取出的数据,在新的线程中,继续传递给“下游”的监听器————DoOnEachObserver
DoOnEachObserver#onNext
1 | DoOnEachObserver( |
逻辑比较简单,交给了绑定的Consumer处理之后,继续把数据传递给“下游”的监听器————FilterObserver
FilterObserver#onNext
1 | final Predicate<? super T> filter; |
这里我们没有设置sourceMode,因此默认值为NONE,进入上半部分分支
FilterObserver使用订阅阶段绑定的Predicate对象,通过其test方法的返回值判断哪些数据允许往下传递
在例子中我们过滤条件为数据为偶数类型,因此test方法只有数据为偶数时才返回true,只有偶数数据才会被传递到“下游”————SubscribeOnObserver
SubscribeOnObserver#onNext
1 |
|
啥也没干,直接把数据往“下游”(ObserveOnObserver)传递
ObserveOnObserver上面我们已经分析过了,这里就不再重复分析了,它会在把线程切换到Android主线程后(这里的调度器是AndroidSchedulers.mainThread()),把数据传递给“下游”————LambdaObserver
LambdaObserver#onNext
1 | public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError, |
最终,LambdaObserver会把数据回调给我们一开始实现的Consumer匿名内部类
至此,发射数据阶段结束,其流程图如下所示:
总结
总而言之,RxJava的执行流程可以分为三个阶段:
- 创建阶段:使用装饰器模式“层层包裹”创建出一个 reactive type 对象(这里是Observable)
- 订阅阶段:使用装饰器模式“层层包裹”我们传入的监听器,不断调用“上游”的subscribe;回调开始订阅事件(onSubscribe),切换订阅线程
- 发射数据阶段:数据由顶层监听器向“下游”逐级传递,传递数据的同时执行相应的变换操作