RxJava 变换操作符 map flatMap concatMap buffer

demo地址:https://github.com/baiqiantao/RxJavaDemo.git

常用的变换操作符

  • map:【数据类型转换】将被观察者发送的事件转换为另一种类型的事件
  • flatMap:【化解循环嵌套和接口嵌套】将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送
  • concatMap:【有序】与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致
  • flatMapIterable:相当于对 flatMap 的数据进行了二次扁平化
  • buffer:定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射

map

Observable.just(new Date()) // Date 类型
      .map(Date::getTime) // long 类型
      .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
      .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
      .subscribe(this::log);

5

5

1

Observable.just(new Date()) // Date 类型

2

      .map(Date::getTime) // long 类型

3

      .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值

4

      .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型

5

      .subscribe(this::log);

flatMap concatMap flatMapIterable

基础用法:化解循环嵌套

flatMap 使用一个指定的函数对原始 Observable 发射的每一项数据之行相应的变换操作,这个函数返回一个本身也发射数据的 Observable,然后 flatMap 合并这些 Observables 发射的数据,最后将合并后的结果当做它自己的数据序列发射。

Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
      .subscribe(this::log);

4

4

1

Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))

2

      .map(person -> person.loves)

3

      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素

4

      .subscribe(this::log);
篮球,22:56:43 009,true
足球,22:56:43 010,true
排球,22:56:43 010,true
画画,22:56:43 011,true
跳舞,22:56:43 012,true

5

1

篮球,22:56:43 009,true

2

足球,22:56:43 010,true

3

排球,22:56:43 010,true

4

画画,22:56:43 011,true

5

跳舞,22:56:43 012,true

flatMap() 执行的过程:

  • 使用传入的事件对象创建一个 Observable 对象;
  • 并不发送这个 Observable,而是将它激活,于是它开始发送事件;
  • 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable,而这个 Observable 负责将这些事件统一交给 Observer 的回调方法。

注意:如果任何一个通过这个 flatMap 操作产生的单独的 Observable 调用 onError 异常终止了,这个 Observable 自身会立即调用 onError 并终止。例如:

Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
      .subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));

4

4

1

Observable.just(new Person(Arrays.asList("篮球", null, "排球")), new Person(Arrays.asList("画画", "跳舞")))

2

      .map(person -> person.loves)

3

      .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素

4

      .subscribe(this::log, e -> log("onError:" + e.getMessage()), () -> log("onComplete"));
篮球,00:20:14 762,true
onError:The iterator returned a null value,00:20:14 767,true

2

1

篮球,00:20:14 762,true

2

onError:The iterator returned a null value,00:20:14 767,true

flatMap 和 concatMap

concatMap 操作符的功能和 flatMap 非常相似,只不过经过 flatMap 操作变换后,最后输出的序列有可能是交错的(flatMap最后合并结果采用的是 merge 操作符),而 concatMap 最终输出的数据序列和原数据序列是一致的。

long start = System.currentTimeMillis();
Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
      .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
      .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
      .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
      .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒

7

7

1

long start = System.currentTimeMillis();

2

Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))

3

      .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的

4

      .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒

5

Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))

6

      .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的

7

      .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
f:4,23:21:07 944,false   //flatMap后,订阅者首先接收到的事件是【4】而不是【1】
f:5,23:21:07 945,false
f:1,23:21:08 942,false
f:2,23:21:08 943,false
f:3,23:21:08 943,false
f耗时3025,23:21:08 945,false    //flatMap耗时3秒

c:A,23:21:08 949,false   //concatMap后,订阅者首先接收到的事件是【1】
c:B,23:21:08 950,false
c:C,23:21:08 950,false
c:D,23:21:10 953,false
c:E,23:21:10 953,false
c耗时5034,23:21:10 954,false    //concatMap耗时5秒

13

1

f:4,23:21:07 944,false   //flatMap后,订阅者首先接收到的事件是【4】而不是【1】

2

f:5,23:21:07 945,false

3

f:1,23:21:08 942,false

4

f:2,23:21:08 943,false

5

f:3,23:21:08 943,false

6

f耗时3025,23:21:08 945,false    //flatMap耗时3秒

7


8

c:A,23:21:08 949,false   //concatMap后,订阅者首先接收到的事件是【1】

9

c:B,23:21:08 950,false

10

