【知识整理】这可能是最好的RxJava 2.x 入门教程(四)

这可能是最好的RxJava 2.x入门教程系列专栏

文章链接:

这可能是最好的RxJava 2.x 入门教程(一)

这可能是最好的RxJava 2.x 入门教程(二)

这可能是最好的RxJava 2.x 入门教程(三)

这可能是最好的RxJava 2.x 入门教程(四)

GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples

为了满足大家的饥渴难耐,GitHub将同步更新代码,主要包含基本的代码封装,RxJava 2.x所有操作符应用场景介绍和实际应用场景,后期除了RxJava可能还会增添其他东西,总之,GitHub上的Demo专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

一、前言

最近很多小伙伴私信我,说自己很懊恼,对于RxJava 2.x 系列一看就能明白,但自己写却又写不出来。如果 LZ 能放上实战情景教程就最好不过了。也是哈,单讲我们的操作符,也让我们的教程不温不火,但 LZ 自己选择的路,那跪着也要走完呀。所以,也就让我可怜的小伙伴们忍忍了,操作符马上就讲完了。

二、正题

16、Single

顾名思义,Single 只会接收一个参数,而SingleObserver 只会调用onError 或者onSuccess。

 1 Single.just(new Random().nextInt())
 2                 .subscribe(new SingleObserver<Integer>() {
 3                     @Override
 4                     public void onSubscribe(@NonNull Disposable d) {
 5
 6                     }
 7
 8                     @Override
 9                     public void onSuccess(@NonNull Integer integer) {
10                         mRxOperatorsText.append("single : onSuccess : "+integer+"\n");
11                         Log.e(TAG, "single : onSuccess : "+integer+"\n" );
12                     }
13
14                     @Override
15                     public void onError(@NonNull Throwable e) {
16                         mRxOperatorsText.append("single : onError : "+e.getMessage()+"\n");
17                         Log.e(TAG, "single : onError : "+e.getMessage()+"\n");
18                     }
19                 });

输出:

17、distinct

去重操作符,简单的作用就是去重。

1 Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
2                 .distinct()
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("distinct : " + integer + "\n");
7                         Log.e(TAG, "distinct : " + integer + "\n");
8                     }
9                 });

输出:

很明显,发射器发送的事件,在接收的时候被去重了。

18、debounce

去除发送频率过快的项,看起来好像没啥用处,但你信我,后面绝对有地方很有用武之地。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        }).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        mRxOperatorsText.append("debounce :" + integer + "\n");
                        Log.e(TAG,"debounce :" + integer + "\n");
                    }
                });

输出:

代码很清晰,去除发送间隔时间小于500毫秒的发射事件,所以1 和 3 被去掉了。

19、defer

简单地时候就是每次订阅都会创建一个新的Observable,并且如果没有被订阅,就不会产生新的Observable

 1 Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
 2             @Override
 3             public ObservableSource<Integer> call() throws Exception {
 4                 return Observable.just(1, 2, 3);
 5             }
 6         });
 7
 8
 9         observable.subscribe(new Observer<Integer>() {
10             @Override
11             public void onSubscribe(@NonNull Disposable d) {
12
13             }
14
15             @Override
16             public void onNext(@NonNull Integer integer) {
17                 mRxOperatorsText.append("defer : " + integer + "\n");
18                 Log.e(TAG, "defer : " + integer + "\n");
19             }
20
21             @Override
22             public void onError(@NonNull Throwable e) {
23                 mRxOperatorsText.append("defer : onError : " + e.getMessage() + "\n");
24                 Log.e(TAG, "defer : onError : " + e.getMessage() + "\n");
25             }
26
27             @Override
28             public void onComplete() {
29                 mRxOperatorsText.append("defer : onComplete\n");
30                 Log.e(TAG, "defer : onComplete\n");
31             }
32         });

输出:

20、last

last 操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。

1 Observable.just(1, 2, 3)
2                 .last(4)
3                 .subscribe(new Consumer<Integer>() {
4                     @Override
5                     public void accept(@NonNull Integer integer) throws Exception {
6                         mRxOperatorsText.append("last : " + integer + "\n");
7                         Log.e(TAG, "last : " + integer + "\n");
8                     }
9                 });

输出:

21、merge

merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

