RxJava操作符(09-算术/聚合操作&连接操作)

转载请标明出处:

http://blog.csdn.net/xmxkf/article/details/51692493

本文出自:【openXu的博客】

目录:

  • 算术聚合

    • Count
    • Concat
    • Reduce
  • 连接操作
    • Publish
    • Connect
    • RefCount
    • Replay
  • 源码下载

算术&聚合

1. Count

??Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量。

?? 如果原始Observable发生错误终止,Count不发射数据而是直接传递错误通知。如果原始Observable永远不终止,Count既不会发射数据也不会终止。

????

示例代码:

Observable.from(new String[] { "one", "two", "three" })
        .count()
        .subscribe(integer->Log.v(TAG, "count:"+integer));

Observable.from(new String[] { "one", "two", "three" })
        .countLong()
        .subscribe(aLong->Log.v(TAG, "countLong:"+aLong));

输出:

count:3

countLong:3

2. Concat

??concat操作符会依次发射多个Observable的数据,第一个Observable发射的所有数据在第二个Observable发射的任何数据前面,以此类推,直到前面一个Observable终止,Concat才会订阅额外的一个Observable。

Merge操作符也差不多,它结合两个或多个Observable的发射物,但是数据可能交错,而Concat不会让多个Observable的发射物交错。

????

示例代码:

//还有一个实例方法叫concatWith,这两者是等价的:Observable.concat(a,b)和a.concatWith(b)
Observable.concat(
        Observable.interval(100,TimeUnit.MILLISECONDS).take(4),
        Observable.interval(200,TimeUnit.MILLISECONDS).take(5))
        .subscribe(aLong -> Log.v(TAG, "concat:"+aLong));

输出:

concat:0

concat:1

concat:2

concat:3

concat:0

concat:1

concat:2

concat:3

concat:4

3. Reduce

??Reduce操作符对原始Observable发射数据的第一项应用一个函数,然后再将这个函数的返回值与第二项数据一起传递给函数,以此类推,持续这个过程知道原始Observable发射它的最后一项数据并终止,此时Reduce返回的Observable发射这个函数返回的最终值。

??注意如果原始Observable没有发射任何数据,reduce抛出异常IllegalArgumentException。

??在其它场景中,这种操作有时被称为累积,聚集,压缩,折叠,注射等。

????

示例代码:

Observable.just(1,2,3,4)
        .reduce(new Func2<Integer, Integer, Integer>() {
            //integer为前面几项只和,integer2为当前发射的数据
            @Override
            public Integer call(Integer integer, Integer integer2) {
                Log.v(TAG, "integer:"+integer+"  integer2:"+integer2);
                return integer+integer2;
            }
        }).subscribe(integer -> Log.v(TAG, "reduce:"+integer));

输出:

integer:1 integer2:2

integer:3 integer2:3

integer:6 integer2:4

reduce:10


连接操作

1. Publish

??Publish 操作符将普通的Observable转换为可连接的Observable(ConnectableObservable),ConnectableObservable是Observable的子类。 可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始,这样可以更灵活的控制发射数据的时机。

??注意:如果一个ConnectableObservable已经开始发射数据,再对其进行订阅只能接受之后发射的数据,订阅之前已经发射过的数据就丢失了。

