RxJava操作符doOnNext

doOnNext官方介绍:

The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

可以这么理解:

  • do系列的作用是side effect,当onNext发生时,它被调用,不改变数据流。
  • doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

我们先通过例子了解下。

有如下场景:

你的程序取到的 User 并不应该直接显示,而是需要先与数据库中的数据进行比对和修正后再显示。

RxJava与Retrofit结合使用,代码如下:

getUser(userId)
    .doOnNext(new Action1<User>() {
        @Override
        public void call(User user) {
            processUser(user);
        })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<User>() {
        @Override
        public void onNext(User user) {
            userView.setUser(user);
        }

        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable error) {
            // Error handling
            ...
        }
    });

后台代码和前台代码全都写在一条链中,清晰很多。在使用User信息之前,在doOnNext方法里对User信息进行了处理,最后执行了subscriber里的OnNext方法。

从github上很多项目和这篇文章来看,do系列的作用

  • 使用doOnNext()来调试
  • 在flatMap()里使用doOnError()作为错误处理。
  • 使用doOnNext()去保存/缓存网络结果

看下面代码:

final SimpleDateFormat sDateFormat = new SimpleDateFormat("yyyy-MM-dd    hh:mm:ss");
        Observable.create(new Observable.OnSubscribe<Person>() {
            @Override
            public void call(Subscriber<? super Person> subscriber) {
                String date = sDateFormat.format(new Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                Person person = new Person(201);
                subscriber.onNext(person);
            }
        }).subscribeOn(Schedulers.io()) //指定耗时进程
                .observeOn(Schedulers.newThread()) //指定doOnNext执行线程是新线程
                .doOnNext(new Action1<Person>() {
                    @Override
                    public void call(Person person) {
                        String date = sDateFormat.format(new Date());
                        System.out.println(date + " call " + Thread.currentThread().getName());
                        person.age = 301;
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).observeOn(AndroidSchedulers.mainThread())//指定最后观察者在主线程
                .subscribe(new Action1<Person>() {
                    @Override
                    public void call(Person person) {
                        String date = sDateFormat.format(new Date());
                        System.out.println(date + " call " + Thread.currentThread().getName());
                        Log.d(TAG, "call: " + person.age);
                    }
                });

执行结果:

03-01 14:49:29.897 23442-24145/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxCachedThreadScheduler-2
03-01 14:49:29.907 23442-24144/com.example.myrxlearn I/System.out: 2016-03-01    02:49:29 call RxNewThreadScheduler-2
03-01 14:49:31.907 23442-23442/com.example.myrxlearn I/System.out: 2016-03-01    02:49:31 call main

也就是说直到doOnNext里的方法在新线程执行完毕,subscribe里的call才有机会在主线程执行。

一直没看到有合适的方法解决这个问题,因为缓存的时间不应该去阻碍主线程里数据的显示。

今天做回顾时看到了这篇文章

非阻塞I/O操作

现在我们可以使用Schedulers.io()创建非阻塞的版本:

public static void storeBitmap(Context context, Bitmap bitmap,
                                   String filename) {
        Schedulers.io().createWorker().schedule(() -> {
            blockingStoreBitmap(context, bitmap, filename);
        });
    }

然后想起来一直存在的这个问题。只需要把上面的代码改成

.doOnNext(new Action1<Person>() {
            @Override
            public void call(Person person) {
                String date = sDateFormat.format(new Date());
                System.out.println(date + " call " + Thread.currentThread().getName());
                person.age = 301;
                Schedulers.io().createWorker().schedule(new Action0() {
                    @Override
                    public void call() {
                        try {
                            Thread.sleep(2000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        })

不需要在用observeOn指定在新线程就可以实现

03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.307 30368-30406/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call RxCachedThreadScheduler-1
03-01 14:55:02.347 30368-30368/com.example.myrxlearn I/System.out: 2016-03-01    02:55:02 call main

这样doOnNext可以方便的用来调试,用来缓存。

官网说:

doOnNext is for side-effects: you want to react (eg. log) to item

emissions in an intermediate step of your stream, for example before

the stream is filtered, for transverse behavior like logging, but you

still want the value to propagate down the stream.

onNext is more final, it consumes the value.

使用doOnNext的副作用是什么,还需要项目中来发现。

最后补充点。

Observable.create(new Observable.OnSubscribe<Person>() {
            @Override
            public void call(Subscriber<? super Person> subscriber) {
                Person person = new Person(201);
                subscriber.onNext(person);
            }
        }).doOnNext(new Action1<Person>() {
            @Override
            public void call(Person person) {
                person.age = 301;
            }
        }).subscribe(new Action1<Person>() {
            @Override
            public void call(Person person) {
                Log.d(TAG, "call: " + person.age);//输出301
            }
        });

上面的代码执行结果在doOnNext方法执行完,改变了流里的数据。是不是与官方文档说的不一样呢?

参考:http://blog.csdn.net/wangkai0681080/article/details/50772721

时间: 2024-10-20 01:24:16

RxJava操作符doOnNext的相关文章

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

RxJava操作符(二) __变换操作

RxJava变换操作符 这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer-.等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符.好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧. map 官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable. 还是举个例子吧,

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse

RxJava操作符repeatWhen()和retryWhen()

第一次见到.repeatWhen()和.retryWhen()这两个操作符的时候就非常困惑了.不得不说,它们绝对是"最令人困惑弹珠图"的有力角逐者. 然而它们都是非常有用的操作符:允许你有条件的重新订阅已经结束的Observable.我最近研究了它们的工作原理,现在我希望尝试着去解释它们(因为,我也是耗费了一些精力才参透它们). Repeat与Retry的对比 首先,来了解一下.repeat()和.retry()之间最直观的区别是什么?这个问题并不难:区别就在于什么样的终止事件会触发重

RxJava操作符(04-过滤操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656494 本文出自:[openXu的博客] 目录: Debounce Distinct ElementAt Filter First Last IgnoreElements SampleThrottleFirst SkipSkipLast TakeTakeLast 源码下载 "过滤操作",顾名思义,就是过滤掉Observable发射的一些数据,不让他发射出去,也就是忽略丢弃掉