Rxjava2 Observable的数据变换详解及实例(一)

目录

  • 简要:

    • 1.1 buffer(count)
    • 1.2 buffer(boundary)
    • 1.3 buffer(count, skip)
    • 1.4 buffer(timespan, TimeUnit)
    • 1.5 buffer(timespan, TimeUnit, count)
    • 1.6 buffer(timespan, timeskip, TimeUnit)
    • 1.7 buffer(bufferClosingSelector)
  • 2. Map
  • 3. FlatMap
    • 3.1 flatMap(mapper)
    • 3.2 flatMap(mapper, maxConcurrency)
    • 3.3 flatMap(mapper, delayErrors)
    • 3.4 flatMapIterable(mapper)
    • 3.5 flatMapIterable(mapper, resultSelector)
  • 4. ConcatMap
  • 5. SwitchMap
  • 接续:

简要:

需求了解:

对于 Observable 发射的数据有的时候可能不满足我们的要求,或者需要转化为其他类型的数据,比如:缓存,数据类型转化,数据拦截等。此时可以使用 Rx 中的一些对于数据操作的操作进行数据的变换,方便我们的开发。

执行变换的操作方法:

  • Buffer:它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
  • Map:对序列的每一项都应用一个函数来变换Observable发射的数据序列
  • FlatMap,FlatMapIterable,ConcatMap:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平铺化的放进一个单独的 Observable
  • SwitchMap:将Observable发射的数据集合变换为Observables集合,然后只发射这些Observables最近发射的数据
  • Window:定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项
  • GroupBy:将Observable分拆为Observable集合,将原始Observable发射的数据按 Key
    分组,每一个Observable发射一组不同的数据
  • Scan:对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值
  • Cast:在发射之前强制将Observable发射的所有数据转换为指定类型

    1. Buffer

    定期收集Observable的数据放进一个数据包裹(缓存),然后发射这些数据包裹,而不是一次发射一个值。

Buffer 操作符将一个Observable变换为另一个,原来的Observable正常发射数据,变换产生 的Observable发射这些数据的缓存集合。 Buffer 操作符在很多语言特定的实现中有很多种变 体,它们在如何缓存这个问题上存在区别。

Window 操作符与 Buffer 类似,但是它在发射之前把收集到的数据放进单独的Observable, 而不是放进一个数据结构。

注意: 如果原来的Observable发射了一个 onError 通知, Buffer 会立即传递这个通知,而不是首先发射缓存的数据,即使在这之前缓存中包含了原始Observable发射的数据。

在RxJava中的一些 Buffer 的操作如下:

1.1 buffer(count)

以列表(List)的形式发射非重叠的缓存,每一个缓存至多包含来自原始 Observable 的 count 项数据(最后发射的列表数据可能少于count项)。

实例代码:

    // 1. buffer(count)
    // 以列表(List)的形式发射非重叠的缓存,
    // 每一个缓存至多包含来自原始 Observable的count项数据(最后发射的列表数据可能少于count项)
    Observable.range(1, 10)
        .buffer(3)
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(1) accept: " + t);
            }
        });

输出:

--> bufferr(1) accept: [1, 2, 3]
--> bufferr(1) accept: [4, 5, 6]
--> bufferr(1) accept: [7, 8, 9]
--> bufferr(1) accept: [10]

Javadoc: buffer(count)

1.2 buffer(boundary)

开始创建一个List 收集原始 Observable 数据,监视一个名叫 boundary 的Observable,每当这个Observable发射了一个值,它就创建一个新的 List 开始收集来自原始Observable的数据并发射原来已经收集数据的 List, 当 boundary Observable 发送了完成通知,会将此时还未发送的 List 发送。

注意: 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始 Observable 数据。

实例代码:

    // 2. buffer(boundary) 监视一个名叫boundary的Observable,
    // 开始创建一个List收集原始 Observable 数据,监视一个名叫boundary的Observable,
    // 每当这个Observable发射了一个值,它就创建一个新的List开始收集来自原始Observable的数据并发射原来已经收集数据的List,
    // 当boundary发送了完成通知,会将此时还未发送的 List 发送。
    // 所有发送的 List 可能没有收集到数据,此时数据的收集可能并不会完整收集所有原始Observable数据。
    Observable.range(1, 10000)
        .buffer(Observable.timer(1, TimeUnit.MILLISECONDS))         // 1毫秒后开始接受原始数据
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> accept(2): " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> accept(2): 2858
--> accept(2): 5471

