[关闭]
@Loong-T 2015-07-21T06:41:06.000000Z 字数 3079 阅读 1884

草稿 - Rx 应用

基本

Rx 适合使用的场景:

  1. 需要对数据(事件)进行流式的处理
  2. 需要异步,非线性操作
  3. 回调过多,产生 callback hell
  4. 将异常统一处理,避免过多 try catch

subscribeOn 操作符指定 event 运行的线程,observeOn 指定 Observer 的运行线程。

subscribe 操作符的参数可以是 1~3 个 Action1,使用 lambda 来简化代码的情况下,可以取代参数为 Observer 的方法。参数为 Subscriber 时,可以多 Override 一个 onStart 的方法,执行顺序和线程为:onStart (main) -> Observable (background) -> onNext (main) -> onComplete or onError (main)

在 subscribe 调用后,会返回一个 Subscription 对象,需要在生命周期结束后调用其 unsubscribe 方法。如果有多个 Subscription,就使用 CompositeSubscription。

如果在某些情况下不知道如何应用 Rx,那就考虑下使用 Subject。

风格建议

Rx 的流式处理便于理解代码,所以不推荐在操作符中写太复杂的代码。代码块都尽可能短,隐藏具体的细节,只要表明进行了什么样的操作就可以。

理想情况是每个操作符的参数都限制在一行,超出的部分抽成方法。

链式调用的最后一个操作符内可以适当放宽这个要求。

常用操作符

just 和 from

从现有的 item 或 item 列表创建 Observable。

create

从 Observer 创建 Observable。

map

对 item 进行变换。

flatMap

从 item 创建 Observable。

sample 或 throttleLast

这个操作符会将指定的时间间隔内的 items 的最后一个分发出来,并且丢弃掉前面的 items。

throttleFirst

与上方相似,将一段时间内的第一个 item 发出,丢弃掉其后面的 items。

debounce 或 throttleWithTimeout

指定一个时间段,如果在这个时间段内没有接收到下一个 item,就将前面接收到的 items 分为一组,然后将这个组的最后一个 item 分发出去,其余的丢弃。

buffer

指定时间段或者数量,将这些 items 收集成 List,然后将这个 List item 分发出去。

window

指定时间段或者数量,将这些 items 分别构建成 Observable,然后将 Observable 按批次分发出去。

combineLatest

接收多个 Observable,每当从任意一个 Observable 接收到 item 时,会将每个 Observable 的最后一个 item 传递给指定的聚合方法,聚合方法负责处理这些 item。

代替 AsyncTask

  1. Observable.just(somethingBlockMainThread())
  2. .subscribeOn(Schedulers.newThread())
  3. .observeOn(AndroidSchedulers.mainThread())
  4. .subscribe(/* an Observer */);
  1. AppObservable.bindActivity(activity, somethingBlockMainThread())
  2. .subscribeOn(Schedulers.io())
  3. .observeOn(AndroidSchedulers.mainThread())
  4. .subscribe(observer);

代替计时相关

使用 timer 操作符可以设置延迟进行或者间隔重复执行。

interval 操作符可设置间隔重复执行。

配合 take 等操作符,可以组合成重复执行固定次数的任务。

监听数据变化

WidgetObservable 提供了对 TextView、CompoundButton、AdapterView、AbsListView 的事件支持。

自定义监听的例子:

  1. public static <T extends View> Observable<T> clicksFrom(T view) {
  2. PublishSubject publishSubject = PublishSubject.create();
  3. view.setOnClickListener((v) -> publishSubject.onNext(view));
  4. return publishSubject.asObservable();
  5. }

使用 Rx 来监听事件的好处主要体现在其他操作符的支持。

比如当文字变化时,通常的做法是添加 TextWatcher,会在每一次变化时进行一些操作(校检数据合法性等)。但在用户快速输入文字时,会引起不必要的校验操作。

这种情况下,可以使用 Debounce 操作符,在一定的时间没有输入后,才会真正触发校验操作。

适合情景:

  1. 校验数据合法性
  2. Auto complete 相关的控件
  3. 搜索建议等
  1. // 无输入 400 millis 之后才显示搜索建议
  2. bindSupportFragment(this, WidgetObservable.text(searchText))
  3. .debounce(400, TimeUnit.MILLISECONDS)
  4. .observeOn(AndroidSchedulers.mainThread())
  5. .subscribe(searchObserver());

表单多个输入项联合校验

使用 combineLatest 操作符,每次触发 item 分发时,会将每个控件的最后一个事件传递给指定的方法。

  1. Observable.combineLatest(
  2. WidgetObservable.text(email),
  3. WidgetObservable.text(password),
  4. WidgetObservable.text(number),
  5. (onEmailChangeEvent, onPasswordChangeEvent, onNumberChangeEvent) -> {
  6. return emailValid(onEmailChangeEvent) &&
  7. passValid(onPasswordChangeEvent) &&
  8. numValid(onNumberChangeEvent);
  9. })
  10. .subscribe(aBoolean -> setValid(aBoolean));

错误处理

  1. Observable
  2. .error(new RuntimeException("testing"))
  3. .retryWhen(new RetryWithDelay(5, 1000))//
  4. .doOnSubscribe(new Action0() {
  5. @Override
  6. public void call() {
  7. _log("Attempting the impossible 5 times in intervals of 1s");
  8. }
  9. })
  10. .subscribe(new Observer<Object>() {
  11. @Override
  12. public void onCompleted() {
  13. Timber.d("on Completed");
  14. }
  15. @Override
  16. public void onError(Throwable e) {
  17. _log("Error: I give up!");
  18. }
  19. @Override
  20. public void onNext(Object aVoid) {
  21. Timber.d("on Next");
  22. }
  23. }));

retry 操作符会重新执行一次出错的 Observable。

retryWhen 操作符将延迟重新执行。

Retrofit

部分第三方库比如 Retrofit 对 Rx 进行了支持。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注