c:C,23:21:08 950,false

11

c:D,23:21:10 953,false

12

c:E,23:21:10 953,false

13

c耗时5034,23:21:10 954,false    //concatMap耗时5秒

扩展用法:化解接口嵌套

可以利用 flatMap 操作符实现网络请求依次依赖,即:第一个接口的返回值包含第二个接口请求需要用到的数据。

首先是两个请求网络的操作:

private Observable<String> firstRequest(String parameter) {
   return Observable.create(emitter -> {
      SystemClock.sleep(2000);//模拟网络请求
      emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
      emitter.onComplete();
   });
}

7

7

1

private Observable<String> firstRequest(String parameter) {

2

   return Observable.create(emitter -> {

3

      SystemClock.sleep(2000);//模拟网络请求

4

      emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));

5

      emitter.onComplete();

6

   });

7

}
private Observable<String> secondRequest(String parameter) {
   return Observable.create(emitter -> {
      SystemClock.sleep(3000);//模拟网络请求
      emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
      emitter.onComplete();
   });
}

7

7

1

private Observable<String> secondRequest(String parameter) {

2

   return Observable.create(emitter -> {

3

      SystemClock.sleep(3000);//模拟网络请求

4

      emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));

5

      emitter.onComplete();

6

   });

7

}

然后可以通过 flatMap 将两者串联起来:

firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
      .subscribeOn(Schedulers.io()) // 在io线程进行网络请求
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
      .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
      .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
      .flatMap(this::secondRequest)//实现多个网络请求依次依赖
      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
      .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒

8

8

1

firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))

2

      .subscribeOn(Schedulers.io()) // 在io线程进行网络请求

3

      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果

4

      .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true

5

      .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求

6

      .flatMap(this::secondRequest)//实现多个网络请求依次依赖

7

      .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果

8

      .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒

打印结果为:

【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true
【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true

2

1

【第一个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,true

2

【第二个网络请求结束,响应为】原始值:23:58:11 220,第一次修改:23:58:13 245,第二次修改:23:58:16 256,true

简化形式的Demo代码为:

Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
      .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
      .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
      .subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())
      .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true

6

6

1

Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名

2

      .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别

3

      .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄

4

      .subscribeOn(Schedulers.io())

5

      .observeOn(AndroidSchedulers.mainThread())

6

      .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true

当然这种情况下使用 concatMap 的效果也是完全一样的,然而因为 concatMap 的核心是用来保证在合并时"有序"的,而这两种情况根本就没涉及到合并,所以这些情况下使用 concatMap是没有任何意义的。

flatMap 和 flatMapIterable

flatMapIterable 与 flatMap 在流程上大体都相同,唯一不同的是,flatMap 是将一个 Observable 转换成多个 Observables,每一个Observable 最后又得返回一个 Observable。而 flatMapIterable 在将一个 Observable 转换成多个 Observables 后,每一个 Observable 只能返回一个 Iterable 而不是另一个 Observable。

案例1:

Observable.just(Arrays.asList("篮球1", "足球1"))
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .subscribe(string -> log("" + string));
Observable.just(Arrays.asList("篮球2", "足球2"))
      .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
      .subscribe(string -> log("" + string));
Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
      .subscribe(string -> log("" + string));

8

8

1

Observable.just(Arrays.asList("篮球1", "足球1"))

2

      .flatMap(Observable::fromIterable) //返回一个 Observable

3

      .subscribe(string -> log("" + string));

4

Observable.just(Arrays.asList("篮球2", "足球2"))

5

      .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable

6

      .subscribe(string -> log("" + string));

7

Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样

8

      .subscribe(string -> log("" + string));
篮球1,01:00:39 493,true
足球1,01:00:39 494,true
篮球2,01:00:39 496,true
足球2,01:00:39 496,true
篮球3,01:00:39 499,true
足球3,01:00:39 499,true

6

1

篮球1,01:00:39 493,true

2

足球1,01:00:39 494,true

3

篮球2,01:00:39 496,true

4

足球2,01:00:39 496,true

5

篮球3,01:00:39 499,true

6

足球3,01:00:39 499,true

案例2:

Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
      .subscribe(array -> log(Arrays.toString(array)));
Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
      .map(person -> person.loves)
      .flatMap(Observable::fromIterable) //返回一个 Observable
      .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
      .subscribe(array -> log(Arrays.toString(array)));

