[关闭]
@Awille 2018-12-15T05:02:58.000000Z 字数 11022 阅读 93

函数响应式编程-RxJava

android 框架 异步


基本用法

概述

RxJava是ReactiveX的一种Java实现。ReactiveX为Reactive Extension的缩写。使用RxJava可以在代码逻辑越来越复杂的情况下,依然保持清晰的结构,便于分析,便于阅读,便于理解。

依赖的添加

版本会有更新:

dependencies {
    implementation 'io.reactivex:rxjava:1.2.0'
    implementation 'io.reactivex:rxandroid:1.2.1'
}

创建观察者

Observer对象

Observer<String> observer = new Observer<String>() { //该观察者对象是没有onStart方法的
    @Override
    public void onCompleted() {
        Log.e(TAG, "onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.e(TAG, "onError");
    }
    @Override
    public void onNext(String s) {
        Log.e(TAG, "onNext");
    }
};

Subscriber对象

Subscriber subscriber = new Subscriber<String>() { //String代表事件队列的类型
    @Override
    public void onCompleted() { //事件队列完结
        Log.e(TAG, "onCompleted");
    }
    @Override
    public void onError(Throwable e) { //事件队列异常
        Log.e(TAG, "onError");
    }
    @Override
    public void onNext(String s) { //普通事件发送
        Log.e(TAG, "onNext: " + s);
    }
    @Override
    public void onStart() { //在事件未发送时执行,可以用来做准备工作
        Log.e(TAG, "onStart");
    }
};

这两个观察者的创建实现的功能是一样的,但是Subscriber在Observer的基础之上做了拓展,比如Observer对象当中是没有onStart方法的。
可以看到Subscriber跟Observer的声明:

public interface Observer<T> 
public abstract class Subscriber<T> implements Observer<T>, Subscription

Observer是一个接口,而Subscriber在实现了Observer跟Subscription接口,并增加了一些方法,这里大概标记一下,先不管。

创建被观察者

三种方式:实现的功能是一模一样的

//方式1:
Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
            @Override
            public void call(Subscriber<? super String> subscriber1) {
                subscriber1.onNext("first");
                subscriber1.onNext("second");
                subscriber1.onNext("third");
                subscriber1.onCompleted();
            }
        });
//方式2:
Observable observableJust = Observable.just("first", "second", "third");
//方式3
String[] event = {"first", "second", "third"};
Observable observableFrom = Observable.from(event);

订阅

observable.subscribe(subscriber);

感受下结果:

11-30 17:56:09.643 1559-1559/? E/MainActivity: onStart
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: first
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: second
11-30 17:56:09.643 1559-1559/? E/MainActivity: onNext: third
11-30 17:56:09.643 1559-1559/? E/MainActivity: onCompleted

RxJava不完整定义回调

Action的定义
public interface Action extends Function {
}
public interface Action0 extends Action {
    void call();
}
public interface Action1<T> extends Action {
    void call(T t);
}
public interface Action2<T1, T2> extends Action {
    void call(T1 t1, T2 t2);
}
public interface Action3<T1, T2, T3> extends Action {
    void call(T1 t1, T2 t2, T3 t3);
}
public interface Action4<T1, T2, T3, T4> extends Action {
    void call(T1 t1, T2 t2, T3 t3, T4 t4);
}

这种简单而粗暴的源码真是让人开心[狗头]
订阅者假如逻辑简单的话,可以直接用action代替:

Action0 onCompleteAction = new Action0() {
    @Override
    public void call() {
        Log.e(TAG, "onComplete");
    }
};
Action1 onNextAction = new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e(TAG, s);
    }
};
Action1 onErrorAction = new Action1<Throwable>() {
    @Override
    public void call(Throwable throwable) {
        Log.e(TAG, "onError");
    }
};
observable.subscribe(onNextAction, onErrorAction, onCompleteAction);

订阅函数的调用为:

public final Subscription subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)

结果:

11-30 18:14:03.286 1625-1625/? E/MainActivity: first
11-30 18:14:03.286 1625-1625/? E/MainActivity: second
11-30 18:14:03.286 1625-1625/? E/MainActivity: third
11-30 18:14:03.286 1625-1625/? E/MainActivity: onComplete

RxJava的Subject

RxJava常用操作符

RxJava的操作符类型分为创建操作符、变换操作符、组合操作符、错误处理操作符、辅助操作符、条件和布尔操作符、算数、聚合操作符以及连接操作符等。

创建操作符

我们之前创建Observable对象的时候,用Observable.just, Observable.from, Observable.create都是创建操作符。除了这些操作符,还有很多其他的操作符

interval

创建固定时间间隔发送整数序列的Observable

Observable.interval(3, TimeUnit.SECONDS).subscribe(new Action1<Long>() {
    @Override
    public void call(Long aLong) {
        Log.e(TAG, "interval : " + aLong.intValue());
    }
});

结果:(如果不关app,就会一直发送)

11-30 20:24:30.426 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 0
11-30 20:24:33.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 1
11-30 20:24:36.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 2
11-30 20:24:39.425 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 3
11-30 20:24:42.424 1725-1741/com.example.will.rxjavatest E/MainActivity: interval : 4

