RxJava 学习笔记(八) --- Combining 结合操作

@(Rxjava学习笔记)

RxJava 学习笔记(八) — Combining 结合操作

  • RxJava 学习笔记八 Combining 结合操作

      • StartWith 在数据序列的开头插入一条指定的项
      • Merge 将多个Observable合并为一个
      • MergeDelayError 将多个Observable合并为一个
      • Zip 使用一个函数组合多个Observable发射的数据集合然后再发射这个结果
      • CombineLatest 当两个Observables中的任何一个发射了一个数据时通过一个指定的函数组合每个Observable发射的最新数据一共两个数据然后发射这个函数的结果
      • Join 无论何时如果一个Observable发射了一个数据项只要在另一个Observable发射的数据项定义的时间窗口内就将两个Observable发射的数据合并发射
      • GroupJoin groupJoin操作符非常类似于join操作符区别在于join操作符中第四个参数的传入函数不一致
      • SwitchOnNext 将一个发射Observables的Observable转换成另一个Observable后者发射这些Observables最近发射的数据

1. StartWith —> 在数据序列的开头插入一条指定的项

一个Observable在发射数据之前先发射一个指定的数据序列

示例代码:

Observable.just(10,20,30).startWith(2, 3, 4).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

输出:

Next:2
Next:3
Next:4
Next:10
Next:20
Next:30
Sequence complete.

你也可以传递一个ObservablestartWith,它会将那个Observable的发射物插在原始Observable发射的数据序列之前,然后把这个当做自己的发射物集合。这可以看作是Concat的反转。

示例代码:

Observable.just(1,2,3,4,5).startWith(Observable.just(9,8,5)).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                System.out.println(integer+"");
            }
        });

输出:

9
8
5
1
2
3
4
5

2. Merge —> 将多个Observable合并为一个

使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

正如图例上展示的,任何一个原始ObservableonError通知会被立即传递给观察者,而且会终止合并后的Observable

示例代码:

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler);
Observable<Integer> evens = Observable.just(2, 4, 6);

Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });

输出:

Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

除了传递多个Observablemerge,你还可以传递一个Observable列表List,数组,甚至是一个发射Observable序列的Observablemerge将合并它们的输出作为单个Observable的输出:

如果你传递一个发射Observables序列的Observable,你可以指定merge应该同时订阅的Observable‘的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted通知。

3. MergeDelayError —> 将多个Observable合并为一个

它的行为有一点不同,它会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

示例代码:

        //产生0,5,10数列,最后会产生一个错误
        Observable<Long> errorObservable = Observable.error(new Exception("this is end!"));
        Observable < Long > observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(3).mergeWith(errorObservable.delay(3500, TimeUnit.MILLISECONDS));

        //产生0,10,20,30,40数列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        Observable.mergeDelayError(observable1, observable2)
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Sequence complete.");
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.err.println("Error: " + e.getMessage());
                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("Next:" + aLong);
                    }
                });

输出:

Next:0
Next:0
Next:10
Next:20
Next:30
Next:40
Error: this is end!

4. Zip —> 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果

Zip

通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

Zip操作符返回一个Obversable,它使用这个函数按顺序结合两个或多个Observables发射的数据项,然后它发射这个函数返回的结果。它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

zip的最后一个参数接受每个Observable发射的一项数据,返回被压缩后的数据,它可以接受一到九个参数:一个Observable序列,或者一些发射ObservableObservables

Observable<Integer> observable1 = Observable.just(10,20,30);
        Observable<Integer> observable2 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer + integer2;
            }
        }).subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer value) {
                System.out.println("Next:" + value);
            }
        });

输出:

Next:14
Next:28
Next:42
Sequence complete.

ZipWith

zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

zipzipWith默认不在任何特定的操作符上执行。

5. CombineLatest —> 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

示例代码:

//产生0,5,10,15,20数列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //产生0,10,20,30,40数列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong+aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });

输出:

Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.

6. Join —> 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射

zip()merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJavajoin()函数基于时间窗口将两个Observables发射的数据结合在一起。+