1 Observable.merge(Observable.just(1, 2), Observable.just(3, 4, 5))
2                 .subscribe(new Consumer<Integer>() {
3                     @Override
4                     public void accept(@NonNull Integer integer) throws Exception {
5                         mRxOperatorsText.append("merge :" + integer + "\n");
6                         Log.e(TAG, "accept: merge :" + integer + "\n" );
7                     }
8                 });

输出:

22、reduce

reduce 操作符每次用一个方法处理一个值,可以有一个 seed 作为初始值。

 1 Observable.just(1, 2, 3)
 2                 .reduce(new BiFunction<Integer, Integer, Integer>() {
 3                     @Override
 4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
 5                         return integer + integer2;
 6                     }
 7                 }).subscribe(new Consumer<Integer>() {
 8             @Override
 9             public void accept(@NonNull Integer integer) throws Exception {
10                 mRxOperatorsText.append("reduce : " + integer + "\n");
11                 Log.e(TAG, "accept: reduce : " + integer + "\n");
12             }
13         });

输出:

可以看到,代码中,我们中间采用 reduce ,支持一个 function 为两数值相加,所以应该最后的值是:1 + 2 = 3 + 3 = 6 , 而Log 日志完美解决了我们的问题。

23、scan

scan 操作符作用和上面的 reduce 一致,唯一区别是 reduce 是个只追求结果的坏人,而 scan 会始终如一地把每一个步骤都输出。

 1 Observable.just(1, 2, 3)
 2                 .scan(new BiFunction<Integer, Integer, Integer>() {
 3                     @Override
 4                     public Integer apply(@NonNull Integer integer, @NonNull Integer integer2) throws Exception {
 5                         return integer + integer2;
 6                     }
 7                 }).subscribe(new Consumer<Integer>() {
 8             @Override
 9             public void accept(@NonNull Integer integer) throws Exception {
10                 mRxOperatorsText.append("scan " + integer + "\n");
11                 Log.e(TAG, "accept: scan " + integer + "\n");
12             }
13         });

输出:

看日志,没毛病。

24、window

按照实际划分窗口,将数据发送给不同的Observable

 1 mRxOperatorsText.append("window\n");
 2         Log.e(TAG, "window\n");
 3         Observable.interval(1, TimeUnit.SECONDS) // 间隔一秒发一次
 4                 .take(15) // 最多接收15个
 5                 .window(3, TimeUnit.SECONDS)
 6                 .subscribeOn(Schedulers.io())
 7                 .observeOn(AndroidSchedulers.mainThread())
 8                 .subscribe(new Consumer<Observable<Long>>() {
 9                     @Override
10                     public void accept(@NonNull Observable<Long> longObservable) throws Exception {
11                         mRxOperatorsText.append("Sub Divide begin...\n");
12                         Log.e(TAG, "Sub Divide begin...\n");
13                         longObservable.subscribeOn(Schedulers.io())
14                                 .observeOn(AndroidSchedulers.mainThread())
15                                 .subscribe(new Consumer<Long>() {
16                                     @Override
17                                     public void accept(@NonNull Long aLong) throws Exception {
18                                         mRxOperatorsText.append("Next:" + aLong + "\n");
19                                         Log.e(TAG, "Next:" + aLong + "\n");
20                                     }
21                                 });
22                     }
23                 });

输出:

三、写在最后

至此,大部分 RxJava 2.x 的操作符就告一段落了,当然还有一些没有提到的操作符,不是说它们不重要,而是 LZ 也要考虑大家的情况,接下来就会根据实际应用场景来对 RxJava 2.x 发起冲锋。如果想看更多的数据,请移步 GitHub:https://github.com/nanchen2251/RxJava2Examples

时间: 2024-12-18 11:22:28

【知识整理】这可能是最好的RxJava 2.x 入门教程(四)的相关文章

【知识整理】这可能是最好的RxJava 2.x 入门教程(五)

这可能是最好的RxJava 2.x入门教程系列专栏 文章链接: 这可能是最好的RxJava 2.x 入门教程(一) 这可能是最好的RxJava 2.x 入门教程(二) 这可能是最好的RxJava 2.x 入门教程(三) 这可能是最好的RxJava 2.x 入门教程(四) 这可能是最好的RxJava 2.x 入门教程(五) GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples 为了满足大家的饥渴难耐,GitHub将同步更新代码,主

