@Awille
2018-12-15T05:02:58.000000Z
字数 11022
阅读 93
android 框架 异步
RxJava是ReactiveX的一种Java实现。ReactiveX为Reactive Extension的缩写。使用RxJava可以在代码逻辑越来越复杂的情况下,依然保持清晰的结构,便于分析,便于阅读,便于理解。
版本会有更新:
dependencies {
implementation 'io.reactivex:rxjava:1.2.0'
implementation 'io.reactivex:rxandroid:1.2.1'
}
Observer对象
Observer<String> observer = new Observer<String>() { //该观察者对象是没有onStart方法的
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext");
}
};
Subscriber对象
Subscriber subscriber = new Subscriber<String>() { //String代表事件队列的类型
@Override
public void onCompleted() { //事件队列完结
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) { //事件队列异常
Log.e(TAG, "onError");
}
@Override
public void onNext(String s) { //普通事件发送
Log.e(TAG, "onNext: " + s);
}
@Override
public void onStart() { //在事件未发送时执行,可以用来做准备工作
Log.e(TAG, "onStart");
}
};
这两个观察者的创建实现的功能是一样的,但是Subscriber在Observer的基础之上做了拓展,比如Observer对象当中是没有onStart方法的。
可以看到Subscriber跟Observer的声明:
public interface Observer<T>
public abstract class Subscriber<T> implements Observer<T>, Subscription
Observer是一个接口,而Subscriber在实现了Observer跟Subscription接口,并增加了一些方法,这里大概标记一下,先不管。
三种方式:实现的功能是一模一样的
//方式1:
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
@Override
public void call(Subscriber<? super String> subscriber1) {
subscriber1.onNext("first");
subscriber1.onNext("second");
subscriber1.onNext("third");
subscriber1.onCompleted();
}
});
//方式2:
Observable observableJust = Observable.just("first", "second", "third");
//方式3
String[] event = {"first", "second", "third"};
Observable observableFrom = Observable.from(event);
observable.subscribe(subscriber);
感受下结果:
11-30 17:56:09.643 1559-1559/? E/MainActivity: onStart
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: first
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: second
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: third
11-30 17:56:09.643 1559-1559/? E/MainActivity: onCompleted
public interface Action extends Function {
}
public interface Action0 extends Action {
void call();
}
public interface Action1<T> extends Action {
void call(T t);
}
public interface Action2<T1, T2> extends Action {
void call(T1 t1, T2 t2);
}
public interface Action3<T1, T2, T3> extends Action {
void call(T1 t1, T2 t2, T3 t3);
}
public interface Action4<T1, T2, T3, T4> extends Action {
void call(T1 t1, T2 t2, T3 t3, T4 t4);
}
这种简单而粗暴的源码真是让人开心[狗头]
订阅者假如逻辑简单的话,可以直接用action代替:
Action0 onCompleteAction = new Action0() {
@Override
public void call() {
Log.e(TAG, "onComplete");
}
};
Action1 onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
};
Action1 onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "onError");
}
};
observable.subscribe(onNextAction, onErrorAction, onCompleteAction);
订阅函数的调用为:
public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
结果:
11-30 18:14:03.286 1625-1625/? E/MainActivity: first
11-30 18:14:03.286 1625-1625/? E/MainActivity: second
11-30 18:14:03.286 1625-1625/? E/MainActivity: third
11-30 18:14:03.286 1625-1625/? E/MainActivity: onComplete
RxJava的操作符类型分为创建操作符、变换操作符、组合操作符、错误处理操作符、辅助操作符、条件和布尔操作符、算数、聚合操作符以及连接操作符等。
我们之前创建Observable对象的时候,用Observable.just, Observable.from, Observable.create都是创建操作符。除了这些操作符,还有很多其他的操作符
创建固定时间间隔发送整数序列的Observable
Observable.interval(3, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
@Override
public void call(Long aLong) {
Log.e(TAG, "interval : " + aLong.intValue());
}
});
结果:(如果不关app,就会一直发送)
11-30 20:24:30.426 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 0
11-30 20:24:33.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 1
11-30 20:24:36.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 2
11-30 20:24:39.425 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 3
11-30 20:24:42.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 4
创建发射指定范围的整数序列的Observable,类似于for循环,第二个参数为count
Observable.range(0, 5).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "range : " + integer);
}
});
结果:
11-30 20:31:55.092 1794-1794/? E/MainActivity: range : 0
11-30 20:31:55.092 1794-1794/? E/MainActivity: range : 1
11-30 20:31:55.092 1794-1794/? E/MainActivity: range : 2
11-30 20:31:55.092 1794-1794/? E/MainActivity: range : 3
11-30 20:31:55.092 1794-1794/? E/MainActivity: range : 4
创建一个N次重复发射特定数据的Observable
Observable.range(0, 5).repeat(2).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "range : " + integer);
}
});
结果:
11-30 20:34:57.899 1859-1859/? E/MainActivity: range : 0
11-30 20:34:57.899 1859-1859/? E/MainActivity: range : 1
11-30 20:34:57.899 1859-1859/? E/MainActivity: range : 2
11-30 20:34:57.899 1859-1859/? E/MainActivity: range : 3
11-30 20:34:57.899 1859-1859/? E/MainActivity: range : 4
11-30 20:34:57.900 1859-1859/? E/MainActivity: range : 0
11-30 20:34:57.900 1859-1859/? E/MainActivity: range : 1
11-30 20:34:57.900 1859-1859/? E/MainActivity: range : 2
11-30 20:34:57.900 1859-1859/? E/MainActivity: range : 3
11-30 20:34:57.900 1859-1859/? E/MainActivity: range : 4
感觉这个变换操作符还是设计得十分精妙的。
变换操作符可以对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发出去。
map是通过Func1对象创建的:
Observable.just("Hello").map(new Func1<String,String>() {
@Override
public String call(String s) {
return s + " Hi";
}
}).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "map result: : " + s);
}
});
结果:
11-30 20:44:06.995 1924-1924/? E/MainActivity: map result: : Hello Hi
假设有那么一种场景,给了一组bitmap文件的uri,你可以通过map把这字uri转换成bitmap然后再去加载。
Func跟Action一样,都十分的可爱:
public interface Func0<R> extends Function, Callable<R> {
@Override
R call();
}
public interface Func1<T, R> extends Function {
R call(T t);
}
public interface Func2<T1, T2, R> extends Function {
R call(T1 t1, T2 t2);
}
public interface FuncN<R> extends Function {
R call(Object... args);
}
flatMap操作符将Observable发射的数据集合变换成Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。
cast操作符的作用是强制将Observable发射的所有数据转换为指定类型。(转换类型)
final String Hosts = "https://github.com";
List<String> list= new ArrayList<>();
list.add("/Awille");
list.add("/MtZero");
list.add("/liushuwang");
Observable.from(list).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
//先转化成一个个单独的observable
return Observable.just(Hosts + s);
}
//这之后flatMap内部会将这些单独的obServable的数据集合放进一个Observable里面
}).cast(String.class).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, "flatMap result: " + s);
}
});
结果:
11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/Awille
11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/MtZero
11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/liushuwang
ConcatMap与FlatMap作用一样,ConcatMap解决FlatMap转正一个个observable数据交叉的问题,ConcatMap能将发送的事件连起来。用法同FlatMap
flatMapIterable可以将数据包装成iterable。有关iterable,Java集合类的基本接口是Collection接口。而Collection接口必须继承java.lang.Iterable接口。有关iterable的内容可以参考:iterable
Observable.just(1,2,3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
@Override
public Iterable<Integer> call(Integer integer) {
Iterable list = new ArrayList();
((ArrayList) list).add(integer);
return list;
}
}).subscribe(new Action1<Integer>() {
@Override
public void call(Integer integer) {
Log.e(TAG, "flatMapIterable: " + integer);
}
});
那么这个东西到底有什么作用呢,这里吧数据转成Iterable,提供了一种数据不对等转换的效果,这个方法放到iterable中的数据最后都会输出到处理函数当中,我们这里,可以对iterable中数据进行随意更改,增删改除,怎样都可以。但是每一次的数据 与 上一次与下一次的没有 关联。
可以依次发送一组数据,看代码和输出就能懂什么意思了:
Observable.just(1, 2, 3, 4, 5, 6).buffer(2).subscribe(new Action1<List<Integer>>() {
@Override
public void call(List<Integer> integers) {
for (Integer i : integers) {
Log.e(TAG, String.valueOf(i));
}
Log.e(TAG, "-----------------");
}
});
输出:
11-30 23:37:07.433 1898-1898/? E/MainActivity: 1
11-30 23:37:07.433 1898-1898/? E/MainActivity: 2
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------
11-30 23:37:07.433 1898-1898/? E/MainActivity: 3
11-30 23:37:07.433 1898-1898/? E/MainActivity: 4
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------
11-30 23:37:07.433 1898-1898/? E/MainActivity: 5
11-30 23:37:07.433 1898-1898/? E/MainActivity: 6
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------
以上谈到的都是Rxjava的事件特点,我们谈到rxjava的时候,想到的第一个关键词就是 异步,安卓开发与异步是不可分离的,那么异步就涉及到了线程控制与线程切换,rxjava中的线程控制方法是怎样的,我们一起来看下。
1.内置的Scheduler
如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的。如果我们想切换线程,就需要
使用Scheduler。RxJava 已经内置了如下5个Scheduler。
• Schedulers.immediate():直接在当前线程运行,它是timeout、timeInterval和timestamp操作符的默认
调度器。
• Schedulers.newThread():总是启用新线程,并在新线程执行操作。
• Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模
式和 newThread()差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的
线程,因此多数情况下 io() 比 newThread() 更有效率。
• Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler使用固定线程
池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O操作的等待时间会浪费 CPU。它
是 buffer、debounce、delay、interval、sample 和 skip操作符的默认调度器。
• Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以
用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中的每一个任务。它是repeat
和retry操作符默认的调度器。
另外,RxAndroid也提供了一个常用的Scheduler:
• AndroidSchedulers.mainThread()—RxAndroid库中提供的Scheduler,它指定的操作在主线程中运
行。
2、subscribeOn操作符用于指定Observable自身在哪个线程上运行。observerOn用来指定Observer所运行的线程,也就是发射出的数据在哪个线程上使用。用法 .subscribeOn(schedulers) 与 .observerOn(schedulers).
Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(final Subscriber<? super Object> subscriber) {
OkHttpClient okHttpClient = new OkHttpClient();
RequestBody requestBody = new FormBody.Builder()
.add("ip", "192.168.1.1")
.build();
Request request = new Request.Builder()
.url("http://ip.taobao.com/service/getInfo.php")
.post(requestBody)
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
subscriber.onError(e);
}
@Override
public void onResponse(Call call, Response response) throws IOException {
subscriber.onNext(response);
subscriber.onCompleted();
}
});
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
//....
}
@Override
public void onError(Throwable e) {
//....
}
@Override
public void onNext(Object o) {
//....
}
});
以上场景以前使用过,这就不再做重复了,retrofit高度解耦,rxjava结合retrofit回比okhttp好用的多。

首先我们知道rxjava线程切换的两种方法:
subscribeOn(...)指定Observable自身运行的线程,而observeOn(...)指定订阅者自身运行的线程。
我们就看这两个方法怎么实现的
public final Observable<T> subscribeOn(Scheduler scheduler) {
if (this instanceof ScalarSynchronousObservable) {
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
}
return create(new OperatorSubscribeOn<T>(this, scheduler));
//生成一个新的Observabe对象,this为我们之前的observable,scheduler为调度器
}
下面看OperatorSubscribeOn方法:
public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
final Scheduler scheduler;
final Observable<T> source;
public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
this.scheduler = scheduler;
this.source = source;
}
@Override
public void call(final Subscriber<? super T> subscriber) {
final Worker inner = scheduler.createWorker();//创建线程处理的待执行者,这个是通过scheduler来创建的。
subscriber.add(inner);
inner.schedule(new Action0() {//调用inner的schedule方法来执行我们的原本Observable中的方法
@Override
public void call() {
final Thread t = Thread.currentThread();
Subscriber<T> s = new Subscriber<T>(subscriber) {
@Override
public void onNext(T t) {
subscriber.onNext(t);
}
//...
}
};
source.unsafeSubscribe(s);
}
});
}
}
查看创建Worker的实现方法:

找到excutor来实现的:

可以看到,这里的线程切换是由线程池来处理的。
这里线程切换时放在一个专门的线程之中去完成的。