[关闭]
@SmartDengg 2016-11-16T07:19:08.000000Z 字数 3181 阅读 1168

SubscribeOn与ObserveOn


单独看文章并没有什么意义,主要原因是,谢阿温先森_Gemini 的邀请与信任。为了节约时间成本,所以本文的阅读体验并不会特别舒服,还望各位见谅。

SubscribeOn

首先请允许我来解释一下官方对subscribeon的定义:

在多线程环境下,许多ReactiveX实现库都可以使用 “Scheduler”来实现Observable在线程间的切换。你可以通过使用Observable的.subscribeOn()操作符来指定Observable在一个特殊的线程上工作。

再看一下 RxJava1.1.1 中API是如何定义的:

  1. /**
  2. * Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.
  3. * <p>
  4. * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
  5. * <dl>
  6. * <dt><b>Scheduler:</b></dt>
  7. * <dd>you specify which {@link Scheduler} this operator will use</dd>
  8. * </dl>
  9. *
  10. * @param scheduler
  11. * the {@link Scheduler} to perform subscription actions on
  12. * @return the source Observable modified so that its subscriptions happen on the
  13. * specified {@link Scheduler}
  14. * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
  15. * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
  16. * @see #observeOn
  17. */
  18. public final Observable<T> subscribeOn(Scheduler scheduler) {
  19. if (this instanceof ScalarSynchronousObservable) {
  20. return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
  21. }
  22. return create(new OperatorSubscribeOn<T>(this, scheduler));
  23. }

第一句描述的很清晰,“在指定线程上,通过异步的方式订阅 Observers(观察者)至 Observable(被观察者)。”

然后是对return的描述“源Observable已经被修改,以便它的订阅事件能够发生在指定的Scheduler上”。

另外从你对subscribeOn的总结,我们依然能够得出一个结论,那就是.subscribeOn()操作符与.observeOn()不同,它不收集事件甚至不关心事件,他只关心ObservableSubscriber,因此它的.call()实现,特别的精妙,在.schedule()中通过调用了source.unsafeSubscribe(s);来线程间的切换。

于是:

.create()中的subscriber其实就是OperatorSubscribeOn类中的内部类Subscriber,也就是source.unsafeSubscribe(s);中的s

按照断点的方式,进来看一下,当在.created()中调用subscriber.onNext();,其实也就是调用了s.onNext()

source呢,是在指定scheduler中调用的,这是原封不动的源码,因此source所持有的OnSubscribe,也是就是Observable.OnSubscribe已经处在指定的scheduler中了:

值得一提的是,可别忘了即便是Observable.create()也属于rx-operator,也就是说,这个操作符对事件的处理是在指定的scheduler上的,至此,也正应了开头那句,官方对其的定义:“用Observable的.SubscribeOn()操作符来指定一个Observable在特殊的线程上执行工作。”

另外,我还要再强调的一点是,正如你所说的最神奇的地方发生在source.unsafeSubscribe(s)的调用时机是.schedule()的内部实现类所持有的.call()函数,所以回到源码的角度来讲,我们根本不需要知道s在何处声明,只需要关注source.unsafeSubscribe(s)在哪个inner中调用的就行了。这也就是为什么,离事件源最近的Scheduler也就是第一个.subscribeOn()才会起作用原因,而多次调用.subscribeOn()根本毫无意义。

为了避免疑惑的存在,官网还在对.subscriberon()的描述页放了这样的一张图:

尤其要注意红色框内的描述:

.subscribeOn()操作符通过指定一个不同的Scheduler改变了这种默认的行为(Observable所在线程,操作符对事件的处理以及通知观察者,这三种情况下的线程,默认情况下与调用.subscribe()时所在线程保持一致),以便Observable能够在该Scheduler上处理事件。Observable使用.ObserveOn()操作符所指定的Scheduler将事件发送至Observer(观察者)。

当然把文档继续读下去,收获也许会更多:)

所以,到此为止,我能得出的结论,就是.subscribeOn()指定操作符处理事件时的所在线程,而.ObserveOn()则能够指定Observable在什么线程上将事件发送至Observer。而且不要忘记了Subscriber实现了Observer这样做不仅能够保存订阅状态,而且还能持有订阅链SubscriptionList(天啊,订阅链,我实在不了解大家究竟管他叫什么,这名字好难听+ +)。

ObserveOn

我本人认为OperatorObserveOnOperatorSubscribeOn最大的区别在于,因为前者是一个纯粹的Operator,所以无法持有Observable。我举一个看起来并不是那么恰当的例子,我的意思是指这个例子或许有些疏漏,那就是.subscribeOn()操作符能够改变整个“生产-消费者”所在“工作线”的Scheduler,从而达到线程切换的目的,而.observeOn()由于无法持有Observable,所以只能通过从指定Scheduler上发送通知至订阅它的observer这种手段来达到线程切换的目的。

一句话总结就是,.subscribeOn()能够改变observable.onSubscribe.call()所在Scheduler,从而改变Observable执行操作的线程,无论是上游还是下游的操作符,都能够影响到:

.observeOn()只能通过改变subscriber.onNext()所在Scheduler,从而达到线程切换的目的,因此它的线程切换更加受限,它无法影响到那些在它之前的操作符所在线程,它只能作用于后续订阅它的那些Observable

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注