Javadoc: buffer(boundary)

1.3 buffer(count, skip)

从原始Observable的第一项数据开始创建新的缓存,此后每当收 到 skip 项数据,用 count 项数据填充缓存:开头的一项和后续的 count-1 项,它以列表 (List)的形式发射缓存,取决于 count 和 skip 的值,这些缓存可能会有重叠部分(比如skip < count时),也可能会有间隙(比如skip > count时)。

解析: 在指定的数据序列中移动指针来获取缓存数据:指针每次移动 skip 个数据长度,每次缓存指针位置及后面count个数据,指针初始位置在原始数据的第一个(存在的情况下)。

实例代码:

    // 3. buffer(int count, int skip)
    // 在指定的数据中移动指针来获取缓存数据:指针每次移动1个数据长度,每次缓存3个数据
    Observable.range(1, 5)
        .buffer(3, 1)
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(3) accept: " + t);
            }
        });

输出:

--> bufferr(3) accept: [1, 2, 3]
--> bufferr(3) accept: [2, 3, 4]
--> bufferr(3) accept: [3, 4, 5]
--> bufferr(3) accept: [4, 5]
--> bufferr(3) accept: [5]

Javadoc: buffer(count, skip)

1.4 buffer(timespan, TimeUnit)

定期以 List 的形式发射新的数据,在每个时间段,收集来自原始 Observable 的数据(从前面一个数据包裹之后,或者如果是第一个数据包裹,从有观察者订阅原来的 Observale 之后开始)。还有另一个版本的 buffer 接受一个 Scheduler 参数。


解析: 每隔 timespan 时间段以 List 的形式收集原始Observable的数据

实例代码:

    // 4. buffer(long timespan, TimeUnit unit)
    // 每隔timespan时间段以list的形式收集数据
    Observable.range(1, 50000)
        .buffer(1, TimeUnit.MILLISECONDS)                                   // 每隔1毫秒收集一次原始序列数据
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(4) accept: " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> bufferr(4) accept: 2571
--> bufferr(4) accept: 5457
--> bufferr(4) accept: 13248
--> bufferr(4) accept: 12755
--> bufferr(4) accept: 9543
--> bufferr(4) accept: 6426

注意: buffer(timespan,TimeUnit) 默认情况下会使用 computation 调度器
Javadoc: buffer(timespan,TimeUnit)
Javadoc: buffer(timespan,TimeUnit,Scheduler)

1.5 buffer(timespan, TimeUnit, count)

每当收到来自原始 Observablecount 项数据,或者每过了一段指定 timespan 时间后, 就以 List 的形式发射这期间的数据,即使数据项少于 count 项。还有另一个版本的 buffer 接受一个 Scheduler 参数。

实例代码:

    // 5. buffer(long timespan, TimeUnit unit, int count)
    // 每隔1毫秒缓存50个数据
    Observable.range(1, 1000)
        .buffer(1, TimeUnit.MILLISECONDS, 50)                               // 每隔1毫秒收集50个数据序列
        .subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> bufferr(5) accept: " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 20
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 4
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 50
--> bufferr(5) accept: 26

注意: buffer(timespan, TimeUnit, count) 默认情况下会使用 computation 调度器
Javadoc: buffer(timespan, TimeUnit, count)
Javadoc: buffer(timespan, TimeUnit, scheduler, count)

1.6 buffer(timespan, timeskip, TimeUnit)

在每一个 timeskip 时期内都创建一个新的 List ,然后用原始 Observable 发射的每一项数据填充这个列表(在把这个 List 当做自己的数据发射前,从创建时开始,直到过了 timespan 这么长的时间)。如果 timespan 长于 timeskip ,它发射的数据包将会重叠,因此可能包含重复的数据项。

解析: 在每隔 timeskip 时间段都创建一个新的 List ,每个 List 都独立收集 timespan 时间段原始Observable发射的数据。 因此在 timespan 长于 timeskip 时,它发射的数据包将会重叠,因此不同 List 中可能包含重复的数据项。 还有另一个版本的 buffer 接受一个 Scheduler 参数。