join方法的用法如下:

observableA.join(observableB,

observableA产生结果生命周期控制函数,

observableB产生结果生命周期控制函数,

observableA产生的结果与observableB产生的结果的合并规则)

示例代码:

//产生0,5,10,15,20数列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //产生0,10,20,30,40数列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        observable1.join(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //使Observable延迟600毫秒执行
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //使Observable延迟600毫秒执行
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Long, Long>() {
            @Override
            public Long call(Long aLong, Long aLong2) {
                return aLong + aLong2;
            }
        }).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next: " + aLong);
            }
        });

输出:

Next: 0
Next: 5
Next: 15
Next: 20
Next: 30
Next: 35
Next: 45
Next: 50
Next: 60
Sequence complete.

7. GroupJoin —> groupJoin操作符非常类似于join操作符,区别在于join操作符中第四个参数的传入函数不一致

示例代码:

        //产生0,5,10,15,20数列
        Observable<Long> observable1 = Observable.timer(0, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 5;
                    }
                }).take(5);

        //产生0,10,20,30,40数列
        Observable<Long> observable2 = Observable.timer(500, 1000, TimeUnit.MILLISECONDS)
                .map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);

        observable1.groupJoin(observable2, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(1600, TimeUnit.MILLISECONDS);
            }
        }, new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                return Observable.just(aLong).delay(600, TimeUnit.MILLISECONDS);
            }
        }, new Func2<Long, Observable<Long>, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong, Observable<Long> observable) {
                return observable.map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong2) {
                        return aLong + aLong2;
                    }
                });
            }
        }).subscribe(new Subscriber<Observable<Long>>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Observable<Long> observable) {
                observable.subscribe(new Subscriber<Long>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Long aLong) {
                        System.out.println("Next: " + aLong);
                    }
                });
            }
        });

输出:

Next: 5
Next: 15
Next: 10
Next: 20
Next: 25
Next: 30
Next: 35
Next: 45
Next: 40
Next: 50
Next: 60
Next: 55
Sequence complete.

8. SwitchOnNext —> 将一个发射Observables的Observable转换成另一个Observable,后者发射这些Observables最近发射的数据

switchOnNext操作符是把一组Observable转换成一个Observable,转换规则为:对于这组Observable中的每一个Observable所产生的结果,如果在同一个时间内存在两个或多个Observable提交的结果,只取最后一个Observable提交的结果给订阅者

示例代码:

        //每隔500毫秒产生一个observable
        Observable<Observable<Long>> observable = Observable.timer(0, 500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
            @Override
            public Observable<Long> call(Long aLong) {
                //每隔200毫秒产生一组数据(0,10,20,30,40)
                return Observable.timer(0, 200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
                    @Override
                    public Long call(Long aLong) {
                        return aLong * 10;
                    }
                }).take(5);
            }
        }).take(2);

        Observable.switchOnNext(observable).subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                System.out.println("Sequence complete.");
            }

            @Override
            public void onError(Throwable e) {
                System.err.println("Error: " + e.getMessage());
            }

            @Override
            public void onNext(Long aLong) {
                System.out.println("Next:" + aLong);
            }
        });

输出:

Next:0
Next:10
Next:20
Next:0
Next:10
Next:20
Next:30
Next:40
Sequence complete.
时间: 2024-09-30 22:55:49

RxJava 学习笔记(八) --- Combining 结合操作的相关文章

Python学习笔记八:文件操作(续),文件编码与解码,函数,递归,函数式编程介绍,高阶函数

文件操作(续) 获得文件句柄位置,f.tell(),从0开始,按字符数计数 f.read(5),读取5个字符 返回文件句柄到某位置,f.seek(0) 文件在编辑过程中改变编码,f.detech() 获取文件编码,f.encoding() 获取文件在内存中的编号,f.fileno() 获取文件终端类型(tty.打印机等),f.isatty() 获取文件名,f.name() 判断文件句柄是否可移动(tty等不可移动),f.seekable() 判断文件是否可读,f.readable() 判断文件是