【知识整理】这可能是最好的RxJava 2.x 入门教程(三)

目录 一.前言 二.正题 7.distinct 8.Filter 9.buffer 10.timer 11.interval 12.doOnNext 13.skip 14.take 15.just 三.写在最后 正文 这可能是最好的RxJava 2.x入门教程系列专栏 文章链接: 这可能是最好的RxJava 2.x 入门教程(一) 这可能是最好的RxJava 2.x 入门教程(二) 这可能是最好的RxJava 2.x 入门教程(三) GitHub 代码同步更新:https://github.co

这可能是最好的RxJava 2.x 入门教程(一)

这可能是最好的 RxJava 2.x 入门教程系列专栏 文章链接: 这可能是最好的 RxJava 2.x 入门教程(完结版)[重磅推出] 这可能是最好的RxJava 2.x 入门教程(一) 这可能是最好的RxJava 2.x 入门教程(二) 这可能是最好的RxJava 2.x 入门教程(三) 这可能是最好的RxJava 2.x 入门教程(四) 这可能是最好的RxJava 2.x 入门教程(五) GitHub 代码同步更新:https://github.com/nanchen2251/RxJava

Web网站服务及知识整理(二)

Web网站服务及知识整理(二)

Java基础知识整理(一)

概述 公司业务需要,产品既要有.NET又需要Java,没得选择,只能业余时间学习Java,整体觉得Java也.NET还是很相似的,只是语法有差别,差别也不是很大,这就将学习Java的基础知识整理下,以便于自己的学习.作为个.NET程序猿也可以学习Java ,毕竟技多不压身,学习多也要精通. 开发工具 eclipse ,开发java类似.NET 需要装JDK类似.NET Framework. Java开发工具eclipse设置 1.设置字体:window设置: 2.设置快捷键:window--ke

Linux动态库相关知识整理

动态库和静态库在C/C++开发中很常见,相比静态库直接被编译到可执行程序, 动态库运行时加载使得可执行程序的体积更小,更新动态库可以不用重新编译可执 行程序等诸多好处.作者是一个Linux后台开发,这些知识经常用到,所以 整理了一下这方面的知识.静态库相对简单,本文只关心Linux平台下的动态库. 创建动态库 这里我把一个短小却很有用的哈希函数编译成动态库做为示例,ELFhash用于对字符串做哈希,返回一个无符号整数. //elfhash.h #include <stdio.h> unsign

数据库知识整理

关系型数据库知识整理: 一,关系型数据库管理系统简介: 1.1使用数据库的原因: 降低存储数据的冗余度 提高数据的一致性 可以建立数据库所遵循的标准 储存数据可以共享 便于维护数据的完整性 能够实现数据的安全性 1.2基本概念: 对于任何数据库来说,表之间的关联关系存在三种基本的关系类型:一对一,一对多,多对多仅此三种. 目前流行的关系型数据库服务器管理系统有:微软-MS SQL Server.甲骨文-Oracle.IBM-DB2.开源的MySql和PostgreSQL等.(在面试中有被问到过)

DIV+CSS网页布局常用的一些基础知识整理

CSS命名规范一.文件命名规范 全局样式:global.css:框架布局:layout.css:字体样式:font.css:链接样式:link.css:打印样式:print.css: 二.常用类/ID命名规范页 眉:header内 容:content容 器:container页 脚:footer 版 权:copyright 导 航:menu主导航:mainMenu子导航:subMenu 标 志:logo标 语:banner标 题:title侧边栏:sidebar 图 标:Icon注 释:note

WIFI基本知识整理

WIFI基本知识整理 这里对wifi的802.11协议中比较常见的知识做一个基本的总结和整理,便于后续的学习.因为无线网络中涉及术语很多,并且许多协议都是用英文描述,所以有些地方翻译出来会有歧义,这种情况就直接英文来描述了. 主要内容: 一.基本概述 二.实践基础 三.一些原理 四.补充 五.其它 一.基本概述 ============================ 1.有线和无线网络 目前有线网络中最著名的是以太网(Ethenet),但是无线网络WLAN是一个很有前景的发展领域,虽然可能不会