[关闭]
@Loong-T 2015-07-21T06:40:27.000000Z 字数 3108 阅读 1934

草稿 - Rx Framework

使用 Rx 框架最简步骤可分为:

  1. Observable:事件源
  2. Observer:处理事件
  3. Schedulers:指定线程
  1. Observable.just("one", "two", "three", "four", "five") // 1
  2. .subscribeOn(Schedulers.newThread()) // 3
  3. .observeOn(AndroidSchedulers.mainThread()) // 3
  4. .subscribe(/* an Observer */); // 2

marble diagrams 说明

上图是 Rx 官网对于 Operator 解释图片的说明。

其中有一些术语需要注意:emit、item、transformation。

Observer

Observer 对 item 有三种响应动作:

对 onNext 的调用称为 emissions,对其他两个的调用称为 notifications。

Subscriber

Subscriber 是对 Observer 的特殊实现,实现了 unsubscribe 方法。此方法可以解除对 Observable 的订阅。如果 Observable 没有任何 Observer 订阅,则会停止 item 的分发。但这个停止的过程并不是实时的。

Observable

Observable 分为 Hot 和 Cold 两种类型。Hot 类型会在创建后立刻开始分发 item;Cold 类型会等待一个 Observer 订阅后才开始分发 item。

一般来说 cold 类型的 Observable 会是数据库查询、网络请求等。Hot 类型一般是键鼠事件、系统事件等。

在一些 Rx 实现中,还有一种 Connectable 类型,无论当前有没有 Observer订阅,在调用 connect 方法后才开始分发 item。

Operator

Rx 提供了大量的操作符,大部分函数式方法都有对应的操作符。

应该浏览一下官网对于操作符分类的简短说明。同页面中对于如何选择操作符的决策树也非常有帮助。

每个操作符的实现都配图进行了解释,对理解操作符的工作方式非常有帮助。

Scheduler

Scheduler 在多线程编程中用于指定 Observable 和 Observer 运行的线程。

ObserveOn 指定 Observer 的运行线程;SubcribeOn 指定 Observable 的运行线程。

Single

Single 是 Observable 的变种。区别在于,Single 在它的 timeline 上只会分发一个 item。

订阅了 Single 的 Observer 只有两种响应动作:onError 和 onSuccess。

Subject

Subject 既是 Observer 又是 Observable。

AsyncSubject

AsyncSubject 会把从原 Observable 收到的最后一个 item 继续分发下去。如果原 Observable 发出的是错误 notification,那么 AsyncSubject 发出的也是 error。

BehaviorSubject

当 Observer 订阅了 BehaviorSubject 后,先会收到订阅前发出的最新的一个 item(如果没有,则发出默认设置的 item),然后正常收到接下来的 item。

PublishSubject

在创建后立刻开始发出 items,订阅的 Observer 只会收到订阅之后发出的 items。

ReplaySubject

ReplaySubject 会将创建以来的所有 items 向订阅的 Observer 发送一遍,不管 Observer 是在什么时候订阅的。

Backpressure

当 Observable 发出 item 的速度远快于 Observer 的处理速度时,可能会占用大量的系统资源。

Rx 提供了一些操作符在这种场景下使用。

Throttling(节流?只将大量 items 中的一部分继续分发下去)

sample 或 throttleLast

这个操作符会将指定的时间间隔内的 items 的最后一个分发出来,并且丢弃掉前面的 items。

throttleFirst

与上方相似,将一段时间内的第一个 item 发出,丢弃掉其后面的 items。

debounce 或 throttleWithTimeout

指定一个时间段,如果在这个时间段内没有接收到下一个 item,就将前面接收到的 items 分为一组,然后将这个组的最后一个 item 分发出去,其余的丢弃。

Buffers and windows

buffer

指定时间段或者数量,将这些 items 收集成 List,然后将这个 List item 分发出去。

window

指定时间段或者数量,将这些 items 分别构建成 Observable,然后将 Observable 按批次分发出去。

Error Handling

Observable 通常不会抛出异常,而是发出 onError 的 notification。如果 onError 中出现错误时,会抛出一个 RuntimeException,可能是 OnErrorFailedException 或者 OnErrorNotImplementedException。

从 onError notification 中恢复

  1. 消费掉 error,切换到另一个备用的 Observable 来继续执行
  2. 消费掉 error,发出一个默认的 item
  3. 消费掉 error,立即重新执行出错的 Observable
  4. 消费掉 error,在一段时间后重新执行出错的 Observable

详细需要查看 Error Handling Operator 的文档。

RxJava 中的 Exception

RxAndroid

生命周期

AppObservable 提供了对 activity 和 fragment 的 bind 支持。但需要注意的是,这样的方法只是确保了在生命周期结束后,不会有 notification 被传递给 activity 或 fragment(不会再调用 onNext、onComplete 或 onError)。还是需要在 onDestroy 中进行 unsubscribe。

在 bind 了之后,Observer 会默认在主线程进行。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注