10

10

1

Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))

2

      .map(person -> person.loves)

3

      .flatMap(Observable::fromIterable) //返回一个 Observable

4

      .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable

5

      .subscribe(array -> log(Arrays.toString(array)));

6

Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))

7

      .map(person -> person.loves)

8

      .flatMap(Observable::fromIterable) //返回一个 Observable

9

      .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable

10

      .subscribe(array -> log(Arrays.toString(array)));
[包, 青, 天],01:23:27 376,true
[哈, 哈],01:23:27 376,true
[白, 乾, 涛],01:23:27 377,true
[你, 好],01:23:27 377,true
[广, 州],01:23:27 380,true
[上, 海],01:23:27 380,true
[武, 汉],01:23:27 381,true
[长, 沙],01:23:27 382,true

8

1

[包, 青, 天],01:23:27 376,true

2

[哈, 哈],01:23:27 376,true

3

[白, 乾, 涛],01:23:27 377,true

4

[你, 好],01:23:27 377,true

5

[广, 州],01:23:27 380,true

6

[上, 海],01:23:27 380,true

7

[武, 汉],01:23:27 381,true

8

[长, 沙],01:23:27 382,true

buffer

定期从被观察者发送的事件中获取一定数量的事件并放到缓存区中,然后把这些数据集合打包发射

请注意,如果源ObservableSource发出onError通知,则事件会立即传递而不是首先发到缓冲区[without first emitting the buffer it is in the process of assembling.]。

buffer(count)

每接收到 count 个数据包裹,将这 count 个包裹打包,发送给订阅者

一次订阅2个:

Observable.range(1, 5)
    .buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
    .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成

1

Observable.range(1, 5)

2

    .buffer(2) //缓存区大小,步长==缓存区大小,等价于buffer(count, count)

3

    .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成

一次全部订阅(将所有元素组装到集合中的效果):

Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
x

1

Observable.range(1, 10).buffer(10).subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

buffer(count, skip)

生成的ObservableSource每隔 skip 项就会 emits buffers,每个 buffers 都包含 count 个 items。

队列效果(先进先出):

Observable.range(1, 5)
      .buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)
      .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]

1

Observable.range(1, 5)

2

      .buffer(3, 1) // 缓存区大小,步长(每次获取新事件的数量)

3

      .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]

每次剔除一个效果:

Observable.range(1, 5).buffer(5, 1)
    .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
x

1

Observable.range(1, 5).buffer(5, 1)

2

    .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]

只取奇数个效果:

Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]
x

1

Observable.range(1, 5).buffer(1, 2).subscribe(list -> log(list.toString()));//[1],[3],[5]

buffer(timespan, unit)

持续收集直到指定的每隔时间后,然后发射一次并清空缓存区。

周期性订阅多个结果:

Observable.create(emitter -> {
   for (int i = 0; i < 8; i++) {
      SystemClock.sleep(100);//模拟耗时操作
      emitter.onNext(i);
   }
}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
x

1

Observable.create(emitter -> {

2

   for (int i = 0; i < 8; i++) {

3

      SystemClock.sleep(100);//模拟耗时操作

4

      emitter.onNext(i);

5

   }

6

}).buffer(250, TimeUnit.MICROSECONDS) //等价于 count = Integer.MAX_VALUE

7

      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]

buffer(timespan, unit, count)

当达到指定时间【或】缓冲区中达到指定数量时发射

Observable.create(emitter -> {
   for (int i = 0; i < 8; i++) {
      SystemClock.sleep(100);//每个对象均延迟后再单独发出去
      emitter.onNext(i);
   }
}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程
      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
x

1

Observable.create(emitter -> {

2

   for (int i = 0; i < 8; i++) {

3

      SystemClock.sleep(100);//每个对象均延迟后再单独发出去

4

      emitter.onNext(i);

5

   }

6

}).buffer(250, TimeUnit.MICROSECONDS, 2) //可以指定工作所在的线程

7

      .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]

完整测试案例

public class TransformOperatorActivity extends ListActivity {
	private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());

	protected void onCreate(Bundle savedInstanceState) {
		super.onCreate(savedInstanceState);
		String[] array = {"0、map",
				"1、flatMap 基础用法",
				"2、flatMap和concatMap的区别",
				"3、flatMap 实现多个网络请求依次依赖",
				"4、flatMap 实现多个网络请求依次依赖简化代码",
				"5、flatMapIterable 案例1",
				"6、flatMapIterable 案例2",
				"7、buffer(int count)",
				"8、buffer(count, skip)",
				"9、buffer(timespan, unit, count)",
		};
		setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));
	}

	private int i;

	@Override
	protected void onListItemClick(ListView l, View v, int position, long id) {
		i++;
		switch (position) {
			case 0:
				Observable.just(new Date()) // Date 类型
						.map(Date::getTime) // long 类型
						.map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值
						.map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型
						.subscribe(this::log);
				break;
			case 1:
				Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素
						.subscribe(this::log);
				break;
			case 2:
				long start = System.currentTimeMillis();
				Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))
						.flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的
						.subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒
				Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))
						.concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的
						.subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒
				break;
			case 3:
				firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))
						.subscribeOn(Schedulers.io()) // 在io线程进行网络请求
						.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
						.doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true
						.observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求
						.flatMap(this::secondRequest)//实现多个网络请求依次依赖
						.observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果
						.subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒
				break;
			case 4:
				Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名
						.flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别
						.flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄
						.subscribeOn(Schedulers.io())
						.observeOn(AndroidSchedulers.mainThread())
						.subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true
				break;
			case 5:
				Observable.just(Arrays.asList("篮球1", "足球1"))
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.subscribe(string -> log("" + string));
				Observable.just(Arrays.asList("篮球2", "足球2"))
						.flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable
						.subscribe(string -> log("" + string));
				Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样
						.subscribe(string -> log("" + string));
				break;
			case 6:
				Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable
						.subscribe(array -> log(Arrays.toString(array)));
				Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))
						.map(person -> person.loves)
						.flatMap(Observable::fromIterable) //返回一个 Observable
						.flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable
						.subscribe(array -> log(Arrays.toString(array)));
				break;
			case 7:
				if (i % 2 == 0) {
					Observable.range(1, 5).buffer(2)  //缓存区大小,步长==缓存区大小,等价于buffer(count, count)
							.subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成
				} else {
					Observable.range(1, 10).buffer(10)  //将所有元素组装到集合中的效果
							.subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
				}
				break;
			case 8:
				if (i % 3 == 0) {
					Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)
							.subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]
				} else if (i % 3 == 1) {
					Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果
							.subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]
				} else {
					Observable.range(1, 5).buffer(1, 2) //只取奇数个效果
							.subscribe(list -> log(list.toString()));//[1],[3],[5]
				}
				break;
			case 9:
				Observable<Integer> observable = Observable.create(emitter -> {
					for (int i = 0; i < 8; i++) {
						SystemClock.sleep(100);//模拟耗时操作
						emitter.onNext(i);
					}
					emitter.onComplete();
				});
				if (i % 3 == 0) { //周期性订阅多个结果:
					observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE
							.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]
				} else { //当达到指定时间【或】缓冲区中达到指定数量时发射
					observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程
							.subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]
				}
				break;
		}
	}

	private Observable<String> firstRequest(String parameter) {
		return Observable.create(emitter -> {
			SystemClock.sleep(2000);//模拟网络请求
			emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
			emitter.onComplete();
		});
	}

	private Observable<String> secondRequest(String parameter) {
		return Observable.create(emitter -> {
			SystemClock.sleep(3000);//模拟网络请求
			emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));
			emitter.onComplete();
		});
	}

	private void log(String s) {
		String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());
		Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));
	}
}

148

1

public class TransformOperatorActivity extends ListActivity {

2

    private static Format FORMAT = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault());

3

    

4

    protected void onCreate(Bundle savedInstanceState) {

5

        super.onCreate(savedInstanceState);

6

        String[] array = {"0、map",

7

                "1、flatMap 基础用法",

8

                "2、flatMap和concatMap的区别",

9

                "3、flatMap 实现多个网络请求依次依赖",

10

                "4、flatMap 实现多个网络请求依次依赖简化代码",

11

                "5、flatMapIterable 案例1",

12

                "6、flatMapIterable 案例2",

13

                "7、buffer(int count)",

14

                "8、buffer(count, skip)",

15

                "9、buffer(timespan, unit, count)",

16

        };

17

        setListAdapter(new ArrayAdapter<>(this, android.R.layout.simple_list_item_1, Arrays.asList(array)));

18

    }

19

    

20

    private int i;

21

    

22

    @Override

23

    protected void onListItemClick(ListView l, View v, int position, long id) {

24

        i++;

25

        switch (position) {

26

            case 0:

27

                Observable.just(new Date()) // Date 类型

28

                        .map(Date::getTime) // long 类型

29

                        .map(time -> time + 1000 * 60 * 60)// 改变 long 类型时间的值

30

                        .map(time -> new SimpleDateFormat("HH:mm:ss", Locale.getDefault()).format(new Date(time))) //String 类型

31

                        .subscribe(this::log);

32

                break;

33

            case 1:

34

                Observable.just(new Person(Arrays.asList("篮球", "足球", "排球")), new Person(Arrays.asList("画画", "跳舞")))

35

                        .map(person -> person.loves)

36

                        .flatMap(Observable::fromIterable) //fromIterable:逐个发送集合中的元素

37

                        .subscribe(this::log);

38

                break;

39

            case 2:

40

                long start = System.currentTimeMillis();

41

                Observable.just(Arrays.asList(1, 2, 3), Arrays.asList(4, 5))

42

                        .flatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//flatMap是无序的

43

                        .subscribe((s -> log("f:" + s)), e -> log("f"), () -> log("f耗时" + (System.currentTimeMillis() - start))); //3秒

44

                Observable.just(Arrays.asList("A", "B", "C"), Arrays.asList("D", "E"))

45

                        .concatMap(list -> Observable.fromIterable(list).delay(list.size(), TimeUnit.SECONDS))//concatMap是有序的

46

                        .subscribe(s -> log("c:" + s), e -> log("c"), () -> log("c耗时" + (System.currentTimeMillis() - start))); //5秒

47

                break;

48

            case 3:

49

                firstRequest("原始值:" + FORMAT.format(new Date(System.currentTimeMillis())))

50

                        .subscribeOn(Schedulers.io()) // 在io线程进行网络请求

51

                        .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果

52

                        .doOnNext(response -> log("【第一个网络请求结束,响应为】" + response))//true

53

                        .observeOn(Schedulers.io()) // 回到 io 线程去处理下一个网络请求

54

                        .flatMap(this::secondRequest)//实现多个网络请求依次依赖

55

                        .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理请求结果

56

                        .subscribe(string -> log("【第二个网络请求结束,响应为】" + string));//true,5 秒

57

                break;

58

            case 4:

59

                Observable.just("包青天").delay(1000, TimeUnit.MILLISECONDS) //第一个网络请求,返回姓名

60

                        .flatMap(s -> Observable.just(s + ",男").delay(1000, TimeUnit.MILLISECONDS)) //第二个网络请求,返回性别

61

                        .flatMap(s -> Observable.just(s + ",28岁").delay(1000, TimeUnit.MILLISECONDS)) //第三个网络请求,返回年龄

62

                        .subscribeOn(Schedulers.io())

63

                        .observeOn(AndroidSchedulers.mainThread())

64

                        .subscribe(this::log); //包青天,男,28岁,耗时:3058毫秒,true

65

                break;

66

            case 5:

67

                Observable.just(Arrays.asList("篮球1", "足球1"))

68

                        .flatMap(Observable::fromIterable) //返回一个 Observable

69

                        .subscribe(string -> log("" + string));

70

                Observable.just(Arrays.asList("篮球2", "足球2"))

71

                        .flatMapIterable(list -> list) //返回一个 Iterable 而不是另一个 Observable

72

                        .subscribe(string -> log("" + string));

73

                Observable.fromIterable(Arrays.asList("篮球3", "足球3")) //和上面两种方式的结果一样

74

                        .subscribe(string -> log("" + string));

75

                break;

76

            case 6:

77

                Observable.just(new Person(Arrays.asList("包青天", "哈哈")), new Person(Arrays.asList("白乾涛", "你好")))

78

                        .map(person -> person.loves)

79

                        .flatMap(Observable::fromIterable) //返回一个 Observable

80

                        .flatMap(string -> Observable.fromArray(string.toCharArray())) //返回一个 Observable

81

                        .subscribe(array -> log(Arrays.toString(array)));

82

                Observable.just(new Person(Arrays.asList("广州", "上海")), new Person(Arrays.asList("武汉", "长沙")))

83

                        .map(person -> person.loves)

84

                        .flatMap(Observable::fromIterable) //返回一个 Observable

85

                        .flatMapIterable(string -> Arrays.asList(string.toCharArray())) //返回一个 Iterable 而不是另一个 Observable

86

                        .subscribe(array -> log(Arrays.toString(array)));

87

                break;

88

            case 7:

89

                if (i % 2 == 0) {

90

                    Observable.range(1, 5).buffer(2)  //缓存区大小,步长==缓存区大小,等价于buffer(count, count)

91

                            .subscribe(list -> log(list.toString()), t -> log(""), () -> log("完成")); //[1, 2],[3, 4],[5],完成

92

                } else {

93

                    Observable.range(1, 10).buffer(10)  //将所有元素组装到集合中的效果

94

                            .subscribe(list -> log(list.toString())); //[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

95

                }

96

                break;

97

            case 8:

98

                if (i % 3 == 0) {

99

                    Observable.range(1, 5).buffer(3, 1) // 缓存区大小,步长;队列效果(先进先出)

100

                            .subscribe(list -> log(list.toString()));//[1, 2, 3],[2, 3, 4],[3, 4, 5],[4, 5],[5]

101

                } else if (i % 3 == 1) {

102

                    Observable.range(1, 5).buffer(5, 1) //每次剔除一个效果

103

                            .subscribe(list -> log(list.toString()));//[1, 2, 3, 4, 5],[2, 3, 4, 5],[3, 4, 5],[4, 5],[5]

104

                } else {

105

                    Observable.range(1, 5).buffer(1, 2) //只取奇数个效果

106

                            .subscribe(list -> log(list.toString()));//[1],[3],[5]

107

                }

108

                break;

109

            case 9:

110

                Observable<Integer> observable = Observable.create(emitter -> {

111

                    for (int i = 0; i < 8; i++) {

112

                        SystemClock.sleep(100);//模拟耗时操作

113

                        emitter.onNext(i);

114

                    }

115

                    emitter.onComplete();

116

                });

117

                if (i % 3 == 0) { //周期性订阅多个结果:

118

                    observable.buffer(250, TimeUnit.MILLISECONDS) //等价于 count = Integer.MAX_VALUE

119

                            .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[2, 3],[4, 5, 6],[7]

120

                } else { //当达到指定时间【或】缓冲区中达到指定数量时发射

121

                    observable.buffer(250, TimeUnit.MILLISECONDS, 2) //可以指定工作所在的线程

122

                            .subscribe(list -> log("缓存区中事件:" + list.toString())); //[0, 1],[],[2, 3],[],[4, 5],[6],[7]

123

                }

124

                break;

125

        }

126

    }