实例代码:

    // 6. buffer(long timespan, long timeskip, TimeUnit unit)
    // 在每一个timeskip时期内都创建一个新的 List,
    // 每个List都独立收集timespan时间段原始Observable发射的数据,
    // 如果 timespan 长于 timeskip,它发射的数据包将会重叠,因此不同List中可能包含重复的数据项
    Observable.range(1, 50000)
            .buffer(1, 1, TimeUnit.MILLISECONDS, Schedulers.newThread())
            .subscribe(new Consumer<List<Integer>>() {

                @Override
                public void accept(List<Integer> t) throws Exception {
                    System.out.println("--> accept(6): " + t.size());   // 每次收集的数据序列个数
                }
            });

输出:

--> accept(6): 1412
--> accept(6): 733
--> accept(6): 10431
--> accept(6): 694
--> accept(6): 18944
--> accept(6): 10710
--> accept(6): 944
--> accept(6): 6132

注意:buffer(imespan, timeskip, TimeUnit) 默认情况下会使用 computation 调度器。
Javadoc: buffer(imespan, timeskip, TimeUnit)
Javadoc: buffer(imespan, timeskip, TimeUnit, schedule)

1.7 buffer(bufferClosingSelector)

当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector 生成第二个Observable,当第二个Observable 发射一个TClosing 时,buffer 发射当前的 List,然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。

注意: 它会一直这样做直到原来的Observable执行完成,可以收集完整的原始 Observable 的数据

实例代码:

    // 7. buffer(Callable<ObservableSource<T>> boundarySupplier)
    // 当它订阅原来的Observable时,开始将数据收集到一个List,然后它调用 bufferClosingSelector 生成第二个Observable,
    // 当第二个Observable 发射一个 TClosing 时,buffer 发射当前的 List ,
    // 然后重复这个过程:开始组装一个新的List,然后调用bufferClosingSelector创建一个新的Observable并监视它。
    // 它会一直这样做直到原来的Observable执行完成。会收集完整的原始 Observable 的数据
    Observable.range(1, 50000)
        .buffer(new Callable<Observable<Long>>() {

            @Override
            public Observable<Long> call() throws Exception {
                return Observable.timer(1, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<List<Integer>>() {

            @Override
            public void accept(List<Integer> t) throws Exception {
                System.out.println("--> accept(7): " + t.size());   // 每次收集的数据序列个数
            }
        });

输出:

--> accept(7): 14650
--> accept(7): 9708
--> accept(7): 25642

Javadoc: buffer(bufferClosingSelector)

2. Map

对Observable发射的每一项数据应用一个函数,执行变换操作。

实例代码:

    // map(Function<T,R))
    // 接受原始Observable的数据,发送处理后的数据
    Observable.range(1, 5)
        .map(new Function<Integer, Integer>() {

            @Override
            public Integer apply(Integer t) throws Exception {
                System.out.println("--> apply: " + t);
                return t*t; // 计算原始数据的平方
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept Map: " + t);
            }
        });

输出:

--> apply: 1
--> accept Map: 1
--> apply: 2
--> accept Map: 4
--> apply: 3
--> accept Map: 9
--> apply: 4
--> accept Map: 16
--> apply: 5
--> accept Map: 25

Javadoc: map(mapper)

3. FlatMap

主要对原始数据进行转换操作后发送至订阅者。

RxJava2 中的一些 FlatMap 操作方法如下:

3.1 flatMap(mapper)

FlatMap 将一个发射数据的 Observable 变换为 多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable。

FlatMap 操作符使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后 FlatMap 合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

这个方法是很有用的,例如,当你有一个这样的Observable:它发射一个数据序列,这些数据本身包含Observable成员或者可以变换为Observable,因此你可以创建一个新的 Observable发射这些次级Observable发射的数据的完整集合。

注意: FlatMap 对这些Observables发射的数据做的是合并(merge)操作,因此它们可能是交错的。
在许多语言特定的实现中,还有一个操作符不会让变换后的Observables发射的数据交错,它按照严格的顺序发射这些数据,这个操作符通常被叫作ConcatMap或者类似的名字。

实例代码:

    //  1. flatMap(Function)
    //  对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,
    //  然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射
    Observable.range(1, 5)
        .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(1): " + t);                           // 原始数据
                return Observable.range(1, t).subscribeOn(Schedulers.newThread());  // 处理后数据
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMap(1): " + t);                  // 接受的所有数据
            }
        });

输出:

--> apply(1): 1
--> apply(1): 2
--> apply(1): 3
--> apply(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> apply(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3
--> accept flatMap(1): 4
--> accept flatMap(1): 5
--> accept flatMap(1): 1
--> accept flatMap(1): 2
--> accept flatMap(1): 3

Javadoc: flatMap(mapper)

3.2 flatMap(mapper, maxConcurrency)

maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。当达到这个限制时,它会等待其中一个终止然后再订阅另一个。

实例代码:

    // 2. flatMap(Function, maxConcurrency)
    // maxConcurrency 这个参数设置 flatMap 从原来的Observable映射Observables的最大同时订阅数。
    // 当达到这个限制时,它会等待其中一个终止然后再订阅另一个
    Observable.range(1, 5)
    .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(2): " + t);
            return Observable.range(1, t).subscribeOn(Schedulers.newThread());
        }
    // 指定最大订阅数为1,此时等待上一个订阅的Observable结束,在进行下一个Observable订阅
    }, 1).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept flatMap(2): "+ t);
        }
    });

输出:

--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> apply(2): 4
--> apply(2): 5
--> accept flatMap(2): 1
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 1
--> accept flatMap(2): 2
--> accept flatMap(2): 3
--> accept flatMap(2): 4
--> accept flatMap(2): 5

Javadoc: flatMap(mapper, maxConcurrency)

3.3 flatMap(mapper, delayErrors)

delayError 这个参数指定是否延迟发生 Error 的Observable通知。还有一个可以指定最大订阅数参数 maxConcurrency 的变体。


解析: 当值为 true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送 Error 这个Observable的通知,当值为 false 时则中断所有订阅的操作,并发送 Error 的通知。

实例代码:

    // 3. flatMap(Function, delayErrors)
    // delayErrors 这个参数指定是否延迟发生Error的Observable通知
    // 当true 时延迟发生Error的这个订阅的Observable通知,不中断当前的订阅操作,
    // 继续下一个Observable的订阅,在所有订阅的Observable全部结束后发送Error这个Observable的通知
    Observable.range(1, 5)
        .flatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(3): " + t);

                return Observable.create(new ObservableOnSubscribe<Integer>() {

                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        if( t == 3) {
                            throw new NullPointerException("delayErrors test!");    // 测试 Error
                        }
                        for (int i = 1; i <= t; i++) {
                            emitter.onNext(i);
                        }
                        emitter.onComplete();
                    }
                });
            }
        // 设置延迟 Error 通知到最后
        }, true).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMap(3): "+ t);
            }
        },new Consumer<Throwable>() {

            @Override
            public void accept(Throwable t) throws Exception {
                System.out.println("--> acceot Error(3): " + t);
            }
        });

输出:

--> apply(3): 1
--> accept flatMap(3): 1
--> apply(3): 2
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> apply(3): 3
--> apply(3): 4
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> apply(3): 5
--> accept flatMap(3): 1
--> accept flatMap(3): 2
--> accept flatMap(3): 3
--> accept flatMap(3): 4
--> accept flatMap(3): 5
--> acceot Error(3): java.lang.NullPointerException: delayErrors test!

Javadoc: flatMap(Function, delayErrors)
Javadoc: flatMap(Function, delayErrors, maxConcurrency)

3.4 flatMapIterable(mapper)

flatMapIterable 这个变体成对的打包数据,然后生成 Iterable 而不是原始数据和生成的 Observables,但是处理方式是相同的。

解析: 对数据进行处理转换成 Iterable 来发射数据。

实例代码:

    //  4. flatMapIterable(Function(T,R))
    //  对数据进行处理转换成Iterable来发射数据
    Observable.range(1, 5)
        .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

            @Override
            public Iterable<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply: " + t);
                ArrayList<Integer> list = new ArrayList<Integer>();
                list.add(888);
                list.add(999);
                return list; // 将原始数据转换为两个数字发送
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept flatMapIterable(4): " + t);
            }
        });

输出:

--> apply: 1
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 2
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 3
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 4
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999
--> apply: 5
--> accept flatMapIterable(4): 888
--> accept flatMapIterable(4): 999

Javadoc: flatMapIterable(mapper)

3.5 flatMapIterable(mapper, resultSelector)

参数 mapper 接收原始数据,resultSelector 同时接收原始数据和 mapper 处理的数据,进行二次数据转换。

