RxJava之错误处理

在Observable发射数据时,有时发送onError通知,导致观察者不能正常接收数据。可是,有时我们希望对Observable发射的onError通知做出响应或者从错误中恢复。此时我们该如何处理呢?下面我们了解下RxJava错误处理相关的操作符。

catch

流程图

概述

catch操作符拦截原Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

在RxJava中,catch实现为三个不同的操作符:

  • onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
  • onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
  • onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。

onErrorReturn

流程图

概述

onErrorReturn方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会通过参数函数,创建一个特殊项并发发射,最后调用观察者的onCompleted方法。

API

Javadoc: onErrorReturn(Func1))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorReturn(new Func1<Throwable, Student>() {
            @Override
            public Student call(Throwable throwable) {
                return new Student(1001, "error - 1 ", 10);
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                mAdaStudent.addData(student);
            }
        });

Log打印

OperateActivity: do onNext
Student{id=‘1‘name=‘A‘, age=23}
OperateActivity: do onNext
Student{id=‘2‘name=‘B‘, age=33}
OperateActivity: do onNext
Student{id=‘3‘name=‘C‘, age=24}
OperateActivity: do onNext
Student{id=‘1001‘name=‘error - 1 ‘, age=10}
OperateActivity: do onCompleted

示例解析

在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。而在onErrorReturn方法处理中,其参数函数中,创建并返回了一个特殊项( new Student(1001, “error - 1 “, 10)).

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了一个特殊项后,调用了onCompleted方法,结束了此次订阅。而这个特殊项,正是在onErrorReturn中参数函数中,创建的特殊项。

onErrorResumeNext

流程图

概述

onErrorResumeNext方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将onError通知传递给观察者,但作为替代,=新的Observable开始发射数据。

onErrorResumeNext方法与onErrorReturn()方法类似,都是拦截原Observable的onError通知,不同的是拦截后的处理方式,onErrorReturn创建并返回一个特殊项,而onErrorResumeNext创建并返回一个新的Observabl,观察者会订阅它,并接收其发射的数据。

API

Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorResumeNext(new Func1<Throwable, Observable<Student>>() {
            @Override
            public Observable<Student> call(Throwable throwable) {
                return Observable.just(new Student(1001, "error - 1 ", 10), new Student(1002, "error - 2 ", 10));
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
            });

Log打印

OperateActivity: do onNext
Student{id=‘1‘name=‘A‘, age=23}
OperateActivity: do onNext
Student{id=‘2‘name=‘B‘, age=33}
OperateActivity: do onNext
Student{id=‘3‘name=‘C‘, age=24}
OperateActivity: do onNext
Student{id=‘1001‘name=‘error - 1 ‘, age=10}
OperateActivity: do onNext
Student{id=‘1002‘name=‘error - 2 ‘, age=10}
OperateActivity: do onCompleted

示例解析

在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。在onErrorResumeNext方法中的参数函数中,创建了一个新的Observable。

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了新建的创建了一个新的Observable发射的出具。在新Observable发射完数据后,调用了onCompleted方法,结束了此次订阅。

onExceptionResumeNext

流程图

概述

onExceptionResumeNext方法与onErrorResumeNext方法类似创建并返回一个拥有类似原Observable的新Observable,,也使用这个备用的Observable。不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

API

Javadoc: onExceptionResumeNext(Observable))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onExceptionResumeNext(Observable.just(new Student(1001, "error - 1 ", 10),
                new Student(1002, "error - 2 ", 10)))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

 Observable.create(new Observable.OnSubscribe<Student>() {
        @Override
        public void call(Subscriber<? super Student> subscriber) {
            subscriber.onNext(getListOfStudent().get(0));
            subscriber.onNext(getListOfStudent().get(1));
            subscriber.onNext(getListOfStudent().get(2));
            subscriber.onError(new Exception("do onError"));
            subscriber.onNext(getListOfStudent().get(3));
            subscriber.onNext(getListOfStudent().get(4));
            subscriber.onNext(getListOfStudent().get(5));
        }
    }) ***

Log打印

1.
do onError
2.
OperateActivity: do onNext
Student{id=‘1‘name=‘A‘, age=23}
OperateActivity: do onNext
Student{id=‘2‘name=‘B‘, age=33}
OperateActivity: do onNext
Student{id=‘3‘name=‘C‘, age=24}
OperateActivity: do onNext
Student{id=‘1001‘name=‘error - 1 ‘, age=10}
OperateActivity: do onNext
Student{id=‘1002‘name=‘error - 2 ‘, age=10}
OperateActivity: do onCompleted

示例解析

在创建Observale发送OnError通知时,error采用了两种方式,一个是Throwable,另外一个是Exception。从打印的Log中可以看出,在采用第一种方式时,原Observable直接发送了onError通知,并结束发射。但是采用发射Exception作为onError通知时,原Observale的onError通知被拦截,并使用了onExceptionResumeNext()创建的备用Observale。正如概述中叙述的,onExceptionResumeNext方法至拦截原Observale中Exception作为onError的通知,并将在参数函数中创建的备用Observable中的数据发射出去。

retry

流程图

概述

retry()操作符将拦截原Observable传递onError给观察者,而是重新订阅此Observable。由于是重新订阅会造成数据重复。

在RxJava中,retry()操作符有几个变体

retry()变体在出现onError通知时,将无限的重新订阅原Observable.

retry(long)变体通过参数指定最多重新订阅的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。

retry(Func2)变体通过参数接受两个参数的函数,参数为重试次数和导致发射onError通知的Throwable,而函数返回一个布尔值,如果返回true,retry应该再次订阅原Observable,如果返回false,retry会将最新的一个onError通知传递给它的观察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).retry(3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

Log打印

OperateActivity: do onNext
Student{id=‘1‘name=‘A‘, age=23}
OperateActivity: do onNext
Student{id=‘2‘name=‘B‘, age=33}
OperateActivity: do onNext
Student{id=‘3‘name=‘C‘, age=24}
OperateActivity: do onNext
Student{id=‘1‘name=‘A‘, age=23}
OperateActivity: do onNext
Student{id=‘2‘name=‘B‘, age=33}
OperateActivity: do onNext
Student{id=‘3‘name=‘C‘, age=24}
OperateActivity: do onNext
Student{id=‘4‘name=‘D‘, age=24}
OperateActivity: do onNext
Student{id=‘5‘name=‘E‘, age=33}
OperateActivity: do onNext
Student{id=‘6‘name=‘F‘, age=23}

示例解析

从示例代码中可以看出,第一次订阅时,发射完第三个通知后,发送onError通知。但,通过Log打印可以清晰的看出,onError通知并没有发射出去,而是重新订阅,将之前发射的数据,重新发了一遍。正如之前说的,retry()操作符会拦截onError通知并重新订阅,但是会造成数据的重复。

retryWhen

流程图

概述

retryWhen()默认在trampoline调度器上执行,可以通过参数指定其它的调度器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例代码

1.
Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });
2.
***
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<Throwable> call(Observable<? extends Throwable> observable) {
        return Observable.error(new Throwable(" do retryWhen"));
    }
})
****

Log打印

1.

OperateActivity: do onNext

Student{id=’1’name=’A’, age=23}

OperateActivity: do onNext

Student{id=’2’name=’B’, age=33}

OperateActivity: do onNext

Student{id=’3’name=’C’, age=24}

OperateActivity: do onNext

Student{id=’1’name=’A’, age=23}

OperateActivity: do onNext

Student{id=’2’name=’B’, age=33}

OperateActivity: do onNext

Student{id=’3’name=’C’, age=24}

OperateActivity: do onNext

Student{id=’4’name=’D’, age=24}

OperateActivity: do onNext

Student{id=’5’name=’E’, age=33}

OperateActivity: do onNext

Student{id=’6’name=’F’, age=23}

2.

do onError

示例解析

示例1中,在retryWhend(Func1)的参数函数中,创建并返回了一个可发射数据的Observable对象,而在示例2中,其参数函数,创建并返回了一个发射onError通知的Observable。通过Log打印可以出,示例1在拦截了原Observable中的onError通知,并重新订阅了原Observable,但是示例2中,观察者接收了onError通知,意味着原Observable中的onError通知未被拦截,直接发射出去。示例2中,正体现了retryWhen()和retry()的不同之处。

时间: 2024-07-29 19:20:03

RxJava之错误处理的相关文章

RxJava 错误处理

在Observable发射数据时,有时发送onError通知,导致Observer不能接收数据.可是,我们很希望对Observable发射的onError通知做出响应或者从错误中恢复.此时我们该如何处理呢? 解决办法就是使用Error handling相关的操作符来集中统一地处理错误.RxJava中错误处理的操作符为 Catch和 Retry. Catch ??Catch操作符能够拦截原始Observable的onError通知,不让Observable因为产生错误而终止.相当于Java中try

Python爬取CSDN博客文章

之前解析出问题,刚刚看到,这次仔细审查了 0 url :http://blog.csdn.net/youyou1543724847/article/details/52818339Redis一点基础的东西目录 1.基础底层数据结构 2.windows下环境搭建 3.java里连接redis数据库 4.关于认证 5.redis高级功能总结1.基础底层数据结构1.1.简单动态字符串SDS定义: ...47分钟前1 url :http://blog.csdn.net/youyou1543724847/

Retrofit+RxJava 优雅的处理服务器返回异常、错误

开始本博客之前,请先阅读: Retrofit请求数据对错误以及网络异常的处理 异常&错误 实际开发经常有这种情况,比如登录请求,接口返回的 信息包括请求返回的状态:失败还是成功,错误码,User对象等等.如果网络等原因引起的登录失败可以归结为异常,如果是用户信息输入错误导致的登录失败算是错误. 假如服务器返回的是统一数据格式: /** * 标准数据格式 * @param <T> */ public class Response<T> { public int state;

RxJava retryWhen操作符实现错误重试机制

业务需求 当我们在app里发起网络请求时,可能会因为各种问题导致失败.如何利用RxJava来实现出现错误后重试若干次,并且可以设定重试的时间间隔. 具体实现 网络请求使用Retrofit来做,请求用户信息接口 @GET("/userinfo?noToken=1") Observable<Response> getUserInfoNoToken(); 请求用户信息接口的逻辑代码 userApi.getUserInfoNoToken() //总共重试3次,重试间隔3000毫秒

Android Retrofit+RxJava 优雅的处理服务器返回异常、错误

开始本博客之前,请先阅读: Retrofit请求数据对错误以及网络异常的处理 异常&错误 实际开发经常有这种情况,比如登录请求,接口返回的 信息包括请求返回的状态:失败还是成功,错误码,User对象等等.如果网络等原因引起的登录失败可以归结为异常,如果是用户信息输入错误导致的登录失败算是错误. 假如服务器返回的是统一数据格式: /** * 标准数据格式 * @param <T> */ public class Response<T> { public int state;

Spring Cloud ZooKeeper集成Feign的坑2,服务调用了一次后第二次调用就变成了500,错误:Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is com.n

错误如下: 2017-09-19 15:05:24.659 INFO 9986 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.spring[email protected]56528192: startup date [Tue Sep 19 15:05:24 CST 2017]; root of context hierarchy 2017-09-19 15:05:24.858 INFO 9986 --

RxJava概叙

给Android开发者的 RxJava 详解:http://gank.io/post/560e15be2dca930e00da1083 响应式编程是一种异步数据流交互的编程范式,而RxJava就是基于事件操作异步数据流在Java上实现的库 核心的理念是将一切都当做数据流来看待,各种变量,用户输入,数据结构,缓存等等 而Rx库提供了高度抽象的函数来操作流,创建.流入流出.过滤.合并.映射等等各种变换 不仅如此,Rx库还使得异步操作,和错误处理变得非常简洁. 使用了RxJava后明显的好处就是 1解

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

Retrofit自定义GsonConverter处理请求错误异常处理

通常从服务端拿到的JSON数据格式大概如下: { "code":1, "message":"查询成功", "detail":{"aa":"123","bb":"123","cc":"123"} } 因此通常我们会定义一个实体类来解析对应的json: public class Response { @Seria