@Loong-T
2015-07-21T06:41:06.000000Z
字数 3079
阅读 1884
Rx 适合使用的场景:
subscribeOn 操作符指定 event 运行的线程,observeOn 指定 Observer 的运行线程。
subscribe 操作符的参数可以是 1~3 个 Action1,使用 lambda 来简化代码的情况下,可以取代参数为 Observer 的方法。参数为 Subscriber 时,可以多 Override 一个 onStart 的方法,执行顺序和线程为:onStart (main) -> Observable (background) -> onNext (main) -> onComplete or onError (main)
。
在 subscribe 调用后,会返回一个 Subscription 对象,需要在生命周期结束后调用其 unsubscribe 方法。如果有多个 Subscription,就使用 CompositeSubscription。
如果在某些情况下不知道如何应用 Rx,那就考虑下使用 Subject。
Rx 的流式处理便于理解代码,所以不推荐在操作符中写太复杂的代码。代码块都尽可能短,隐藏具体的细节,只要表明进行了什么样的操作就可以。
理想情况是每个操作符的参数都限制在一行,超出的部分抽成方法。
链式调用的最后一个操作符内可以适当放宽这个要求。
从现有的 item 或 item 列表创建 Observable。
从 Observer 创建 Observable。
对 item 进行变换。
从 item 创建 Observable。
这个操作符会将指定的时间间隔内的 items 的最后一个分发出来,并且丢弃掉前面的 items。
与上方相似,将一段时间内的第一个 item 发出,丢弃掉其后面的 items。
指定一个时间段,如果在这个时间段内没有接收到下一个 item,就将前面接收到的 items 分为一组,然后将这个组的最后一个 item 分发出去,其余的丢弃。
指定时间段或者数量,将这些 items 收集成 List,然后将这个 List item 分发出去。
指定时间段或者数量,将这些 items 分别构建成 Observable,然后将 Observable 按批次分发出去。
接收多个 Observable,每当从任意一个 Observable 接收到 item 时,会将每个 Observable 的最后一个 item 传递给指定的聚合方法,聚合方法负责处理这些 item。
Observable.just(somethingBlockMainThread())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
AppObservable.bindActivity(activity, somethingBlockMainThread())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
使用 timer 操作符可以设置延迟进行或者间隔重复执行。
interval 操作符可设置间隔重复执行。
配合 take 等操作符,可以组合成重复执行固定次数的任务。
WidgetObservable 提供了对 TextView、CompoundButton、AdapterView、AbsListView 的事件支持。
自定义监听的例子:
public static <T extends View> Observable<T> clicksFrom(T view) {
PublishSubject publishSubject = PublishSubject.create();
view.setOnClickListener((v) -> publishSubject.onNext(view));
return publishSubject.asObservable();
}
使用 Rx 来监听事件的好处主要体现在其他操作符的支持。
比如当文字变化时,通常的做法是添加 TextWatcher,会在每一次变化时进行一些操作(校检数据合法性等)。但在用户快速输入文字时,会引起不必要的校验操作。
这种情况下,可以使用 Debounce 操作符,在一定的时间没有输入后,才会真正触发校验操作。
适合情景:
// 无输入 400 millis 之后才显示搜索建议
bindSupportFragment(this, WidgetObservable.text(searchText))
.debounce(400, TimeUnit.MILLISECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(searchObserver());
使用 combineLatest 操作符,每次触发 item 分发时,会将每个控件的最后一个事件传递给指定的方法。
Observable.combineLatest(
WidgetObservable.text(email),
WidgetObservable.text(password),
WidgetObservable.text(number),
(onEmailChangeEvent, onPasswordChangeEvent, onNumberChangeEvent) -> {
return emailValid(onEmailChangeEvent) &&
passValid(onPasswordChangeEvent) &&
numValid(onNumberChangeEvent);
})
.subscribe(aBoolean -> setValid(aBoolean));
Observable
.error(new RuntimeException("testing"))
.retryWhen(new RetryWithDelay(5, 1000))//
.doOnSubscribe(new Action0() {
@Override
public void call() {
_log("Attempting the impossible 5 times in intervals of 1s");
}
})
.subscribe(new Observer<Object>() {
@Override
public void onCompleted() {
Timber.d("on Completed");
}
@Override
public void onError(Throwable e) {
_log("Error: I give up!");
}
@Override
public void onNext(Object aVoid) {
Timber.d("on Next");
}
}));
retry 操作符会重新执行一次出错的 Observable。
retryWhen 操作符将延迟重新执行。
部分第三方库比如 Retrofit 对 Rx 进行了支持。