@SmartDengg
2016-11-16T07:19:08.000000Z
字数 3181
阅读 1168
单独看文章并没有什么意义,主要原因是,谢阿温先森_Gemini 的邀请与信任。为了节约时间成本,所以本文的阅读体验并不会特别舒服,还望各位见谅。
首先请允许我来解释一下官方对subscribeon的定义:
在多线程环境下,许多ReactiveX实现库都可以使用 “Scheduler”来实现Observable在线程间的切换。你可以通过使用Observable的
.subscribeOn()
操作符来指定Observable在一个特殊的线程上工作。
再看一下 RxJava1.1.1 中API是如何定义的:
/**
* Asynchronously subscribes Observers to this Observable on the specified {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>you specify which {@link Scheduler} this operator will use</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source Observable modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
}
第一句描述的很清晰,“在指定线程上,通过异步的方式订阅 Observers
(观察者)至 Observable
(被观察者)。”
然后是对return的描述“源Observable已经被修改,以便它的订阅事件能够发生在指定的Scheduler上”。
另外从你对subscribeOn
的总结,我们依然能够得出一个结论,那就是.subscribeOn()
操作符与.observeOn()
不同,它不收集事件甚至不关心事件,他只关心Observable
与Subscriber
,因此它的.call()
实现,特别的精妙,在.schedule()
中通过调用了source.unsafeSubscribe(s);
来线程间的切换。
于是:
.create()
中的subscriber其实就是OperatorSubscribeOn
类中的内部类Subscriber
,也就是source.unsafeSubscribe(s);
中的s
按照断点的方式,进来看一下,当在.created()
中调用subscriber.onNext();
,其实也就是调用了s.onNext()
。
而source
呢,是在指定scheduler
中调用的,这是原封不动的源码,因此source
所持有的OnSubscribe
,也是就是Observable.OnSubscribe
已经处在指定的scheduler
中了:
值得一提的是,可别忘了即便是Observable.create()
也属于rx-operator,也就是说,这个操作符对事件的处理是在指定的scheduler
上的,至此,也正应了开头那句,官方对其的定义:“用Observable的.SubscribeOn()操作符来指定一个Observable在特殊的线程上执行工作。”
另外,我还要再强调的一点是,正如你所说的最神奇的地方发生在source.unsafeSubscribe(s)
的调用时机是.schedule()
的内部实现类所持有的.call()
函数,所以回到源码的角度来讲,我们根本不需要知道s
在何处声明,只需要关注source.unsafeSubscribe(s)
在哪个inner
中调用的就行了。这也就是为什么,离事件源最近的Scheduler
也就是第一个.subscribeOn()
才会起作用原因,而多次调用.subscribeOn()
根本毫无意义。
为了避免疑惑的存在,官网还在对.subscriberon()
的描述页放了这样的一张图:
尤其要注意红色框内的描述:
.subscribeOn()
操作符通过指定一个不同的Scheduler
改变了这种默认的行为(Observable
所在线程,操作符对事件的处理以及通知观察者,这三种情况下的线程,默认情况下与调用.subscribe()
时所在线程保持一致),以便Observable
能够在该Scheduler
上处理事件。Observable
使用.ObserveOn()
操作符所指定的Scheduler
将事件发送至Observer
(观察者)。
当然把文档继续读下去,收获也许会更多:)
所以,到此为止,我能得出的结论,就是.subscribeOn()
指定操作符处理事件时的所在线程,而.ObserveOn()
则能够指定Observable
在什么线程上将事件发送至Observer
。而且不要忘记了Subscriber
实现了Observer
这样做不仅能够保存订阅状态,而且还能持有订阅链SubscriptionList
(天啊,订阅链,我实在不了解大家究竟管他叫什么,这名字好难听+ +)。
我本人认为OperatorObserveOn
与OperatorSubscribeOn
最大的区别在于,因为前者是一个纯粹的Operator
,所以无法持有Observable
。我举一个看起来并不是那么恰当的例子,我的意思是指这个例子或许有些疏漏,那就是.subscribeOn()
操作符能够改变整个“生产-消费者”所在“工作线”的Scheduler
,从而达到线程切换的目的,而.observeOn()
由于无法持有Observable
,所以只能通过从指定Scheduler
上发送通知至订阅它的observer
这种手段来达到线程切换的目的。
一句话总结就是,.subscribeOn()
能够改变observable.onSubscribe.call()
所在Scheduler
,从而改变Observable
执行操作的线程,无论是上游还是下游的操作符,都能够影响到:
而.observeOn()
只能通过改变subscriber.onNext()
所在Scheduler
,从而达到线程切换的目的,因此它的线程切换更加受限,它无法影响到那些在它之前的操作符所在线程,它只能作用于后续订阅它的那些Observable
: