@cxm-2016
2016-12-17T03:42:36.000000Z
字数 10254
阅读 5953
RxJava学习指南
版本:1
整理:陈小默
延迟一段指定的时间再发射来自Observable的发射物

Delay操作符让原始Observable在发射每项数据之前都暂停一段指定的时间段。效果是Observable发射的数据项在时间上向前整体平移了一个增量。
RxJava的实现是 delay和delaySubscription。

第一种delay接受一个定义时长的参数(包括数量和单位)。每当原始Observable发射一项数据,delay就启动一个定时器,当定时器过了给定的时间段时,delay返回的Observable发射相同的数据项。
注意:delay不会平移onError通知,它会立即将这个通知传递给订阅者,同时丢弃任何待发射的onNext通知。然而它会平移一个onCompleted通知。
delay默认在computation调度器上执行,你可以通过参数指定使用其它的调度器。

另一种delay不实用常数延时参数,它使用一个函数针对原始Observable的每一项数据返回一个Observable,它监视返回的这个Observable,当任何那样的Observable终止时,delay返回的Observable就发射关联的那项数据。
这种delay默认不在任何特定的调度器上执行。

这个版本的delay对每一项数据使用一个Observable作为原始Observable的延时定时器。
这种delay默认不在任何特定的调度器上执行。

还有一个操作符delaySubscription让你你可以延迟订阅原始Observable。它结合搜一个定义延时的参数。
delaySubscription默认在computation调度器上执行,你可以通过参数指定使用其它的调度器。

还有一个版本的delaySubscription使用一个Obseable而不是一个固定的时长来设置订阅延时。
这种delaySubscription默认不在任何特定的调度器上执行。
注册一个动作作为原始Observable生命周期事件的一种占位符

你可以注册回调,当Observable的某个事件发生时,Rx会在与Observable链关联的正常通知集合中调用它。Rx实现了多种操作符用于达到这个目的。
RxJava实现了很多Do操作符的变体。

doOnEach操作符让你可以注册一个回调,它产生的Observable每发射一项数据就会调用它一次。你可以以Action的形式传递参数给它,这个Action接受一个onNext的变体Notification作为它的唯一参数,你也可以传递一个Observable给doOnEach,这个Observable的onNext会被调用,就好像它订阅了原始的Observable一样。

doOnNext操作符类似于doOnEach(Action1),但是它的Action不是接受一个Notification参数,而是接受发射的数据项。
示例代码
Observable.just(1, 2, 3).doOnNext(new Action1<Integer>() {@Overridepublic void call(Integer item) {if( item > 1 ) {throw new RuntimeException( "Item exceeds maximum value" );}}}).subscribe(new Subscriber<Integer>() {@Overridepublic void onNext(Integer item) {System.out.println("Next: " + item);}@Overridepublic void onError(Throwable error) {System.err.println("Error: " + error.getMessage());}@Overridepublic void onCompleted() {System.out.println("Sequence complete.");}});
输出
Next: 1Error: Item exceeds maximum value

doOnSubscribe操作符注册一个动作,当观察者订阅它生成的Observable它就会被调用。
doOnUnsubscribe操作符注册一个动作,当观察者取消订阅它生成的Observable它就会被调用。
doOnCompleted 操作符注册一个动作,当它产生的Observable正常终止调用onCompleted时会被调用。

doOnError 操作符注册一个动作,当它产生的Observable异常终止调用onError时会被调用。

doOnTerminate 操作符注册一个动作,当它产生的Observable终止之前会被调用,无论是正常还是异常终止。

finallyDo 操作符注册一个动作,当它产生的Observable终止之后会被调用,无论是正常还是异常终止。
Materialize将数据项和事件通知都当做数据项发射,Dematerialize刚好相反。

一个合法的有限的Obversable将调用它的观察者的onNext方法零次或多次,然后调用观察者的onCompleted或onError正好一次。Materialize操作符将这一系列调用,包括原来的onNext通知和终止通知onCompleted或onError都转换为一个Observable发射的数据序列。
RxJava的materialize将来自原始Observable的通知转换为Notification对象,然后它返回的Observable会发射这些数据。
materialize默认不在任何特定的调度器 (Scheduler) 上执行。

Dematerialize操作符是Materialize的逆向过程,它将Materialize转换的结果还原成它原本的形式。
dematerialize反转这个过程,将原始Observable发射的Notification对象还原成Observable的通知。
dematerialize默认不在任何特定的调度器 (Scheduler) 上执行。
指定一个观察者在哪个调度器上观察这个Observable

