RxJava的使用

RxJava介绍

首先要说明的一点,RxAndroid和RxJava是差不多的东西,只不过RxAndroid 针对Android平台做了一点调整。那么RxJava是什么?在其github上是这样讲的:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。这么讲可能还有点绕口,简单的讲实际上最重要的就是异步两字,RxJava可以简单的实现异步操作,并且不管逻辑多么复杂,它始终能够保持简洁性。
通常在Android中,非UI线程是不能更新UI界面的,而一些耗时的操作我们又不能放在UI线程,否则会导致界面卡顿。这种情况下,我们就需要切换线程来实现,即Handler和AsyncTask来实现,但是这两种都有个缺陷,代码非常多,非常杂,可读性非常差。所以,RxJava出现了,它能够两行代码就实现线程切换,非常的简单,使用起来就会让人感觉很爽,再也不用为异步操作写如此繁重的代码了。

RxJava基本用法

RxJava最核心的两个东西是Observables(被观察者,事件源)和Observer/Subscriber(观察者),还有将他们联系在一起的操作subscribe(订阅)。当被观察者发生变化时观察者能即使做出相应,就好像我们的按钮事件一样:

1
2
3
4
5
6
button.setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {

}
});

在这里button就是被观察者,OnClickListener就是观察者,setOnClickListener这个方法就相当于订阅操作,当button被按下时,OnClickListener监听到变化,调用OnClick做出反应,RxJava实现的就是类似这样的一个过程。注意这里的观察者有两种Observer,Subscriber,这两个其实是差不多的,Subscriber是对Observer的一种扩展,内部增加了OnStart方法,在事件未发送之前订阅,用于做一些准备工作,并且还有unsubscribe()用于取消订阅。
让我们来看一下ObServer的内部实现:

1
2
3
4
5
6
7
8
9
public interface Observer<T> {

void onCompleted();

void onError(Throwable e);

void onNext(T t);

}

可以看到ObServer本身是一个接口,内部有onNext(T t)方法:观测到所检测的被观察者有变化时做出相应反应。onCompleted()方法:RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。onCompleted():事件队列发生异常,要调用的方法。我们在定义一个观察者的时候,需要实现这些方法,来完成事件队列。
观察者有了,那么被观察者Observables怎么创建呢,RxJava提供了一系列操作符供我们调用,其中就有很多创建型操作符,举个例子,创建一个Observables,发出hello world字符串给观察者:

1
2
3
4
5
6
7
8
Observable<String> myObservable = Observable.create(  
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
} );

既然有了Observables,那我们就可以根据这个Observables创建一个观察者了,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Observer<String> TestObserver=new Observer<String>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(String s) {
Log.i(TAG,s);
}
}

这样我们就可以愉快的订阅了:

1
myObservable.subscribe(TestObserver);

这样,一个简单的RxJava订阅流程就完成了。这里可能很多人就有疑问了,关键的异步呢,体现在哪了?其实这个例子可能不是很明显,因为被观察者并不是一个耗时线程,不能很直观的体现异步。如果myObservable这是一个异步任务,比如网络请求,那么我们订阅之后,TestObserver会一直监听myObservable是否有返回,如果有,那么就做出响应,本质是一样的。

RxJava的操作符

RxJava一个强大的地方在于它的异步,另外一个强大的地方就在于它提供了强大的操作符支持。这里说明一下几个常用的操作符:

  • ceate操作符
1
2
3
4
5
6
7
8
Observable<String> myObservable = Observable.create(  
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) {
sub.onNext("Hello, world!");
sub.onCompleted();
}
} );

ceate操作符创建一个被观察者,在call方法里持有一个观察者Subscriber参数,当这个Observable被订阅时,执行观察者相应的方法。

  • just操作符
1
Observable<String> myObservable = Observable.just("Hello, world!");

上面的代码可以用这一句话,代替。just操作符的功能就是将一个对象转化为Observable。

  • from操作符