127

    

128

    private Observable<String> firstRequest(String parameter) {

129

        return Observable.create(emitter -> {

130

            SystemClock.sleep(2000);//模拟网络请求

131

            emitter.onNext(parameter + ",第一次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));

132

            emitter.onComplete();

133

        });

134

    }

135

    

136

    private Observable<String> secondRequest(String parameter) {

137

        return Observable.create(emitter -> {

138

            SystemClock.sleep(3000);//模拟网络请求

139

            emitter.onNext(parameter + ",第二次修改:" + FORMAT.format(new Date(System.currentTimeMillis())));

140

            emitter.onComplete();

141

        });

142

    }

143

    

144

    private void log(String s) {

145

        String date = new SimpleDateFormat("HH:mm:ss SSS", Locale.getDefault()).format(new Date());

146

        Log.i("【bqt】", s + "," + date + "," + (Looper.myLooper() == Looper.getMainLooper()));

147

    }

148

}

2018-9-18

原文地址:https://www.cnblogs.com/baiqiantao/p/9688484.html

时间: 2024-10-08 11:36:19

RxJava 变换操作符 map flatMap concatMap buffer的相关文章

Android RxJava使用介绍(三) RxJava的操作符

上一篇文章已经具体解说了RxJava的创建型操作符.本片文章将继续解说RxJava操作符.包括: Transforming Observables(Observable的转换操作符) Filtering Observables(Observable的过滤操作符) Transforming Observables(Observable的转换操作符) buffer操作符 buffer操作符周期性地收集源Observable产生的结果到列表中.并把这个列表提交给订阅者,订阅者处理后,清空buffer列