很多ReactiveX实现都使用调度器 "Scheduler"来管理多线程环境中Observable的转场。你可以使用ObserveOn操作符指定Observable在一个特定的调度器上发送通知给观察者 (调用观察者的onNext, onCompleted, onError方法)。

注意:当遇到一个异常时ObserveOn会立即向前传递这个onError终止通知,它不会等待慢速消费的Observable接受任何之前它已经收到但还没有发射的数据项。这可能意味着onError通知会跳到(并吞掉)原始Observable发射的数据项前面,正如图例上展示的。
SubscribeOn操作符的作用类似,但它是用于指定Observable本身在特定的调度器上执行,它同样会在那个调度器上给观察者发通知。
RxJava中,要指定Observable应该在哪个调度器上调用观察者的onNext, onCompleted, onError方法,你需要使用observeOn操作符,传递给它一个合适的Scheduler。
强制一个Observable连续调用并保证行为正确

一个Observable可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让Observable行为不正确,它可能会在某一个onNext调用之前尝试调用onCompleted或onError方法,或者从两个不同的线程同时调用onNext方法。使用Serialize操作符,你可以纠正这个Observable的行为,保证它的行为是正确的且是同步的。
RxJava中的实现是serialize,它默认不在任何特定的调度器上执行。
操作来自Observable的发射物和通知
Subscribe操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。
Subscribe操作符的一般实现可能会接受一到三个方法(然后由观察者组合它们),或者接受一个实现了包含这三个方法的接口的对象(有时叫做Observer或Subscriber):
onNext
每当Observable发射了一项数据它就会调用这个方法。这个方法的参数是这个Observable发射的数据项。
onError
Observable调用这个方法表示它无法生成期待的数据或者遇到了其它错误。这将停止Observable,它在这之后不会再调用onNext或onCompleted。onError方法的参数是导致这个错误的原因的一个表示(有时可能是一个Exception或Throwable对象,其它时候也可能是一个简单的字符串,取决于具体的实现)。
onCompleted
如果没有遇到任何错误,Observable在最后一次调用onCompleted之后会调用这个方法。
如果一个Observable直到有一个观察者订阅它才开始发射数据项,就称之为"冷"的Observable;如果一个Observable可能在任何时刻开始发射数据,就称之为"热"的Observable,一个订阅者可能从开始之后的某个时刻开始观察它发射的数据序列,它可能会错过在订阅之前发射的数据。
RxJava中的实现是subscribe方法。
如果你使用无参数的版本,它将触发对Observable的一个订阅,但是将忽略它的发射物和通知。这个操作会激活一个"冷"的Observable。
你也可以传递一到三个函数给它,它们会按下面的方法解释:
onNextonNext和onErroronNext, onError和onCompleted最后,你还可以传递一个Observer或Subscriber接口给它,Observer接口包含这三个以on开头的方法。Subscriber接口也实现了这三个方法,而且还添加了几个额外的方法,用于支持使用反压操作(reactive pull backpressure),这让Subscriber可以在Observable完成前取消订阅。
subscribe方法返回一个实现了Subscription接口的对象。这个接口包含unsubscribe方法,任何时刻你都可以调用它来断开subscribe方法建立的Observable和观察者之间的订阅关系。
forEach方法是简化版的subscribe,你同样可以传递一到三个函数给它,解释和传递给subscribe时一样。
不同的是,你无法使用forEach返回的对象取消订阅。也没办法传递一个可以用于取消订阅的参数。因此,只有当你明确地需要操作Observable的所有发射物和通知时,你才应该使用这个操作符。
BlockingObservable类中也有一个类似的叫作forEach的方法。详细的说明见 BlockingObservable
指定Observable自身在哪个调度器上执行

很多ReactiveX实现都使用调度器 "Scheduler"来管理多线程环境中Observable的转场。你可以使用SubscribeOn操作符指定Observable在一个特定的调度器上运转。
ObserveOn操作符的作用类似,但是功能很有限,它指示Observable在一个指定的调度器上给观察者发通知。
在某些实现中还有一个UnsubscribeOn操作符。
将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable

TimeInterval操作符拦截原始Observable发射的数据项,替换为发射表示相邻发射物时间间隔的对象。
RxJava中的实现为timeInterval,这个操作符将原始Observable转换为另一个Obserervable,后者发射一个标志替换前者的数据项,这个标志表示前者的两个连续发射物之间流逝的时间长度。新的Observable的第一个发射物表示的是在观察者订阅原始Observable到原始Observable发射它的第一项数据之间流逝的时间长度。不存在与原始Observable发射最后一项数据和发射onCompleted通知之间时长对应的发射物。
timeInterval默认在immediate调度器上执行,你可以通过传参数修改。
对原始Observable的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知

如果原始Observable过了指定的一段时长没有发射任何数据,Timeout操作符会以一个onError通知终止这个Observable。
RxJava中的实现为timeout,但是有好几个变体。

第一个变体接受一个时长参数,每当原始Observable发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始Observable没有发射另一项数据,timeout就抛出TimeoutException,以一个错误通知终止Observable。
这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。

这个版本的timeout在超时时会切换到使用一个你指定的备用的Observable,而不是发错误通知。它也默认在computation调度器上执行。

这个版本的timeout使用一个函数针对原始Observable的每一项返回一个Observable,如果当这个Observable终止时原始Observable还没有发射另一项数据,就会认为是超时了,timeout就抛出TimeoutException,以一个错误通知终止Observable。
这个timeout默认在immediate调度器上执行。

这个版本的timeout同时指定超时时长和备用的Observable。它默认在immediate调度器上执行。

这个版本的time除了给每一项设置超时,还可以单独给第一项设置一个超时。它默认在immediate调度器上执行。

同上,但是同时可以指定一个备用的Observable。它默认在immediate调度器上执行。
给Observable发射的数据项附加一个时间戳

RxJava中的实现为timestamp,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped<T>的数据的Observable,每一项都包含数据的原始发射时间。
timestamp默认在immediate调度器上执行,但是可以通过参数指定其它的调度器。
创建一个只在Observable生命周期内存在的一次性资源

Using操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。

using操作符接受三个参数:
当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。
using默认不在任何特定的调度器上执行。
将Observable转换为另一个对象或数据结构

ReactiveX的很多语言特定实现都有一种操作符让你可以将Observable或者Observable发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到Observable终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的Observable。
在某些ReactiveX实现中,还有一个操作符用于将Observable转换成阻塞式的。一个阻塞式的Ogbservable在普通的Observable的基础上增加了几个方法,用于操作Observable发射的数据项。

getIterator操作符只能用于BlockingObservable的子类,要使用它,你首先必须把原始的Observable转换为一个BlockingObservable。可以使用这两个操作符:BlockingObservable.from或the Observable.toBlocking。
这个操作符将Observable转换为一个Iterator,你可以通过它迭代原始Observable发射的数据集。

toFuture操作符也是只能用于BlockingObservable。这个操作符将Observable转换为一个返回单个数据项的Future,如果原始Observable发射多个数据项,Future会收到一个IllegalArgumentException;如果原始Observable没有发射任何数据,Future会收到一个NoSuchElementException。
如果你想将发射多个数据项的Observable转换为Future,可以这样用:myObservable.toList().toBlocking().toFuture()。

toFuture操作符也是只能用于BlockingObservable。这个操作符将Observable转换为一个Iterable,你可以通过它迭代原始Observable发射的数据集。

通常,发射多项数据的Observable会为每一项数据调用onNext方法。你可以用toList操作符改变这个行为,让Observable将多项数据组合成一个List,然后调用一次onNext方法传递整个列表。
如果原始Observable没有发射任何数据就调用了onCompleted,toList返回的Observable会在调用onCompleted之前发射一个空列表。如果原始Observable调用了onError,toList返回的Observable会立即调用它的观察者的onError方法。
toList默认不在任何特定的调度器上执行。

toMap收集原始Observable发射的所有数据项到一个Map(默认是HashMap)然后发射这个Map。你可以提供一个用于生成Map的Key的函数,还可以提供一个函数转换数据项到Map存储的值(默认数据项本身就是值)。
toMap默认不在任何特定的调度器上执行。

toMultiMap类似于toMap,不同的是,它生成的这个Map同时还是一个ArrayList(默认是这样,你可以传递一个可选的工厂方法修改这个行为)。
toMultiMap默认不在任何特定的调度器上执行。

toSortedList类似于toList,不同的是,它会对产生的列表排序,默认是自然升序,如果发射的数据项没有实现Comparable接口,会抛出一个异常。然而,你也可以传递一个函数作为用于比较两个数据项,这是toSortedList不会使用Comparable接口。
toSortedList默认不在任何特定的调度器上执行。

nest操作符有一个特殊的用途:将一个Observable转换为一个发射这个Observable的Observable。