数据结构论坛

首页 » 分类 » 分类 » 24北京科技大学计算机考研数
TUhjnbcbe - 2021/5/19 18:36:00
哪家医院治白癜风好 http://wapyyk.39.net/bj/zhuanke/89ac7.html
前言

相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行。同样有很多同学已经开始在自己的项目中使用RxJava。它能够帮助我们在处理异步事件时能够省去那些复杂而繁琐的代码,尤其是当某些场景逻辑中回调中嵌入回调时,使用RxJava依旧能够让我们的代码保持极高的可读性与简洁性。不仅如此,这种基于异步数据流概念的编程模式事实上同样也能广泛运用在移动端这种包括网络调用、用户触摸输入和系统弹框等在内的多种响应驱动的场景。那么现在,就让我们一起分析一下RxJava的响应流程吧。(本文基于RxJava-1.1.3)

用法

首先来看一个简单的例子:

运行结果为:

从结果中我们不难看出整体的调用流程:

首先通过调用Observable.create()方法生成一个被观察者,紧接着在这里我们又调用了map()方法对原被观察者进行数据流的变换操作,生成一个新的被观察者(为何是新的被观察者后文会讲),最后调用subscribe()方法,传入我们的观察者,这里观察者订阅的则是调用map()之后生成的新被观察者。

在整个过程中我们会注意到三个主角:Observable、OnSubscribe、Subscriber,所有的操作都是围绕它们进行的。不难看出这里三个角色的分工:

Observable:被观察者的来源,亦或说是被观察者本身

OnSubscribe:用来通知观察者的不同行为

Subscriber:观察者,通过实现对应方法来产生具体的处理。

所以接下来我们以这三个角色为中心来分析具体的流程。

分析一、订阅过程首先我们进入Observable.create()看看:

这里调用构造函数生成了一个Observable对象并将传入的OnSubscribe赋给自己的成员变量onsubscribe,等等,这个hook是从哪里冒出来的?我们向上找:

RxJavaObservableExecutionHook这个抽象Proxy类默认对OnSubscribe对象不做任何处理,不过通过继承该类并重写onCreate()等方法我们可以对这些方法对应的时机做一些额外处理比如打Log或者一些数据收集方面的工作。

到目前最初始的被观察者已经生成了,我们再来看看观察者这边。我们知道通过调用observable.subscribe()方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:

这里我们略去部分无关代码看主要部分,subscribe.onStart()默认空实现我们暂且不用管它,对于传进来的subscriber要包装成SafeSubscriber,这个SafeSubscriber对原来的subscriber的一系列方法做了更完善的处理,包括:onError()与onCompleted()只会有一个被执行;保证一旦onError()或者onCompleted()被执行,将不再能再执onNext()等情况。这里封装为SafeSubscriber之后,调用onSubscribe.call(),并将subscriber传入,这样就完成了一次订阅。

显而易见,Subscriber作为观察者,在订阅行为完成后,其具体行为在整个链式调用中起着至关重要的作用,我们来看看它内部的构成的主要部分:

每个Subscriber都持有一个SubscriptionList,这个list保存的是所有该观察者的订阅事件,同时Subscriber也对应实现了Subscription接口,当这个Subscriber取消订阅的时候会将持有事件列表中的所有Subscription取消订阅,并且从此不再接受任何订阅事件。同时,通过Producer可以去限定该Subscriber所接收的数据流的总量,这个限制量其实是加在Subscriber.onNext()方法上的,onComplete()、onError()则不会受到其影响。因为是底层抽象类,onNext()、onComplete()、onError()统一不在这里处理。

二、变换过程

在收到Observable的消息之前我们有可能会对数据流进行处理,例如map()、flatMap()、deBounce()、buffer()等方法,本例中我们用了map()方法,它接收了原被观察者发射的数据并将通过该方法返回的结果作为新的数据发射出去,相当于做了一层中间转化:

我们接着看这个转化过程:

这里是通过一个lift()方法实现的,再查看其他的转化方法发现内部也都使用lift()实现的,看来这个lift()就是关键所在了,不过不急,我们先来看看这个OperationMap是什么:

OperationMap实现了Operator接口的call()方法,该方法接受外部传入的观察者,并将其作为参数构造出了一个新的观察者,我们不难发现o.onNext(transformer.call(t));这一句起了至关重要的作用,这里的接口transformer将泛型T转化为泛型R:

这样之后,再将转换后的数据传回至原观察者的onNext()方法,就完成了观察数据流的转化,但是你应该也注意到了,我们用来做转换的这个新的观察者并没有实现订阅被观察者的操作,这个订阅操作又是在哪里实现的呢?答案就是接下来的lift():

在这里我们新生成了一个Observable对象,在这个新对象的onSubscribe成员的call()方法中我们通过operator.call()拿到之前生成的未产生订阅的观察者st,之后将它作为参数传入一开始的onSubscribe.call()中,即完成了这个中间订阅的过程。现在我们将整个流程梳理一下:1、一次map()变换2、根据Operator实例生成新的Subscriber3、通过lift()生成新的Observable4、原Subscriber订阅新的Observavble5、新的Observable中onSubscribe通知新Subscriber订阅原Observable6、新Subscriber将消息传给原Subscriber。为了便于理解,这里借用一下扔物线的图:

以上就是一次map()变换的流程,事实上多次map()也是同样道理:最外层的目标Subscriber发生订阅行为后,onSubscribe.onNext()会逐层嵌套调用,直至初始Observable被最底层的Subscriber订阅,通过Operator的一层层变化将消息传到目标Subscriber。再次祭出扔物线的图:

至于其他的多种变化的实现流程也都很类似,借助于Operator的不同实现来达到变换数据流的目的。例如其中的flatMap(),它需要进行两次lift(),其中第二次是OperationMerge,将转换成的每一个Observable数据流通过InnerSubscriber这个纽带订阅后,在InnerSubscriber的onNext()中拿到R,再通过传入的parent(也就是原MergeSubscriber)将它们全部发射(emit)出去,由最外层我们传入的Subscriber统一接收,这样就完成了T=ObservableR=R的转化:

除此之外,还有许多各式各样的操作符,如果它们还不能满足你的需要,你也可以通过实现Operator接口定制新的操作符。灵活运用它们往往能达到事半功倍的效果,比如通过使用sample()、debounce()等操作符有效避免backpressure的需要等等,这里就不一一介绍了。

三、线程切换过程

从上文中我们知道了RxJava能够帮助我们对数据流进行灵活的变换,以达到链式结构操作的目的,然而它的强大不止于此。下面我们就来看看它的又一利器,调度器Scheduler:就像我们所知道的,Scheduler是给Observable数据流添加多线程功能所准备的,一般我们会通过使用subscribeOn()、observeOn()方法传入对应的Scheduler去指定数据流的每部分操作应该以何种方式运行在何种线程。对于我们而言,最常见的莫过于在非主线程获取并处理数据之后在主线程更新UI这样的场景了:

这是我们十分常见的调用方法,一气呵成就把不同线程之间的处理都搞定了,因为是链式所以结构也很清晰,我们现在来看看这其中的线程切换流程。

1、subscribeOn()

当我们调用subscribeOn()的时候:

可以看到这里也是调用了create()去生成一个Observable,而OperatorSubscribeOn则是实现了OnSubscribe接口,同时将原始的Observable和我们需要的scheduler传入:

可以看出来,这里对subscriber的处理与前文中OperatorMap中call()对subscriber的处理很相似。在这里我们同样会根据传入的subscriber构造出新的Subscribers,不过这一系列的过程大部分都是由worker通过schedule()去执行的,从后面setProducer()中对于线程的判断,再结合subscribeOn()方法的目的我们能大概推测出,这个worker在一定程度上就相当于一个新线程的代理执行者,schedule()所实现的与Thread类中run()应该十分类似。我们现在来看看这个worker的执行过程。首先从Schedulers.io()进入:

这个通过hook拿到scheduler的过程我们先不管,直接进CachedThreadScheduler,看它的createWorker()方法:

这里的pool是一个原子变量引用AtomicReference,所持有的则是CachedWorkerPool,因而这个pool顾名思义就是用来保存worker的缓存池啦,我们从缓存池里拿到需要的worker并作了一层封装成为EventLoopWorker:

在这里我们终于发现目标ThreadWorker,它继承自NewThreadWorker,之前的schedule()方法最终都会到这个scheduleActual()方法里:

这里我们看到了executor线程池,我们用Schedulers.io()最终实现的线程切换的本质就在这里了。现在再结合之前的过程我们从头梳理一下:

在subscribeOn()时,我们会新生成一个Observable,它的成员onSubscribe会在目标Subscriber订阅时使用传入的Scheduler的worker作为线程调度执行者,在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber,这个Subscriber又会通知到目标Subscriber,这样就完成了subscribeOn()的过程。

2、observeOn()

下面我们接着来看看observeOn():

我们直接看最终调用的部分,可以看到这里又是一个lift(),在这里传入了OperatorObserveOn,它与OperatorSubscribeOn不同,是一个Operator(Operator的功能我们上文中已经讲过就不赘述了),它构造出了新的观察者ObserveOnSubscriber并实现了Action0接口:

可以看出来,这里ObserveOnSubscriber所有的发送给目标Subscriberchild的消息都被切换到了recursiveScheduler的线程作处理,也就达到了将线程切回的目的。总结observeOn()整体流程如下:

对比subscribeOn()和observeOn()这两个过程,我们不难发现两者的区别:subscribeOn()将初始Observable的订阅事件整体都切换到了另一个线程;而observeOn()则是将初始Observable发送的消息切换到另一个线程通知到目标Subscriber。前者把“订阅+发送”的切换了一个线程,后者把“发送”切换了一个线程。所以,我们的代码中所实现的功能其实是:

这样就能很容易实现耗时任务在子线程操作,在主线程作更新操作等这些常见场景的功能啦。

四、其他角色Subject

Subject在Rx系列是一个比较特殊的角色,它继承了Observable的同时也实现了Observer接口,也就是说它既可作为观察者,也可作为被观察者,他一般被用来作为连接多个不同Observable、Observer之间的纽带。可能你会奇怪,我们不是已经有了像map()、flatMap()这类的操作符去变化Observable数据流了吗,为什么还要引入Subject这个东西呢?这是因为Subject所承担的工作并非是针对Observable数据流内容的转换连接,而是数据流本身在Observable、Observer之间的调度。光这么说可能还是很模糊,我们举个《RxJavaEssentials》中的例子:

我们通过create()创建了一个PublishSubject,观察者成功订阅了这个subject,然而这个subject却没有任何数据要发送,我们只是知道他未来会发送的会是String值而已。之后,当我们调用subject.onNext()时,消息才被发送,Observer的onNext()被触发调用,输出了"HelloWorld"。这里我们注意到,当订阅事件发生时,我们的subject是没有产生数据流的,直到它发射了"HelloWorld",数据流才开始运转,试想我们如果将订阅过程和subject.onNext()调换一下位置,那么Observer就一定不会接受到"HelloWorld"了(这不是废话吗--

),因而这也在根本上反映了Observable的冷热区别。一般而言,我们的Observable都属于ColdObservables,就像看视频,每次点开新视频我们都要从头开始播放;而Subject则默认属于HotObservables,就像看直播,视频数据永远都是新的。基于这种属性,Subject自然拥有了对接收到的数据流进行选择调度等的能力了,因此,我们对于Subject的使用也就通常基于如下的思路:

在前面的例子里我们用到的是PublishSubject,它只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。等一下,这功能听起来是不是有些似曾相识呢?

没错,就是EventBus和Otto。(RxJava的出现慢慢让Otto退出了舞台,现在Otto的Repo已经是Deprecated状态了,而EventBus依旧坚挺)基于RxJava的观察订阅取消的能力和PublishSubject的功能,我们十分容易就能写出实现了最基本功能的简易事件总线框架:

当然Subject还有其他如BehaviorSubject、ReplaySubject、AsyncSubject等类型,大家可以去看官方文档,写得十分详细,这里就不介绍了。

后记

前面相信最近这段日子里,提到RxJava,大家就会想到Google最近刚刚开源的Agera。Agera作为专门为Android打造的ReactiveProgramming框架,难免会被拿来与RxJava做对比。本文前面RxJava的主体流程分析已近尾声,现在我们再来看看Agera这东东又是怎么一回事。

首先先上结论(

1
查看完整版本: 24北京科技大学计算机考研数