range

创建发射指定范围的整数序列的Observable,类似于for循环,第二个参数为count

Observable.range(0, 5).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.e(TAG, "range  : " + integer);
    }
});

结果:

11-30 20:31:55.092 1794-1794/? E/MainActivity: range  : 0
11-30 20:31:55.092 1794-1794/? E/MainActivity: range  : 1
11-30 20:31:55.092 1794-1794/? E/MainActivity: range  : 2
11-30 20:31:55.092 1794-1794/? E/MainActivity: range  : 3
11-30 20:31:55.092 1794-1794/? E/MainActivity: range  : 4

repeat

创建一个N次重复发射特定数据的Observable

Observable.range(0, 5).repeat(2).subscribe(new Action1<Integer>() {
    @Override
    public void call(Integer integer) {
        Log.e(TAG, "range  : " + integer);
    }
});

结果:

11-30 20:34:57.899 1859-1859/? E/MainActivity: range  : 0
11-30 20:34:57.899 1859-1859/? E/MainActivity: range  : 1
11-30 20:34:57.899 1859-1859/? E/MainActivity: range  : 2
11-30 20:34:57.899 1859-1859/? E/MainActivity: range  : 3
11-30 20:34:57.899 1859-1859/? E/MainActivity: range  : 4
11-30 20:34:57.900 1859-1859/? E/MainActivity: range  : 0
11-30 20:34:57.900 1859-1859/? E/MainActivity: range  : 1
11-30 20:34:57.900 1859-1859/? E/MainActivity: range  : 2
11-30 20:34:57.900 1859-1859/? E/MainActivity: range  : 3
11-30 20:34:57.900 1859-1859/? E/MainActivity: range  : 4

变换操作符

感觉这个变换操作符还是设计得十分精妙的。
变换操作符可以对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发出去。

map

map是通过Func1对象创建的:

Observable.just("Hello").map(new Func1<String,String>() {
    @Override
    public String call(String s) {
        return s + " Hi";
    }
}).subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        Log.e(TAG, "map result:  : " + s);
    }
});

结果:

11-30 20:44:06.995 1924-1924/? E/MainActivity: map result:  : Hello Hi

假设有那么一种场景,给了一组bitmap文件的uri,你可以通过map把这字uri转换成bitmap然后再去加载。

Func跟Action一样,都十分的可爱:

public interface Func0<R> extends Function, Callable<R> {
    @Override
    R call();
}
public interface Func1<T, R> extends Function {
    R call(T t);
}
public interface Func2<T1, T2, R> extends Function {
    R call(T1 t1, T2 t2);
}
public interface FuncN<R> extends Function {
    R call(Object... args);
}

flatMap、cast

flatMap操作符将Observable发射的数据集合变换成Observable集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable。
cast操作符的作用是强制将Observable发射的所有数据转换为指定类型。(转换类型)

    final String Hosts = "https://github.com";
    List<String> list= new ArrayList<>();
    list.add("/Awille");
    list.add("/MtZero");
    list.add("/liushuwang");
    Observable.from(list).flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String s) {
            //先转化成一个个单独的observable
            return Observable.just(Hosts + s);
        }
        //这之后flatMap内部会将这些单独的obServable的数据集合放进一个Observable里面
    }).cast(String.class).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.e(TAG, "flatMap result: " + s);
        }
    });

结果:

11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/Awille
11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/MtZero
11-30 21:14:06.407 2056-2056/? E/MainActivity: flatMap result: https://github.com/liushuwang

ConcatMap、cast

ConcatMap与FlatMap作用一样,ConcatMap解决FlatMap转正一个个observable数据交叉的问题,ConcatMap能将发送的事件连起来。用法同FlatMap

flatMapIterable

flatMapIterable可以将数据包装成iterable。有关iterable,Java集合类的基本接口是Collection接口。而Collection接口必须继承java.lang.Iterable接口。有关iterable的内容可以参考:iterable

Observable.just(1,2,3).flatMapIterable(new Func1<Integer, Iterable<Integer>>() {
            @Override
            public Iterable<Integer> call(Integer integer) {
                Iterable list = new ArrayList();
                ((ArrayList) list).add(integer);
                return list;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.e(TAG, "flatMapIterable: " + integer);
            }
        });

那么这个东西到底有什么作用呢,这里吧数据转成Iterable,提供了一种数据不对等转换的效果,这个方法放到iterable中的数据最后都会输出到处理函数当中,我们这里,可以对iterable中数据进行随意更改,增删改除,怎样都可以。但是每一次的数据 与 上一次与下一次的没有 关联。

buffer

可以依次发送一组数据,看代码和输出就能懂什么意思了:

Observable.just(1, 2, 3, 4, 5, 6).buffer(2).subscribe(new Action1<List<Integer>>() {
    @Override
    public void call(List<Integer> integers) {
        for (Integer i : integers) {
            Log.e(TAG, String.valueOf(i));
        }
        Log.e(TAG, "-----------------");
    }
});

输出:

11-30 23:37:07.433 1898-1898/? E/MainActivity: 1
11-30 23:37:07.433 1898-1898/? E/MainActivity: 2
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------
11-30 23:37:07.433 1898-1898/? E/MainActivity: 3
11-30 23:37:07.433 1898-1898/? E/MainActivity: 4
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------
11-30 23:37:07.433 1898-1898/? E/MainActivity: 5
11-30 23:37:07.433 1898-1898/? E/MainActivity: 6
11-30 23:37:07.433 1898-1898/? E/MainActivity: -----------------

groupBy 操作符就先到这里 其他之后再补充

RxJava线程控制

以上谈到的都是Rxjava的事件特点,我们谈到rxjava的时候,想到的第一个关键词就是 异步,安卓开发与异步是不可分离的,那么异步就涉及到了线程控制与线程切换,rxjava中的线程控制方法是怎样的,我们一起来看下。
1.内置的Scheduler
如果我们不指定线程,默认是在调用subscribe方法的线程上进行回调的。如果我们想切换线程,就需要
使用Scheduler。RxJava 已经内置了如下5个Scheduler。
• Schedulers.immediate():直接在当前线程运行,它是timeout、timeInterval和timestamp操作符的默认
调度器。
Schedulers.newThread():总是启用新线程,并在新线程执行操作。
Schedulers.io():I/O操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模
式和 newThread()差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的
线程,因此多数情况下 io() 比 newThread() 更有效率。
• Schedulers.computation():计算所使用的 Scheduler,例如图形的计算。这个 Scheduler使用固定线程
池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O操作的等待时间会浪费 CPU。它
是 buffer、debounce、delay、interval、sample 和 skip操作符的默认调度器。
• Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即时,可以
用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中的每一个任务。它是repeat
和retry操作符默认的调度器。
另外,RxAndroid也提供了一个常用的Scheduler:
AndroidSchedulers.mainThread()—RxAndroid库中提供的Scheduler,它指定的操作在主线程中运
行。

2、subscribeOn操作符用于指定Observable自身在哪个线程上运行。observerOn用来指定Observer所运行的线程,也就是发射出的数据在哪个线程上使用。用法 .subscribeOn(schedulers) 与 .observerOn(schedulers).

RxJava结合okhttp

Observable.create(new Observable.OnSubscribe<Object>() {
    @Override
    public void call(final Subscriber<? super Object> subscriber) {
        OkHttpClient okHttpClient = new OkHttpClient();
        RequestBody requestBody = new FormBody.Builder()
                .add("ip", "192.168.1.1")
                .build();
        Request request = new Request.Builder()
                .url("http://ip.taobao.com/service/getInfo.php")
                .post(requestBody)
                .build();
        Call call = okHttpClient.newCall(request);
        call.enqueue(new Callback() {
            @Override
            public void onFailure(Call call, IOException e) {
                subscriber.onError(e);
            }
            @Override
            public void onResponse(Call call, Response response) throws IOException {
                subscriber.onNext(response);
                subscriber.onCompleted();
            }
        });
    }
}).subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Subscriber<Object>() {
      @Override
      public void onCompleted() {
          //....
      }
      @Override
      public void onError(Throwable e) {
          //....
      }
      @Override
      public void onNext(Object o) {
          //....
      }
  });    

RxJava结合retrofit

以上场景以前使用过,这就不再做重复了,retrofit高度解耦,rxjava结合retrofit回比okhttp好用的多。

RxJava源码解析 这里我们只关注线程的切换过程

订阅

变换

image.png-22.7kB

线程切换

首先我们知道rxjava线程切换的两种方法:
subscribeOn(...)指定Observable自身运行的线程,而observeOn(...)指定订阅者自身运行的线程。
我们就看这两个方法怎么实现的

subscribeOn()

public final Observable<T> subscribeOn(Scheduler scheduler) {
    if (this instanceof ScalarSynchronousObservable) {
        return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
    }
    return create(new OperatorSubscribeOn<T>(this, scheduler));
    //生成一个新的Observabe对象,this为我们之前的observable,scheduler为调度器
}

下面看OperatorSubscribeOn方法:

public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
    final Scheduler scheduler;
    final Observable<T> source;
    public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = source;
    }
    @Override
    public void call(final Subscriber<? super T> subscriber) {
        final Worker inner = scheduler.createWorker();//创建线程处理的待执行者,这个是通过scheduler来创建的。
        subscriber.add(inner);
        inner.schedule(new Action0() {//调用inner的schedule方法来执行我们的原本Observable中的方法
            @Override
            public void call() {
                final Thread t = Thread.currentThread();
                Subscriber<T> s = new Subscriber<T>(subscriber) {
                    @Override
                    public void onNext(T t) {
                        subscriber.onNext(t);
                    }
                    //...
                    }
                };
                source.unsafeSubscribe(s);
            }
        });
    }
}

查看创建Worker的实现方法:
image.png-95.5kB

找到excutor来实现的:
image.png-25.8kB

可以看到,这里的线程切换是由线程池来处理的。

observeOn()

这里线程切换时放在一个专门的线程之中去完成的。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注