Android RxJava使用介绍(二) RxJava的操作符

上一篇文章我们通过一个简单的例子来给大家展示了RxJava的基本用法,相信大家已经对RxJava有了大概的了解,由于上篇文章对RxJava的使用介绍都是点到为止,并没有进行深入展开,也许你对RxJava有种名不副实的感觉.OK,下面我们就进入正题,一步步的揭开RxJava的神秘面纱! 一个例子 RxJava的强大之处,在于它提供了非常丰富且功能强悍的操作符,通过使用和组合这些操作符,你几乎能完成所有你想要完成的任务,举个例子如下: 现在有一个需求:app启动时显示一张图片(一般是app的logo

Scala learning(2): map, flatMap, filter与For表达式

本文叙述Collections里最常见的三种操作map, flatMap, filter,与For表达式的关系. List对三种方法的实现 map在List的实现: abstract class List[+T] { def map[U](f: T => U): List[U] = this match { case x :: xs => f(x) :: xs.map(f) case Nil => Nil } } flatMap在List的实现: abstract class List[

RxJava defer操作符实现代码支持链式调用

前言 现在越来越多Android开发者使用到RxJava,在Android使用RxJava主要有如下好处: 1,轻松切换线程.以前我们切换线程主要使用Handler等手段来做. 2,轻松解决回调的嵌套问题.现在的app业务逻辑越来越复杂,多的时候3,4层回调嵌套,使得代码可维护性变得很差.RxJava链式调用使得这些调用变得扁平化. 随着RxJava的流行,越来越多的开源项目开始支持RxJava,像Retrofit.GreenDao等.这些开源项目支持RxJava使得我们解决复杂业务变得非常方便

Android 勤用RXJava compose操作符消除重复代码

相信小伙伴在使用RXJava与Retrofit请求网络时,都有遇到过这样的场景,在IO线程请求网络解析数据,接着返回主线程setData.更新View试图,那么也肯定熟悉下面这几句代码: .subscribeOn(Schedulers.io()) .unsubscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); 如果网络请求的次数比较少, 作为一名不拘小节(懒癌)的

Android RxJava使用介绍(二) RxJava的操作符上

上一篇文章我们通过一个简单的例子来给大家展示了RxJava的基本用法,相信大家已经对RxJava有了大概的了解,由于上篇文章对RxJava的使用介绍都是点到为止,并没有进行深入展开,也许你对RxJava有种名不副实的感觉.OK,下面我们就进入正题,一步步的揭开RxJava的神秘面纱! 一个例子 RxJava的强大之处,在于它提供了非常丰富且功能强悍的操作符,通过使用和组合这些操作符,你几乎能完成所有你想要完成的任务,举个例子如下: 现在有一个需求:app启动时显示一张图片(一般是app的logo

Android RxJava使用介绍(四) RxJava的操作符

本篇文章继续介绍以下类型的操作符 Combining Observables(Observable的组合操作符) Error Handling Operators(Observable的错误处理操作符) Combining Observables(Observable的组合操作符) combineLatest操作符 combineLatest操作符把两个Observable产生的结果进行合并,合并的结果组成一个新的Observable.这两个Observable中任意一个Observable产生

Swift --&gt; Map &amp; FlatMap

转载自:https://segmentfault.com/a/1190000004050907 Map map函数能够被数组调用,它接受一个闭包作为参数,作用于数组中的每个元素.闭包返回一个变换后的元素,接着将所有这些变换后的元素组成一个新的数组. 这听起来有些复杂,但它是相当简单的.想象你拥有一个string类型的数组: let testArray = ["test1","test1234","","test56"] map

SparkContext, map, flatMap, zip以及例程wordcount

SparkContext 通常作为入口函数,可以创建并返回一个RDD. 如把Spark集群当作服务端那Spark Driver就是客户端,SparkContext则是客户端的核心: 如注释所说 SparkContext用于连接Spark集群.创建RDD.累加器(accumlator).广播变量(broadcast variables) map操作: 会对每一条输入进行指定的操作,然后为每一条输入返回一个对象: flatMap操作: "先映射后扁平化" 操作1:同map函数一样:对每一条