稀有猿诉

十年磨一剑,历炼出锋芒,说话千百句,不如码二行。

Introduction to RxJava

RxJava是一个异步数据流式的开源库,已流行于Android开发行业中多年,现在已经变成了Android开发的一个标配,几乎所有,是的几乎所有的项目都会使用它(即使大部分人并没有真的在用它)。也几乎每个开发人员的简历中都会写着熟悉RxJava,甚至是精通RxJava,可见它的流行程度,今天就来学习一下RxJava的基本使用。

理解基本的范式

RxJava是Reactive Extensions的Java实现,是基于数据流的响应式编程范式,同时结合了函数式编程,准确的来说它是函数式响应式编程范式FRP(Functional Reacive Programming)。核心思想是数据流和响应式。 一个Observable就是一个会发出事件的机器,这里事件是一个数据的意思。就好比marble球,一个Observable就是一个可以不断发射出marble的机器,它就是一个数据流。 可以把它想像成一个无限列表,但并不是常规意义上的List,因为它是异步的,这里异步的意思是,同一个时间内,你看不到所有的元素,对于常规意义上的列表是同步的,意思是拿到列表时,里面的数据 全在。但Observable是异步的,拿到Observable时,可能还没有数据。数据是有时序的,有点类似于信号。如果你收集齐了Observable发出的所有数据,假设它是有限的(比如从一个真正列表创建的Observable),那么得到的就是一个列表。

Observable最佳的类比就是一个物理信号,是有时域上的概念。

Subscriber是数据的消费者,它是对数据的响应,由它来体现响应式Reactive。

我们需要做的就是把数据封装成为一个Observable,然后定义好一个响应数据的Subscriber,这就是FRP了。

1
2
 Observable.just("Hello, world of RxJava!")
           .subscribe(helloLabel::setText);

常见的Operator

Map

比较容易理解,把Observable发出的事件进行转换。提供一个单元函数(一个输入参数,一个输出参数),输入就是Observable发出的数据,输出就是转换后的结果。针对发出的事件每个都应用提供的函数。

concatMap

与flatMap类似,保证顺序。

flatMap

先做map,然后再做flat,把二维结构展平为一维,也即是把Observable of Observable展平为一个Observable。传入的map函数必须是返回一个Observable,也即是把常规数据转化为一个Observable。

此外,它的每个map操作可以是并行的,不能保证先后顺序,如果想保证顺序要使用concatMap。

switchMap

combineLatest

Operator就是数据的管道,用以把各种不同的数据发射器(Observable)连接起来,一起组成一个能够从源头数据,通过管道计算,最终生成符合预期的数据,流出到Subscriber那里。

理解线程模型

RxJava是一个异步执行框架,那么理解它的线程模型就是至关重要的,否则将陷入混乱。

简单言之,一个Observable是在调用了subscribe()方法后才会开始执行(开始发射数据),默认的情况下,Observable的执行线程就是调用subscribe()方法所在的线程。

如果要改变Observable执行的线程就要使用subscribeOn来指定一个不同的Scheduler。因为Observable是在subscribe()被调用 后才开始执行的,因此,离subscribe()方法最近的subscribeOn生效,其余的无作用。

Observable除了自己运行的线程外,还可以改变大量回调运行的线程,典型的就是监听者回调执行的线程,也就是subscribe函数中的第一个参数,它其实是一个函数,作为发射的一个数据的接收处理者(onNext),另外两个参数也是函数一个是onError,最后一个是onComplete。可以用observeOn来指定这些回调执行的线程,它可以调用多次,仅对调用点后面的回调生效。还需要注意的是,这里所指的回调包括操作符map,flatMap和fitler等,subscribe中的函数,以及像doOnNext之类的都视为回调,都受其前面的observerOn影响,其实除了Observable的构造函数以外,都算是回调都受其前面的observeOn影响。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class ThreadingModel {
    private static void logd(String msg) {
        System.out.println("[" + Thread.currentThread().getName() + "]: " + msg);
    }

    /*
     * SubscribeOn affects an Observable's operations including its constructor.
     * ObserveOn affects all callbacks of an Observable: map, flatMap, filter,
     * doOnNext and onNext in subscribe() are all callbacks. Which means, all
     * other operators and observers are callbacks, except an Observable's
     * constructor.
     */
    public static void main(String[] args) throws InterruptedException {
        var subscription = Observable.fromCallable(() -> {
                    var stuff = """
                            The tree of liberty must be refreshed
                            from time to time
                            with the blood of patriots and tyrants.
                            """;
                    var list = stuff.split("[ \n]");
                    logd("In the callable '" + stuff + "'");
                    return String.join(" ", list);
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .flatMap(s -> {
                    logd("in flatMap: " + s);
                    return Observable.fromArray(s.split(" "));
                })
                .doOnEach(s -> logd("in doOnEach: " + s.getValue()))
                .observeOn(Schedulers.single())
                .doOnNext(s -> logd("in doOnNext: " + s))
                .filter(s -> {
                    logd("in filter: " + s);
                    return s.length() > 5;
                })
                .map(s -> {
                    logd("in map: s");
                    return s.toUpperCase();
                })
                .observeOn(Schedulers.computation())
                .subscribe(s -> {
                    logd(" onNext: '" + s + "'");
                });

        Thread.sleep(2000);
        subscription.dispose();
    }
}

结果输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
[RxCachedThreadScheduler-1]: In the callable 'The tree of liberty must be refreshed
from time to time
with the blood of patriots and tyrants.
'
[RxNewThreadScheduler-1]: in flatMap: The tree of liberty must be refreshed from time to time with the blood of patriots and tyrants.
[RxNewThreadScheduler-1]: in doOnEach: The
[RxNewThreadScheduler-1]: in doOnEach: tree
[RxNewThreadScheduler-1]: in doOnEach: of
[RxNewThreadScheduler-1]: in doOnEach: liberty
[RxNewThreadScheduler-1]: in doOnEach: must
[RxNewThreadScheduler-1]: in doOnEach: be
[RxSingleScheduler-1]: in doOnNext: The
[RxNewThreadScheduler-1]: in doOnEach: refreshed
[RxNewThreadScheduler-1]: in doOnEach: from
[RxSingleScheduler-1]: in filter: The
[RxNewThreadScheduler-1]: in doOnEach: time
[RxSingleScheduler-1]: in doOnNext: tree
[RxSingleScheduler-1]: in filter: tree
[RxNewThreadScheduler-1]: in doOnEach: to
[RxSingleScheduler-1]: in doOnNext: of
[RxNewThreadScheduler-1]: in doOnEach: time
[RxSingleScheduler-1]: in filter: of
[RxNewThreadScheduler-1]: in doOnEach: with
[RxSingleScheduler-1]: in doOnNext: liberty
[RxNewThreadScheduler-1]: in doOnEach: the
[RxSingleScheduler-1]: in filter: liberty
[RxNewThreadScheduler-1]: in doOnEach: blood
[RxSingleScheduler-1]: in map: s
[RxNewThreadScheduler-1]: in doOnEach: of
[RxNewThreadScheduler-1]: in doOnEach: patriots
[RxNewThreadScheduler-1]: in doOnEach: and
[RxNewThreadScheduler-1]: in doOnEach: tyrants.
[RxSingleScheduler-1]: in doOnNext: must
[RxSingleScheduler-1]: in filter: must
[RxSingleScheduler-1]: in doOnNext: be
[RxSingleScheduler-1]: in filter: be
[RxNewThreadScheduler-1]: in doOnEach: null
[RxSingleScheduler-1]: in doOnNext: refreshed
[RxSingleScheduler-1]: in filter: refreshed
[RxSingleScheduler-1]: in map: s
[RxSingleScheduler-1]: in doOnNext: from
[RxSingleScheduler-1]: in filter: from
[RxSingleScheduler-1]: in doOnNext: time
[RxSingleScheduler-1]: in filter: time
[RxComputationThreadPool-1]:  onNext: 'LIBERTY'
[RxSingleScheduler-1]: in doOnNext: to
[RxComputationThreadPool-1]:  onNext: 'REFRESHED'
[RxSingleScheduler-1]: in filter: to
[RxSingleScheduler-1]: in doOnNext: time
[RxSingleScheduler-1]: in filter: time
[RxSingleScheduler-1]: in doOnNext: with
[RxSingleScheduler-1]: in filter: with
[RxSingleScheduler-1]: in doOnNext: the
[RxSingleScheduler-1]: in filter: the
[RxSingleScheduler-1]: in doOnNext: blood
[RxSingleScheduler-1]: in filter: blood
[RxSingleScheduler-1]: in doOnNext: of
[RxSingleScheduler-1]: in filter: of
[RxSingleScheduler-1]: in doOnNext: patriots
[RxSingleScheduler-1]: in filter: patriots
[RxSingleScheduler-1]: in map: s
[RxSingleScheduler-1]: in doOnNext: and
[RxSingleScheduler-1]: in filter: and
[RxSingleScheduler-1]: in doOnNext: tyrants.
[RxSingleScheduler-1]: in filter: tyrants.
[RxComputationThreadPool-1]:  onNext: 'PATRIOTS'
[RxSingleScheduler-1]: in map: s
[RxComputationThreadPool-1]:  onNext: 'TYRANTS.'

那么,总结一下就是Observable的回调(map等操作符,doOnNext,和subscribe的三个函数参数)是受其前面的observeOn影响;而Observable的构造(如just,如from),onSubscribed等属于Observable本身执行的线程,受最后一个(离subscribe())最近的subscribeOn影响。

在实际使用中为了避免歧义和理解错误,应该先调用一次subscrieOn,改变Observable的执行线程,然后在最后用observeOn指定回调线程,就像AsyncTask中的doInBackground(这是subscribeOn影响了Observable的执行线程),和onPostExecute(这是observeOn影响了回调线程)。这样线程模型会相当的清晰,减少出错。为什么要把observeOn放在最后呢,因为这货会影响操作符map,而map等操作符一般认为是中间过程,而非最后的结果回调,按道理它不应该受observeOn影响,但实际它却受其影响,这会引发一些难以调试的隐藏Bug,特别是对于复杂的项目,map中又会产生对全局变量的副作用时。

扩展阅读:

常见的技巧

感知冷热

Observable是一个数据流,可视为一个事件发射器,不断的向下游发送数据(emission)。但数据何时发送,以及发送多少,是有一些细微区别的,这就引出了Observable是有冷热之分(Cold vs Hot)。

在《Learning RxJava》这本书中有一个非常形象的比喻,cold Observables就像音乐CD,每次播放都能得到相同的内容。对于所有Subscriber来说,无论你啥时候去subscribe,都能得到同样的数据流,这就是cold的。大多数以数据集为基础创建的Observable都是cold的,如Observable.just, Observable.fromIterable以及像从数据库或者文件存储中读取的数据。

1
2
3
4
5
6
7
8
9
10
11
     Observable<String> source = Observable.just("alpha", "beta", "gamma");
    source.subscribe(s -> System.out.println("Subscriber #1 received: " + s));

    source.subscribe(s -> System.out.println("Subscriber #2 received: " + s));

     //Subscriber #1 received: alpha
    //Subscriber #1 received: beta
    //Subscriber #1 received: gamma
    //Subscriber #2 received: alpha
    //Subscriber #2 received: beta
    //Subscriber #2 received: gamma

Hot Observables则是像一个音乐广播电台,你今天收听到的内容,跟昨天收听到的内容是不一样的。不同的时间去subscribe会得到不一样的数据流,晚些subscribe就会错失前面的数据,这便是hot Observables。像一些无限数据集(比如社交信息,或者新闻信息),与时间有关的数据(如interval),以及用户事件都属于hot Observables。

RxBinding只能被subscribe一次

一般来说一个Observable,只能被subscribe一次。

但总的说 是分为冷和热,对于冷的,一般是有限集合的Observable,它可以被subscribe无限次,且每个Subscriber接收到的东西是一样的。

但对于热的,就不一样,有些可以被subscribe多次,有些则不能。

给一个Observable subscribe多个Subscriber的行为叫做multicast。 对于cold的,可以通过ConnectableObesrver,通过connect,来让几个Subscriber同步接收来自Observable的emission。

但是对于hot的Observable,如何 让 不同的Subscriber同步接收emission呢? 就比如说RxBinding中的大部分来自于View的事件Observable,都是hot的,并且,它们默认情况下,不能被multicast,只有最后一个subscribe的Observer,才可以接收事件。

这时就需要把Observable share一下。通过share(),之后就可以multicast了。

如何做Recursion

有一些场景是会出现循环,或者说Recursion的,比如说像文件遍历,对于文件夹的操作,是需要Recursion的。

这里就有两种场景,一种会在某个节点停留,用户具体进一步操作才会深入的遍历的情况,比如像文件浏览器,展示的就是当前的文件夹,用户点子目录,才会更进一步。这种场景,需要Hold住当前的文件夹,但当有新的文件夹变成当前文件夹时,它需要更新数据,这种数据产生的闭环,可以用Subject来解决。

但,如果是一个完整的遍历流程,从根节点开始,一直到所有的叶子为止,那么用subject可能就不太合适。这时就需要用一些Recursion来解决,可以看一些网络上的例子。需要用到常规的recursion方式,先要弄一个方法,在里面做出reactive chain,就是在Reactive链里面再调用这个方法,以此递归下去。不过,不知道这个当有一些耗时操作时,会不会造成堆积,以及会不会有资源释放的问题,有待考查。

Reactive这玩意儿,确实难度较大,想写出符合Reactive规范,且正确的代码还是相当难的。而且它难以调试,有时候完全不知道错在哪里。

书籍推荐

RxJava的学习曲线 非常之陡峭,它融合了异步,多线程,函数式编程和响应式编程,集多种编程范式于一体,要想真正的用好RxJava需要深度理解RxJava本身,更需要函数式和响应式编程的一些思维。必须要以Reactive的方式来架构你的应用程序,这才能真正算得上使用了RxJava。比如仅是用了几个Observable,用了几个operator,但是整体项目的代码仍是状态变量散落一大堆,这根本不叫用了RxJava,这仅仅是把RxJava当成工具来用了,并没真正践行它的精髓思想。

要想学好RxJava必须要啃书,它的学习曲线陡峭,并不是看了文档就能用(那根本不叫Reactive,仅是把RxJava当成工具类了),通过啃书达到一定的理解深度,然后再在项目中去实践。

《Learning RxJava》

这本书对于深入的理解RxJava本身非常有帮助,它比官方文档要详细得多,具体给你解释什么是Observable,什么是Observer以及各种operator,并且都带有实例。这本书,不建议从头读到尾,而是要像文档一样对待,需要深入理解哪个知点点的时候就去具体看那一章节就好。

书中的示例非常短小精悍,但能非常好的帮助你理解对应的知识点。

这本书的目的是让你更深入的了解RxJava这一库的本身,也就是说让你更好的了解工具本身。但这远远不够,即使把这本书看完,你仍旧会是把RxJava当成一个工具类来使用。

参考资料

Comments