Observable myObservable = Observable.from("Hello"," world!");

1
2
3
4
5
6
7

既然有了将单一对象转化为Observable的操作符,那么必须要有将多个对象转化为Observable的操作符,那就是from,from接收一个对象数组,然后逐一发射给观察者。

现在,用一个例子来说明其他操作符,比如我们有这样一个方法,根据学生姓名关键字查询学生列表,返回一个Observable

```java
Observable<List<Student>> query(String name);

然后我们的需求是一个一个的输出学生姓名,实现如下:

1
2
3
4
query("王").subscribe(list -> {
for(Student student:list){
Log.i(TAG,student.getName());
});
  • flatMap操作符
1
2
3
query("王").flatMap(list -> Observable.from(list)) 
.subscribe(student ->Log.i(TAG,student.getName());
);

上面的例子用flatMap操作符,就可以变得很简洁,flatMap操作符的功能是接收一个接收一个Observable的输出作为输入,同时输出另外一个Observable,通常是接收一个list,然后逐一发送list的元素。比如这边的Student数组,变成了逐一发送student的Observable。

  • Map操作符
1
2
3
4
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.subscribe(grade->Log.i(TAG,grade+"");
);

现在我们只想输出每个学生的成绩,我们就需要Map操作符,它的功能是接收一种类型的Observable,转化为另外一种Observable,比如这边的Student类型转化为了Int型的Observable。

1
2
3
4
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.subscribe(grade->Log.i(TAG,grade+"");
);
  • filter操作符
1
2
3
4
5
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.subscribe(grade->Log.i(TAG,grade+"");
);

顾名思义filter操作符就是过滤用的,相当于加个判断条件,比如这边的就是加上分数大于80的条件.

  • take操作符
1
2
3
4
5
6
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.subscribe(grade->Log.i(TAG,grade+"");
);

take操作符的功能是限定个数,比如这边的功能就是限定我最多需要5个成绩。

  • doOnNext操作符
1
2
3
4
5
6
7
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.doOnNext(grade->save(grade))
.subscribe(grade->Log.i(TAG,grade+"");
);

doOnNext()允许我们在每次输出一个元素之前做一些额外的事情,比如这里的我们用来保存成绩。

  • subscribeOn/observeOn操作符
1
2
3
4
5
6
7
8
9
query("王").flatMap(list -> Observable.from(list)) 
.Map(student->return student.getGrade())
.filter(grade->grade>80)
.take(5)
.doOnNext(grade->save(grade))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(grade->Log.i(TAG,grade+"");
);

这两个操作符一般都是成对出现的,他们的功能就是切换线程。subscribeOn是指定被观察者的线程,observeOn是指定观察者的线程。比如这个例子中前面的订阅的工作在IO线程做,后面的打印功能在主线程做。

  • 小结
    怎么样,看起来我好像做了很多事情,又有判断数据,又有保存数据,又有选取数据,关键还有线程切换,然而,我实际上就写了那么一点代码,看起来是不是酷!这就是RxJava的魅力所在。

RxAndroid

一开始说了,RxAndroid其实跟RxJava是差不多的,但是总归还是有一点变化的。比如Android上会有生命周期的问题,可能会导致内存泄漏:Observable持有Context导致的内存泄露。在这个问题上,我们的解决方法是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
private Subscription mTestSubscription= Subscriptions.empty();

public void test(){
mTestSubscription=myObservable.subscribe(TestObserver);
}
@Override
public void onDestroy() {
super.onDestroy();
if (mTestSubscription != null && !mTestSubscription.isUnsubscribed()) {
mTestSubscription.unsubscribe();
}

}

就是在订阅的时候,用一个Subscription来保存它,然后在退出这个Activity的时候取消订阅。
另外还有一些专门为Android设计的RxView,比如以下防抖动的View:

java RxView.clicks(btn_click) .throttleFirst(3, TimeUnit.SECONDS) .subscribe();

参考:https://www.jianshu.com/p/d9fca152017b