demo地址:https://github.com/baiqiantao/RxJavaDemo.git
常用的变换操作符
- map:【数据类型转换】将被观察者发送的事件转换为另一种类型的事件
- flatMap:【化解循环嵌套和接口嵌套】将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送
- concatMap:【有序】与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致
- flatMapIterable:相当于对 flatMap 的数据进行了二次扁平化
- buffer:定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射
map
Observable.just(new Date()) // Date 类型
.map(Date::getTime) // long 类型
.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
.subscribe(this::log);
5
5
1
Observable.just(new Date()) // Date 类型
2
.map(Date::getTime) // long 类型
3
.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
4
.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
5
.subscribe(this::log);
flatMap concatMap flatMapIterable
基础用法:化解循环嵌套
flatMap 使用一个指定的函数对原始 Observable 发射的每一项数据之行相应的变换操作,这个函数返回一个本身也发射数据的 Observable,然后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果当做它自己的数据序列发射。
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
.subscribe(this::log);
4
4
1
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
2
.map(person -> person.loves)
3
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
4
.subscribe(this::log);
篮球,22:56:43 009,true
足球,22:56:43 010,true
排球,22:56:43 010,true
画画,22:56:43 011,true
跳舞,22:56:43 012,true
5
1
篮球,22:56:43 009,true
2
足球,22:56:43 010,true
3
排球,22:56:43 010,true
4
画画,22:56:43 011,true
5
跳舞,22:56:43 012,true
flatMap() 执行的过程:
- 使用传入的事件对象创建一个 Observable 对象;
- 并不发送这个 Observable,而是将它激活,于是它开始发送事件;
- 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable,而这个 Observable 负责将这些事件统一交给 Observer 的回调方法。
注意:如果任何一个通过这个 flatMap 操作产生的单独的 Observable 调用 onError 异常终止了,这个 Observable 自身会立即调用 onError 并终止。例如:
Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
.subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));
4
4
1
Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))
2
.map(person -> person.loves)
3
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
4
.subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));
篮球,00:20:14 762,true
onError:The iterator returned a null value,00:20:14 767,true
2
1
篮球,00:20:14 762,true
2
onError:The iterator returned a null value,00:20:14 767,true
flatMap 和 concatMap
concatMap 操作符的功能和 flatMap 非常相似,只不过经过 flatMap 操作变换后,最后输出的序列有可能是交错的(flatMap最后合并结果采用的是 merge 操作符),而 concatMap 最终输出的数据序列和原数据序列是一致的。
long start = System.currentTimeMillis();
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
7
7
1
long start = System.currentTimeMillis();
2
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
3
.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
4
.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
5
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
6
.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
7
.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
f:4,23:21:07 944,false //flatMap后,订阅者首先接收到的事件是【4】而不是【1】
f:5,23:21:07 945,false
f:1,23:21:08 942,false
f:2,23:21:08 943,false
f:3,23:21:08 943,false
f耗时3025,23:21:08 945,false //flatMap耗时3秒
c:A,23:21:08 949,false //concatMap后,订阅者首先接收到的事件是【1】
c:B,23:21:08 950,false
c:C,23:21:08 950,false
c:D,23:21:10 953,false
c:E,23:21:10 953,false
c耗时5034,23:21:10 954,false //concatMap耗时5秒
13
1
f:4,23:21:07 944,false //flatMap后,订阅者首先接收到的事件是【4】而不是【1】
2
f:5,23:21:07 945,false
3
f:1,23:21:08 942,false
4
f:2,23:21:08 943,false
5
f:3,23:21:08 943,false
6
f耗时3025,23:21:08 945,false //flatMap耗时3秒
7
8
c:A,23:21:08 949,false //concatMap后,订阅者首先接收到的事件是【1】
9
c:B,23:21:08 950,false
10
c:C,23:21:08 950,false
11
c:D,23:21:10 953,false
12
c:E,23:21:10 953,false
13
c耗时5034,23:21:10 954,false //concatMap耗时5秒
扩展用法:化解接口嵌套
可以利用 flatMap 操作符实现网络请求依次依赖,即:第一个接口的返回值包含第二个接口请求需要用到的数据。
首先是两个请求网络的操作:
private Observable<String> firstRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(2000);//模拟网络请求
emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
7
7
1
private Observable<String> firstRequest(String parameter) {
2
return Observable.create(emitter -> {
3
SystemClock.sleep(2000);//模拟网络请求
4
emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
5
emitter.onComplete();
6
});
7
}
private Observable<String> secondRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(3000);//模拟网络请求
emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
7
7
1
private Observable<String> secondRequest(String parameter) {
2
return Observable.create(emitter -> {
3
SystemClock.sleep(3000);//模拟网络请求
4
emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
5
emitter.onComplete();
6
});
7
}
然后可以通过 flatMap 将两者串联起来:
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
.flatMap(this::secondRequest)//实现多个网络请求依次依赖
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
8
8
1
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
2
.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
3
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
4
.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
5
.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
6
.flatMap(this::secondRequest)//实现多个网络请求依次依赖
7
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
8
.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
打印结果为:
【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true
【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true
2
1
【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true
2
【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true
简化形式的Demo代码为:
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
6
6
1
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
2
.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
3
.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
4
.subscribeOn(Schedulers.io())
5
.observeOn(AndroidSchedulers.mainThread())
6
.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
当然这种情况下使用 concatMap 的效果也是完全一样的,然而因为 concatMap 的核心是用来保证在合并时"有序"的,而这两种情况根本就没涉及到合并,所以这些情况下使用 concatMap是没有任何意义的。
flatMap 和 flatMapIterable
flatMapIterable 与 flatMap 在流程上大体都相同,唯一不同的是,flatMap 是将一个 Observable 转换成多个 Observables,每一个Observable 最后又得返回一个 Observable。而 flatMapIterable 在将一个 Observable 转换成多个 Observables 后,每一个 Observable 只能返回一个 Iterable 而不是另一个 Observable。
案例1:
Observable.just(Arrays.asList("篮球1", "足球1"))
.flatMap(Observable::fromIterable) //返回一个 Observable
.subscribe(string -> log("" + string));
Observable.just(Arrays.asList("篮球2", "足球2"))
.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
.subscribe(string -> log("" + string));
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
.subscribe(string -> log("" + string));
8
8
1
Observable.just(Arrays.asList("篮球1", "足球1"))
2
.flatMap(Observable::fromIterable) //返回一个 Observable
3
.subscribe(string -> log("" + string));
4
Observable.just(Arrays.asList("篮球2", "足球2"))
5
.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
6
.subscribe(string -> log("" + string));
7
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
8
.subscribe(string -> log("" + string));
篮球1,01:00:39 493,true
足球1,01:00:39 494,true
篮球2,01:00:39 496,true
足球2,01:00:39 496,true
篮球3,01:00:39 499,true
足球3,01:00:39 499,true
6
1
篮球1,01:00:39 493,true
2
足球1,01:00:39 494,true
3
篮球2,01:00:39 496,true
4
足球2,01:00:39 496,true
5
篮球3,01:00:39 499,true
6
足球3,01:00:39 499,true
案例2:
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一个 Observable
.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
.subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一个 Observable
.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
.subscribe(array -> log(Arrays.toString(array)));
10
10
1
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
2
.map(person -> person.loves)
3
.flatMap(Observable::fromIterable) //返回一个 Observable
4
.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
5
.subscribe(array -> log(Arrays.toString(array)));
6
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
7
.map(person -> person.loves)
8
.flatMap(Observable::fromIterable) //返回一个 Observable
9
.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
10
.subscribe(array -> log(Arrays.toString(array)));
[包, 青, 天],01:23:27 376,true
[哈, 哈],01:23:27 376,true
[白, 乾, 涛],01:23:27 377,true
[你, 好],01:23:27 377,true
[广, 州],01:23:27 380,true
[上, 海],01:23:27 380,true
[武, 汉],01:23:27 381,true
[长, 沙],01:23:27 382,true
8
1
[包, 青, 天],01:23:27 376,true
2
[哈, 哈],01:23:27 376,true
3
[白, 乾, 涛],01:23:27 377,true
4
[你, 好],01:23:27 377,true
5
[广, 州],01:23:27 380,true
6
[上, 海],01:23:27 380,true
7
[武, 汉],01:23:27 381,true
8
[长, 沙],01:23:27 382,true
buffer
定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射
请注意,如果源ObservableSource发出onError通知,则事件会立即传递而不是首先发到缓冲区[without first emitting the buffer it is in the process of assembling.]。
buffer(count)
每接收到 count 个数据包裹,将这 count 个包裹打包,发送给订阅者
一次订阅2个:
Observable.range(1, 5)
.buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
1
Observable.range(1, 5)
2
.buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
3
.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
一次全部订阅(将所有元素组装到集合中的效果):
Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
x
1
Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
buffer(count, skip)
生成的ObservableSource每隔 skip 项就会 emits buffers,每个 buffers 都包含 count 个 items。
队列效果(先进先出):
Observable.range(1, 5)
.buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)
.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
1
Observable.range(1, 5)
2
.buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)
3
.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
每次剔除一个效果:
Observable.range(1, 5).buffer(5, 1)
.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
x
1
Observable.range(1, 5).buffer(5, 1)
2
.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
只取奇数个效果:
Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]
x
1
Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]
buffer(timespan, unit)
持续收集直到指定的每隔时间后,然后发射一次并清空缓存区。
周期性订阅多个结果:
Observable.create(emitter -> {
for (int i = 0; i < 8; i++) {
SystemClock.sleep(100);//模拟耗时操作
emitter.onNext(i);
}
}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
x
1
Observable.create(emitter -> {
2
for (int i = 0; i < 8; i++) {
3
SystemClock.sleep(100);//模拟耗时操作
4
emitter.onNext(i);
5
}
6
}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE
7
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
buffer(timespan, unit, count)
当达到指定时间【或】缓冲区中达到指定数量时发射
Observable.create(emitter -> {
for (int i = 0; i < 8; i++) {
SystemClock.sleep(100);//每个对象均延迟后再单独发出去
emitter.onNext(i);
}
}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
x
1
Observable.create(emitter -> {
2
for (int i = 0; i < 8; i++) {
3
SystemClock.sleep(100);//每个对象均延迟后再单独发出去
4
emitter.onNext(i);
5
}
6
}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程
7
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
完整测试案例
public class TransformOperatorActivity extends ListActivity {
private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
String[] array = {"0、map",
"1、flatMap 基础用法",
"2、flatMap和concatMap的区别",
"3、flatMap 实现多个网络请求依次依赖",
"4、flatMap 实现多个网络请求依次依赖简化代码",
"5、flatMapIterable 案例1",
"6、flatMapIterable 案例2",
"7、buffer(int count)",
"8、buffer(count, skip)",
"9、buffer(timespan, unit, count)",
};
setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));
}
private int i;
@Override
protected void onListItemClick(ListView l, View v, int position, long id) {
i++;
switch (position) {
case 0:
Observable.just(new Date()) // Date 类型
.map(Date::getTime) // long 类型
.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
.subscribe(this::log);
break;
case 1:
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
.subscribe(this::log);
break;
case 2:
long start = System.currentTimeMillis();
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
break;
case 3:
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
.flatMap(this::secondRequest)//实现多个网络请求依次依赖
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
break;
case 4:
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
break;
case 5:
Observable.just(Arrays.asList("篮球1", "足球1"))
.flatMap(Observable::fromIterable) //返回一个 Observable
.subscribe(string -> log("" + string));
Observable.just(Arrays.asList("篮球2", "足球2"))
.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
.subscribe(string -> log("" + string));
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
.subscribe(string -> log("" + string));
break;
case 6:
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一个 Observable
.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
.subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
.map(person -> person.loves)
.flatMap(Observable::fromIterable) //返回一个 Observable
.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
.subscribe(array -> log(Arrays.toString(array)));
break;
case 7:
if (i % 2 == 0) {
Observable.range(1, 5).buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
} else {
Observable.range(1, 10).buffer(10) //将所有元素组装到集合中的效果
.subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
}
break;
case 8:
if (i % 3 == 0) {
Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)
.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
} else if (i % 3 == 1) {
Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果
.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
} else {
Observable.range(1, 5).buffer(1, 2) //只取奇数个效果
.subscribe(list -> log(list.toString()));//[1],[3],[5]
}
break;
case 9:
Observable<Integer> observable = Observable.create(emitter -> {
for (int i = 0; i < 8; i++) {
SystemClock.sleep(100);//模拟耗时操作
emitter.onNext(i);
}
emitter.onComplete();
});
if (i % 3 == 0) { //周期性订阅多个结果:
observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
} else { //当达到指定时间【或】缓冲区中达到指定数量时发射
observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
}
break;
}
}
private Observable<String> firstRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(2000);//模拟网络请求
emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
private Observable<String> secondRequest(String parameter) {
return Observable.create(emitter -> {
SystemClock.sleep(3000);//模拟网络请求
emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
emitter.onComplete();
});
}
private void log(String s) {
String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());
Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
}
}
148
1
public class TransformOperatorActivity extends ListActivity {
2
private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());
3
4
protected void onCreate(Bundle savedInstanceState) {
5
super.onCreate(savedInstanceState);
6
String[] array = {"0、map",
7
"1、flatMap 基础用法",
8
"2、flatMap和concatMap的区别",
9
"3、flatMap 实现多个网络请求依次依赖",
10
"4、flatMap 实现多个网络请求依次依赖简化代码",
11
"5、flatMapIterable 案例1",
12
"6、flatMapIterable 案例2",
13
"7、buffer(int count)",
14
"8、buffer(count, skip)",
15
"9、buffer(timespan, unit, count)",
16
};
17
setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));
18
}
19
20
private int i;
21
22
@Override
23
protected void onListItemClick(ListView l, View v, int position, long id) {
24
i++;
25
switch (position) {
26
case 0:
27
Observable.just(new Date()) // Date 类型
28
.map(Date::getTime) // long 类型
29
.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
30
.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
31
.subscribe(this::log);
32
break;
33
case 1:
34
Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
35
.map(person -> person.loves)
36
.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
37
.subscribe(this::log);
38
break;
39
case 2:
40
long start = System.currentTimeMillis();
41
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
42
.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
43
.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
44
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
45
.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
46
.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
47
break;
48
case 3:
49
firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
50
.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
51
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
52
.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
53
.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
54
.flatMap(this::secondRequest)//实现多个网络请求依次依赖
55
.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
56
.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
57
break;
58
case 4:
59
Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
60
.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
61
.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
62
.subscribeOn(Schedulers.io())
63
.observeOn(AndroidSchedulers.mainThread())
64
.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
65
break;
66
case 5:
67
Observable.just(Arrays.asList("篮球1", "足球1"))
68
.flatMap(Observable::fromIterable) //返回一个 Observable
69
.subscribe(string -> log("" + string));
70
Observable.just(Arrays.asList("篮球2", "足球2"))
71
.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
72
.subscribe(string -> log("" + string));
73
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
74
.subscribe(string -> log("" + string));
75
break;
76
case 6:
77
Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
78
.map(person -> person.loves)
79
.flatMap(Observable::fromIterable) //返回一个 Observable
80
.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
81
.subscribe(array -> log(Arrays.toString(array)));
82
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
83
.map(person -> person.loves)
84
.flatMap(Observable::fromIterable) //返回一个 Observable
85
.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
86
.subscribe(array -> log(Arrays.toString(array)));
87
break;
88
case 7:
89
if (i % 2 == 0) {
90
Observable.range(1, 5).buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
91
.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
92
} else {
93
Observable.range(1, 10).buffer(10) //将所有元素组装到集合中的效果
94
.subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
95
}
96
break;
97
case 8:
98
if (i % 3 == 0) {
99
Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)
100
.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
101
} else if (i % 3 == 1) {
102
Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果
103
.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
104
} else {
105
Observable.range(1, 5).buffer(1, 2) //只取奇数个效果
106
.subscribe(list -> log(list.toString()));//[1],[3],[5]
107
}
108
break;
109
case 9:
110
Observable<Integer> observable = Observable.create(emitter -> {
111
for (int i = 0; i < 8; i++) {
112
SystemClock.sleep(100);//模拟耗时操作
113
emitter.onNext(i);
114
}
115
emitter.onComplete();
116
});
117
if (i % 3 == 0) { //周期性订阅多个结果:
118
observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE
119
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
120
} else { //当达到指定时间【或】缓冲区中达到指定数量时发射
121
observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程
122
.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
123
}
124
break;
125
}
126
}
127
128
private Observable<String> firstRequest(String parameter) {
129
return Observable.create(emitter -> {
130
SystemClock.sleep(2000);//模拟网络请求
131
emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
132
emitter.onComplete();
133
});
134
}
135
136
private Observable<String> secondRequest(String parameter) {
137
return Observable.create(emitter -> {
138
SystemClock.sleep(3000);//模拟网络请求
139
emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
140
emitter.onComplete();
141
});
142
}
143
144
private void log(String s) {
145
String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());
146
Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
147
}
148
}
2018-9-18
原文地址:https://www.cnblogs.com/baiqiantao/p/9688484.html