Lua学习笔记(八):数据结构

table是Lua中唯一的数据结构,其他语言所提供的数据结构,如:arrays.records.lists.queues.sets等,Lua都是通过table来实现,并且在Lua中table很好的实现了这些数据结构. 1.数组 在Lua中通过整数下标访问table中元素,既是数组,并且数组大小不固定,可动态增长.通常我们初始化数组时,就间接地定义了数组的大小,例如: 1 a = {} -- new array 2 for i=1, 1000 do 3 a[i] = 0 4 end 5 6 --数

iOS学习笔记(八)——iOS网络通信http之NSURLConnection

转自:http://blog.csdn.net/xyz_lmn/article/details/8968182 移动互联网时代,网络通信已是手机终端必不可少的功能.我们的应用中也必不可少的使用了网络通信,增强客户端与服务器交互.这一篇提供了使用NSURLConnection实现http通信的方式. NSURLConnection提供了异步请求.同步请求两种通信方式. 1.异步请求 iOS5.0 SDK NSURLConnection类新增的sendAsynchronousRequest:queu

angular学习笔记(八)

本篇介绍angular控制视图的显示和隐藏: 通过给元素添加ng-show属性或者ng-hide属性来控制视图的显示或隐藏: ng-show: 绑定的数据值为true时,显示元素,值为false时,隐藏元素 ng-hide: 绑定的数据值为true时,隐藏元素,值为false时,显示元素 (其实只要用到其中一个就可以了) 下面来看个简单的例子,点击按钮可以显示/隐藏元素: <!DOCTYPE html> <html ng-app> <head> <title>

Android:日常学习笔记(10)———使用LitePal操作数据库

Android:日常学习笔记(10)---使用LitePal操作数据库 引入LitePal 什么是LitePal LitePal是一款开源的Android数据库框架,采用了对象关系映射(ORM)的模式,将平时开发时最常用的一些数据库功能进行了封装,使得开发者不用编写一行SQL语句就可以完成各种建表.増删改查的操作.并且LitePal很"轻",jar包大小不到100k,而且近乎零配置,这一点和Hibernate这类的框架有很大区别.目前LitePal的源码已经托管到了GitHub上. 关

Linux System Programming 学习笔记(八) 文件和目录管理

1. 文件和元数据 每个文件都是通过inode引用,每个inode索引节点都具有文件系统中唯一的inode number 一个inode索引节点是存储在Linux文件系统的磁盘介质上的物理对象,也是LInux内核通过数据结构表示的实体 inode存储相关联文件的元数据 ls -i 命令获取文件的inode number /* obtaining the metadata of a file */ #include <sys/types.h> #include <sys/stat.h>

马哥学习笔记八——LAMP编译安装之PHP及xcache

1.解决依赖关系: 请配置好yum源(可以是本地系统光盘)后执行如下命令: # yum -y groupinstall "X Software Development" 如果想让编译的php支持mcrypt扩展,此处还需要下载如下两个rpm包并安装之: libmcrypt-2.5.7-5.el5.i386.rpm libmcrypt-devel-2.5.7-5.el5.i386.rpm 2.编译安装php-5.4.13 首先下载源码包至本地目录. # tar xf php-5.4.13

easyui学习笔记1—增删改操作【转载】

最近公司要用easyui,这里自己看了官网几篇文章,遇到些问题,大多数的问题都是敲代码的时候笔误,其他有些地方确实需要注意一下,这里做些笔记. 1.在mysql中建好表之后修改id字段为递增字段,发现这个奇怪的mysql语法,如下 alter table student change id id int auto_increment; 这句是在student表已经建好的情况下来修改字段id为自增列,奇怪的是为嘛change id id,并且后面还要带上id的类型int? 2.html5标记 如何

RX学习笔记:JavaScript数组操作

RX学习笔记:JavaScript数组操作 2016-07-03 增删元素 unshift() 在数组开关添加元素 array.unshift("value"); array.unshift(123); array.unshift("value1","value2"); push() 在数组末尾添加元素 array.push("value"); array.push(123); array.push("value1&