作用
- 对事件序列中的事件/整个事件序列进行加工处理(即变换),使得其转换成不同的事件/整个事件序列
- 具体原理如下所示
类型
Map()
-
作用
- 对被观察者发送的每一个事件都通过指定的函数处理,从而变换成另外一种事件,即将被观察者发送的事件转换为任意的类型事件。
-
原理
-
应用场景
- 数据类型转换
-
具体使用
- 下面以将使用Map()将事件的参数从整型变换成字符串类型为例子说明。从下面的例子可以看出,map()将参数中的Integer类型对象转换成一个String类型对象后返回。同时,事件的参数类型也由Integer类型变成了String类型。
- 下面以将使用Map()将事件的参数从整型变换成字符串类型为例子说明。从下面的例子可以看出,map()将参数中的Integer类型对象转换成一个String类型对象后返回。同时,事件的参数类型也由Integer类型变成了String类型。
//采用RxJava基于事件流的链式操作 Observable.create(new ObservableOnSubscribe() { //1. 被观察者发送事件=参数为整型=1、2、3 @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } //2. 使用Map变换操作符中的Function函数对被观察者发送的事件进行统一变换: // 整型变换成字符串类型 }).map(new Function () { @Override public String apply(Integer integer) throws Exception { return "使用Map变换操作符将事件" + integer + "的参数从整型" + integer + "变换成字符串类型" + integer; } }).subscribe(new Consumer () { //3. 观察者接收事件时,是接收到变换后的事件=字符串事件 @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });复制代码
FlatMap()
-
作用
- 将被观察者发送的事件序列进行拆分和单独转换,再合并成一个新的事件序列,最后再进行发送。新合并生成的事件序列顺序是无序的,即与旧序列发送事件的顺序无关。
-
原理
- 为事件序列中的每个事件都创建一个Observable对象
- 将对每个原始事件转换后的新事件都放入到对应的Observable对象
- 将新建的每个Observable都合并到一个新建的、总的Observable对象
- 新建的、总的Observable对象将新合并的事件序列发送到观察者(Observer)
-
应用场景
- 无序的将被观察者发送的整个事件序列进行变换
-
具体使用
//采用RxJava基于事件流的链式操作 Observable.create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }) //采用flatMap()变换操作符 .flatMap(new Function >() { @Override public ObservableSource apply(Integer integer) throws Exception { final List list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("我是事件" + integer + "拆分后的子事件" + i); //通过flatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送3个String事件 //最终合并,再发送给被观察者 } return Observable.fromIterable(list); } }) .subscribe(new Consumer () { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });复制代码
ConcatMap()
-
作用
- 类似 FlatMap()操作符
- 与FlatMap()的区别在于:拆分和重新合并生成的事件序列的顺序=被观察者旧序列生产的顺序。
-
原理
-
应用场景
- 有序的将被观察者发送的整个事件序列进行变换
-
具体使用
//采用RxJava基于事件流的链式操作 Observable .create(new ObservableOnSubscribe() { @Override public void subscribe(ObservableEmitter emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }) //采用concatMap()变换操作符 .concatMap(new Function >() { @Override public ObservableSource apply(Integer integer) throws Exception { final List list = new ArrayList<>(); for (int i = 0; i < 3; i++) { list.add("我是事件" + integer + "拆分后的子事件" + i); //通过concatMap中将被观察者生产的事件序列先进行拆分,再将每个事件转换为一个新的发送3个String事件 //最终合并,再发送给被观察者 } return null; } }) .subscribe(new Consumer () { @Override public void accept(String s) throws Exception { Log.d(TAG, s); } });复制代码
Buffer()
-
作用
- 定期从被观察者(Observable)需要发送的事件中获取一定数量的事件和放缓存区中,最终发送。
-
原理
-
应用场景
- 缓存被观察者发送的事件
-
具体使用
- 那么Buffer()每次是获取多少个事件放到缓存区中的呢,下面我们将通过一个例子来说明。
//被观察者需要发送5个数字 Observable.just(1, 2, 3, 4, 5) //设置缓存区大小和步长 //缓存区大小 = 每次从被观察者中获取的事件数量 //步长 = 每次获取新事件的数量 .buffer(3, 1) .subscribe(new Observer
>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List stringList) { Log.d(TAG, "缓存区里的事件数量=" + stringList.size()); for (Integer value : stringList) { Log.d(TAG, "事件 = " + value); } } @Override public void onError(Throwable e) { Log.d(TAG, "对Error事件作出响应"); } @Override public void onComplete() { Log.d(TAG, "对Complete事件作出响应"); } });复制代码
- 过程解释
- 下面,将通过一个图来解释Buffer()原理和整个例子的结果