????

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(5);
//使用publish操作符将普通Observable转换为可连接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//第一个订阅者订阅,不会开始发射数据
connectableObservable.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "1.onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "1.onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "1.onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
//开始发射数据
Log.v(TAG, "start time:" + sdf.format(new Date()));
connectableObservable.connect();
//第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据
connectableObservable
        .delaySubscription(2, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "2.onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "2.onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "2.onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});

/*
输出:
start time:23:01:30
1.onNext:0->time:23:01:31
1.onNext:1->time:23:01:32
2.onNext:1->time:23:01:32
1.onNext:2->time:23:01:33
2.onNext:2->time:23:01:33
1.onNext:3->time:23:01:34
2.onNext:3->time:23:01:34
1.onNext:4->time:23:01:35
2.onNext:4->time:23:01:35
1.onCompleted
2.onCompleted
 */

2. Connect

??connect是ConnectableObservable接口的一个方法,它的作用就是让ConnectableObservable开始发射数据(即使没有任何订阅者订阅这个Observable,调用connect都会开始发射数据)。

??connect方法返回一个Subscription对象,可以调用它的unsubscribe方法让Observable停止发射数据给观察者。

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS);
//使用publish操作符将普通Observable转换为可连接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//开始发射数据
Subscription sub = connectableObservable.connect();
//第二个订阅者延迟2s订阅,这将导致丢失前面2s内发射的数据
connectableObservable
        .delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.v(TAG, "onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "onError");
            }
            @Override
            public void onNext(Long along) {
                Log.v(TAG, "onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });

new Timer().schedule(new TimerTask() {
    @Override
    public void run() {
        //6s之后停止发射数据
        sub.unsubscribe();
    }
},6000);

/*
输出:
onNext:3->time:23:10:49
onNext:4->time:23:10:50
onNext:5->time:23:10:51
 */

3. RefCount

??RefCount操作符可以看做是Publish的逆向,它能将一个ConnectableObservable对象再重新转化为一个普通的Observable对象,如果转化后有订阅者对其进行订阅将会开始发射数据,后面如果有其他订阅者订阅,将只能接受后面的数据(这也是转化之后的Observable 与普通的Observable的一点区别 )。

??还有一个操作符叫share,它的作用等价于对一个Observable同时应用publish和refCount操作。

????

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");

Observable<Long> obs = Observable.interval(1, TimeUnit.SECONDS).take(4);
//使用publish操作符将普通Observable转换为可连接的Observable
ConnectableObservable<Long> connectableObservable = obs.publish();
//refCount:将ConnectableObservable转化为普通Observable
Observable obsRefCount = connectableObservable.refCount();

obs.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "普通obs1:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "普通obs1:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "普通obs1:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
obs.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "普通obs2:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "普通obs2:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "普通obs2:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});

obsRefCount.subscribe(new Subscriber<Long>() {
    @Override
    public void onCompleted() {
        Log.v(TAG, "obsRefCount1:onCompleted");
    }
    @Override
    public void onError(Throwable e) {
        Log.v(TAG, "obsRefCount1:onError");
    }
    @Override
    public void onNext(Long along) {
        Log.v(TAG, "obsRefCount1:onNext:"+along+"->time:"+ sdf.format(new Date()));
    }
});
obsRefCount.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(new Subscriber<Long>() {
            @Override
            public void onCompleted() {
                Log.v(TAG, "obsRefCount2:onCompleted");
            }
            @Override
            public void onError(Throwable e) {
                Log.v(TAG, "obsRefCount2:onError");
            }
            @Override
            public void onNext(Long along) {
                Log.v(TAG, "obsRefCount2:onNext:"+along+"->time:"+ sdf.format(new Date()));
            }
        });

/*
输出:
普通obs1:onNext:0->time:23:28:28
普通obs1:onNext:1->time:23:28:29
普通obs1:onNext:2->time:23:28:30
普通obs1:onNext:3->time:23:28:31
普通obs1:onCompleted

普通obs2:onNext:0->time:23:28:31
普通obs2:onNext:1->time:23:28:32
普通obs2:onNext:2->time:23:28:33
普通obs2:onNext:3->time:23:28:34
普通obs2:onCompleted

obsRefCount1:onNext:0->time:23:28:28
obsRefCount1:onNext:1->time:23:28:29
obsRefCount1:onNext:2->time:23:28:30
obsRefCount1:onNext:3->time:23:28:31
obsRefCount1:onCompleted

obsRefCount2:onNext:3->time:23:28:31
obsRefCount2:onCompleted
 */

4. Replay

??通过上面的介绍我们了解到,ConnectableObservable和普通的Observable最大的区别就是,调用Connect操作符开始发射数据,后面的订阅者会丢失之前发射过的数据。

??使用Replay操作符返回的ConnectableObservable 会缓存订阅者订阅之前已经发射的数据,这样即使有订阅者在其发射数据开始之后进行订阅也能收到之前发射过的数据。Replay操作符能指定缓存的大小或者时间,这样能避免耗费太多内存。

示例代码:

SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
Log.v(TAG, "start time:" + sdf.format(new Date()));

//没有缓存的情况
ConnectableObservable<Long> obs = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .publish();
obs.connect();  //开始发射数据
obs.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG, "onNext:"+aLong+"->time:"+ sdf.format(new Date())));

//缓存一个数据
ConnectableObservable<Long> obs1 = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .replay(1);   //缓存1个数据
obs1.connect();  //开始发射数据
obs1.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG,
                "1.onNext:"+aLong+"->time:"+ sdf.format(new Date())));

//缓存3s内发射的数据
ConnectableObservable<Long> obs2 = Observable.interval(1, TimeUnit.SECONDS)
        .take(5)
        .replay(3, TimeUnit.SECONDS);   //缓存3s
obs2.connect();  //开始发射数据
obs2.delaySubscription(3, TimeUnit.SECONDS)
        .subscribe(aLong -> Log.v(TAG,
                "2.onNext:"+aLong+"->time:"+ sdf.format(new Date())));