实例代码:

    //  5. flatMapIterable(Function(T,R),Function(T,T,R))
    //  第一个func接受原始数据,转换数据,第二个func同时接受原始和处理的数据,进行二次转换处理
    Observable.range(1, 3)
            .flatMapIterable(new Function<Integer, Iterable<? extends Integer>>() {

                @Override
                public Iterable<? extends Integer> apply(Integer t) throws Exception {
                    ArrayList<Integer> list = new ArrayList<Integer>();
                    list.add(888);
                    list.add(999);
                    return list; // 将原始数据转换为两个数字发送
                }
            }, new BiFunction<Integer, Integer, Integer>() {

                @Override
                public Integer apply(Integer t1, Integer t2) throws Exception {
                    System.out.println("--> apply(5): t1 = " + t1 + ", t2 = " + t2);
                    return t1 + t2; // 将原始数据和处理过的数据组合进行二次处理发送
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept flatMapIterable(5): " + t);
                }
            });

输出:

--> apply(5): t1 = 1, t2 = 888
--> accept flatMapIterable(5): 889
--> apply(5): t1 = 1, t2 = 999
--> accept flatMapIterable(5): 1000
--> apply(5): t1 = 2, t2 = 888
--> accept flatMapIterable(5): 890
--> apply(5): t1 = 2, t2 = 999
--> accept flatMapIterable(5): 1001
--> apply(5): t1 = 3, t2 = 888
--> accept flatMapIterable(5): 891
--> apply(5): t1 = 3, t2 = 999
--> accept flatMapIterable(5): 1002

Javadoc: flatMapIterable(mapper, resultSelector)

4. ConcatMap

concatMap 操作符的功能和 flatMap 是非常相似的,只是有一点,concatMap 最终输出的数据序列和原数据序列是一致,它是按顺序链接Observables,而不是合并(flatMap用的是合并)。

通过 mapper 处理原数据后,转换成 Observables ,按照顺序进行连接 Observables 发送数据。


解析: concatMapflatMap的功能是一样的, 将一个发射数据的Observable变换为多个Observables,然后将它们发射的数据放进一个单独的Observable。只不过最后合并ObservablesflatMap采用的merge,而concatMap采用的是连接(concat)。区别:concatMap是有序的,flatMap是无序的,concatMap最终输出的顺序与原序列保持一致,而flatMap则不一定,有可能出现交错。

实例代码:

    // 1. concatMap(Function(T,R))
    // 按照顺序依次处理原始数据和处理的数据
    Observable.range(1, 3)
        .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(1): " + t);
                return Observable.range(1, t).doOnSubscribe(new Consumer<Disposable>() {

                    @Override
                    public void accept(Disposable t) throws Exception {
                        System.out.println("--> accept(1): Observable on Subscribe");   // 当前的Observable被订阅
                    }
                });
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept concatMap(1): " + t);
            }
        });

    System.out.println("--------------------------------------------");
    // 2. concatMap(mapper, prefetch)
    // prefetch 参数是在处理后的Observables发射的数据流中预读数据个数,不影响原数据的发射和接收顺序
    Observable.range(1, 3)
        .concatMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(2): " + t);
                return Observable.range(1, 3).doOnSubscribe(new Consumer<Disposable>() {

                    @Override
                    public void accept(Disposable t) throws Exception {
                        System.out.println("--> accept(2): Observable on Subscribe");   // 当前的Observable被订阅
                    }
                });
            }
        }, 2).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept concatMap(2): " + t);
            }
        });

输出:

--> apply(1): 1
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> apply(1): 2
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> apply(1): 3
--> accept(1): Observable on Subscribe
--> accept concatMap(1): 1
--> accept concatMap(1): 2
--> accept concatMap(1): 3
--------------------------------------------
--> apply(2): 1
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 2
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3
--> apply(2): 3
--> accept(2): Observable on Subscribe
--> accept concatMap(2): 1
--> accept concatMap(2): 2
--> accept concatMap(2): 3

Javadoc: concatMap(mapper)
Javadoc: concatMap(mapper, refetch)

5. SwitchMap

有选择的订阅 Observable,当原始 Observable 发射一个数据,通过 witchMap 返回一个 Observable,
当原始Observable发射一个新的数据时,它将取消订阅并停止监视产生执之前的Observable,开始监视当前新的Observable。

解析: 如果上一个任务尚未完成时,就开始下一个任务的话,上一个任务就会被取消掉。如果所有任务都是在同一个线程里执行的话,此时这个操作符与 ContactMap 一致,都是依次顺序执行。只有在不同的线程里执行的时候,即线程方案为newThread的时候,才会出现这种情况,常用于网络请求中。

实例代码:

    // 1. witchMap(Function(T,R))
    // 同一个线程执行
    Observable.range(1, 3)
    .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(1): " + t);
            return Observable.range(1, 3);  // 每个任务指定在同一个线程执行
        }
    }).subscribe(new Consumer<Integer>() {

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept switchMap(1): " + t);
        }
    });

    System.out.println("---------------------------------------");
    // 2. witchMap(Function(T,R))
    // 不同线程执行
    Observable.range(1, 3)
        .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

            @Override
            public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
                System.out.println("--> apply(2): " + t);
                return Observable.range(1, 3)
                                 .subscribeOn(Schedulers.newThread());  // 每个任务指定在子线程执行
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept switchMap(2): " + t);
            }
        });

    System.out.println("---------------------------------------");
    // 3. switchMap(mapper, bufferSize)
    // bufferSize 参数是从当前活动的Observable中预读数据的大小
    Observable.range(1, 3)
    .switchMap(new Function<Integer, ObservableSource<? extends Integer>>() {

        @Override
        public ObservableSource<? extends Integer> apply(Integer t) throws Exception {
            System.out.println("--> apply(3): " + t);
            return Observable.range(1, 5).subscribeOn(Schedulers.newThread());
        }
    }, 3).subscribe(new Consumer<Integer>() {   // 指定缓存大小为3

        @Override
        public void accept(Integer t) throws Exception {
            System.out.println("--> accept switchMap(3): " + t);
        }
    });

输出:

--> apply(1): 1
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 2
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
--> apply(1): 3
--> accept switchMap(1): 1
--> accept switchMap(1): 2
--> accept switchMap(1): 3
---------------------------------------
--> apply(2): 1
--> apply(2): 2
--> apply(2): 3
--> accept switchMap(2): 1
--> accept switchMap(2): 2
--> accept switchMap(2): 3
---------------------------------------
--> apply(3): 1
--> apply(3): 2
--> apply(3): 3
--> accept switchMap(3): 1
--> accept switchMap(3): 2
--> accept switchMap(3): 3
--> accept switchMap(3): 4
--> accept switchMap(3): 5

Javadoc: switchMap(mapper)
Javadoc: switchMap(mapper, bufferSize)

接续:

后续的Rx相关数据变换部分请参考: Rxjava2 Observable的数据变换详解及实例(二)

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127261.html

时间: 2024-10-12 19:56:43

Rxjava2 Observable的数据变换详解及实例(一)的相关文章

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

Rxjava2 Observable的条件操作符详解及实例

目录 简要: 1. Amb 2. DefaultIfEmpty 3. SwitchIfEmpty 4. SkipUntil 5. SkipWhile 6. TakeUntil 6.1 takeUntil(ObservableSource other) 6.2 takeUntil(Predicate stopPredicate) 7. TakeWhile 小结 简要: 需求了解: 在使用 Rxjava 开发中,经常有一些各种条件的操作 ,如比较两个 Observable 谁先发射了数据.跳过指定条

Rxjava2 Observable的结合操作详解及实例

目录 简要: 1. CombineLatest 2. Join 3. Merge 3.1 merge 3.2 mergeDelayError 4. Zip 5. StartWith 6. SwitchOnNext 6.1 switchOnNext 6.2 switchOnNextDelayError 小结 简要: 需求了解: 在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等.这个时候我们就需要使用 RxJava 的结合操作符来

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

JVM 运行时数据区详解

一.运行时数据区: Java虚拟机在执行Java程序的过程中会把它所管理的内存划分为若干个不同数据区域. 1.有一些是随虚拟机的启动而创建,随虚拟机的退出而销毁,所有的线程共享这些数据区. 2.第二种则是与线程一一对应,随线程的开始和结束而创建和销毁,线程之间相互隔离. java虚拟机所管理的内存将会包括以下几个运行时数据区域 二.数据区详解 1.程序计数器(Program Counter Register) 也叫PC寄存器是一块较小的内存空间,它的作用是存储当前线程所执行的字节码的信号指示器.

ContentProvider数据访问详解

ContentProvider数据访问详解 Android官方指出的数据存储方式总共有五种:Shared Preferences.网络存储.文件存储.外储存储.SQLite,这些存储方式一般都只是在一个单独的应用程序中实现数据的共享,而对于需要操作其他应用程序中的数据时(如媒体库.通讯录等),可能就需要借助ContentProvider了. 1.ContentProvider ContentProvider为存储和获取数据提供了统一的接口,使用表的形式来对数据进行封装,使得开发者在后续的开发过程