/*
输出:
start time:14:25:51
onNext:3->time:14:25:55
onNext:4->time:14:25:56

1.onNext:2->time:14:25:54
1.onNext:3->time:14:25:55
1.onNext:4->time:14:25:56

2.onNext:0->time:14:25:54
2.onNext:1->time:14:25:54
2.onNext:2->time:14:25:54
2.onNext:3->time:14:25:55
2.onNext:4->time:14:25:56
 */

从log可以看出,没有缓存机制的只能收到3.4;缓存1个数据的能收到前面已经发射过的2;缓存3s的将所有已经发射的数据都缓存起来了,所以数据都能收到。缓存的数据在订阅者订阅之后立马发射给订阅者。

源码下载:

https://github.com/openXu/RxJavaTest

时间: 2024-10-29 19:06:34

RxJava操作符(09-算术/聚合操作&连接操作)的相关文章

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

Update:sparksql:第3节 Dataset (DataFrame) 的基础操作 &amp; 第4节 SparkSQL_聚合操作_连接操作

8. Dataset (DataFrame) 的基础操作 8.1. 有类型操作 8.2. 无类型转换 8.5. Column 对象 9. 缺失值处理 10. 聚合 11. 连接 8. Dataset (DataFrame) 的基础操作 导读 这一章节主要目的是介绍 Dataset 的基础操作, 当然, DataFrame 就是 Dataset, 所以这些操作大部分也适用于 DataFrame 有类型的转换操作 无类型的转换操作 基础 Action 空值如何处理 统计操作 8.1. 有类型操作 分

红黑树上的连接操作

a)①对于插入调整红黑树性质函数,注意到只有case1才会增加黑高,引用书中的图13-5所示 不管是a图还是b图,结点C从左图到右图黑高增加了1.其他结点黑高均无变化.具体13-5和13-6图所有结点黑高可以 参考13.3-3题目答案. ②对于删除调整红黑树性质函数,可以使用13.3-3类似的方法计算其黑高变化,结果就是case2的B结点减少1,case4结点D增加1,结点B减少1.(请参考图13-7)其他case和结点均无变化. ③对于删除函数如果是第3种情况,需要找到删除结点的后继,那么我们

【数据结构】两个单循环链表的连接操作

如果在单链表或头指针表示的链表上操作这个比较消耗性能,因为都需要遍历第一个链表,找到an,然后将b1链接到an的后面,时间复杂度是:O(n).若在尾指针表示的单循环链表上实现,则只需改变指针,无需遍历,时间复杂度是:O(1) 现在看算法实现,需要4个主要步骤,都注释了: LinkList Connect(LinkList A,LinkList B){ //假设A,B为非空循环链表的尾指针 LinkList p = A->next;//1.保存A链表的头结点的位置 A->next = B->

Java 连接操作 Redis 出现错误

Exception in thread "main" redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect at redis.clients.jedis.Connection.connect(Connection.java:207) at redis.clients.jedis.BinaryClient.connect

表的连接操作(学习笔记)

在数据库中对于数据表的连接操作一共提供了两种: 内连接:也称等值连接在where中消除笛卡尔积的条件就是采用了内连接方式进行的 外连接: 内连接中只能显示等值满足的条件,不满足的条件则无法显示,如果希望显示特定表中的全部数据就要用要外连接 外连接分3种    在Oracle中使用(+)表示连接 左外连接(左连接) 左关系属性=右关系属性(+) 表示左外连接 右外连接(右连接) 左关系属性(+)=右关系属性 表示右外连接 全外连接(全连接) 示例一 (内连接) 将emp和dept表联合查询 SEL

ESP8266学习笔记6:ESP8266规范wifi连接操作

一.前言 我整理了从2015年至今关于ESP8266的学习笔记,梳理出来了开发环境.基础功能.进阶学习三大部分.方便自己和他人.可点此查看,欢迎交流. 之前在笔记4<ESP8266的SmartConfig>http://blog.csdn.net/iotisan/article/details/54849410中,做了smartconfig例程的測试. 这个例程是因为DEMO演示的,离商用还有段距离. 几周前,有网友就问我esp8266又一次上电无法自己主动连接之前配置的路由器.这确实是眼下D

python 连接操作数据库

一.下面我们所说的就是连接mysql的应用: 1.其实在python中连接操作mysql的模块有多个,在这里我只给大家演示pymysql这一个模块(其实我是感觉它比较好用而已): pymysql是第三方的一个模块,需要我们自己安装,在这里顺便给大家普及一下安装第三方模块的知识:首先,如果我们在windows上安装的话,需要先安装python解释器,然后里面自带 了pip模块(我们安装第三方模块,都是需要这个命令去安装的),配置完环境变量之后我们就可以直接在dos命令行中执行要安装的模块了,如下: