Rxjava2 Observable的错误处理操作详解及实例

目录

  • 简要:
  • 1. Catch
    • 1.1 onErrorReturn
    • 1.2 onErrorResumeNext
    • 1.3 onExceptionResumeNext
  • 2. Retry
    • 2.1 retry()
    • 2.2 retry(long times)
    • 2.3 retry(long times, Predicate predicate)
    • 2.4 retry(Predicate predicate)
    • 2.5 retry(BiPredicate predicate)
    • 2.6 retryUntil(BooleanSupplier stop)
    • 2.7 retryWhen(Function handler)
  • 小结

简要:

需求了解:

Rxjava 中当数据处理派发中发生了异常 ,观察者会接受到一个 Error 的通知,那如果不想发射这个异常的通知,自己处理掉呢?答案当然是可以的,在 Rxjava 中很多操作符可用于对 Observable 发射的 onError 通知做出响应或者从错误中恢复。

例如:

  1. 吞掉这个错误,切换到一个备用的Observable继续发射数据
  2. 吞掉这个错误然后发射默认值
  3. 吞掉这个错误并立即尝试重启这个Observable
  4. 吞掉这个错误,在一些回退间隔后重启这个Observable

Rxjava中常见的错误处理操作符有如下几类:

  • onErrorReturn():指示Observable在遇到错误时发射一个特定的数据
  • onErrorResumeNext():指示Observable在遇到错误时发射一个数据序列
  • onExceptionResumeNext():指示Observable遇到错误时继续发射数据
  • retry():指示Observable遇到错误时重试
  • retryWhen():指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable

1. Catch

从 onError 通知中恢复发射数据。

Catch 操作符拦截原始Observable的 onError 通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

1.1 onErrorReturn

onErrorReturn 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的 onCompleted 方法。

  • onErrorReturnItem(T item): 让Observable遇到错误时发射一个指定的项(item)并且正常终止。

  • onErrorReturn(Function<Throwable, T> valueSupplier):让Observable遇到错误时通过一个函数Function来进行判断返回指定的类型数据,并且正常终止。

示例代码:

    // 创建一个可以发射异常的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(1 / 0);  // 产生一个异常
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });

    /** 1. onErrorReturnItem(T item)
     * 让Observable遇到错误时发射一个指定的项(item)并且正常终止。
     */
    observable.onErrorReturnItem(888)   // 源Observable发生异常时发射指定的888数据
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(1)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(1): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(1): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(1)");
                }
            });

    System.out.println("-----------------------------------------------");
    /**
     * 2. onErrorReturn(Function<Throwable, T> valueSupplier)
     * 让Observable遇到错误时通过一个函数Function来接受Error参数并进行判断返回指定的类型数据,并且正常终止。
     */
    observable.onErrorReturn(new Function<Throwable, Integer>() {
        @Override
        public Integer apply(Throwable throwable) throws Exception {
            System.out.println("--> apply(1): e = " + throwable);
            return 888; // 源Observable发生异常时发射指定的888数据
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(2)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(2): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(2): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(2)");
        }
    });

输出:

--> onSubscribe(1)
--> onNext(1): 1
--> onNext(1): 2
--> onNext(1): 888
--> onCompleted(1)
-----------------------------------------------
--> onSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> apply(1): e = java.lang.ArithmeticException: / by zero
--> onNext(2): 888
--> onCompleted(2)

Javadoc: onErrorReturnItem(T item)
Javadoc: onErrorReturn(Function<Throwable, T> valueSupplier)

1.2 onErrorResumeNext

onErrorResumeNext 方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的 onError 调用,不会将错误传递给观察者,作为替代,它会开始另一个指定的备用Observable。

  • onErrorResumeNext(ObservableSource next): 让Observable在遇到错误时开始发射第二个指定的Observable的数据序列。
  • onErrorResumeNext(Function<Throwable, ObservableSource> resumeFunction):让Observable在遇到错误时通过一个函数Function来接受Error参数并进行判断返回指定的第二个Observable的数据序列。

示例代码:

    // 创建一个可以发射异常的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(1 / 0);  // 产生一个异常
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });

    /**
     * 3. onErrorResumeNext(ObservableSource next)
     * 让Observable在遇到错误时开始发射第二个指定的Observable的数据序列
     */
    observable.onErrorResumeNext(Observable.just(888))  // 当发生异常的时候继续发射此项Observable
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(3)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(3): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(3): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(3)");
                }
            });

    System.out.println("-----------------------------------------------");
    /**
     * 4. onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)
     * 让Observable在遇到错误时通过一个函数Function来接受Error参数并进行判断返回指定的第二个Observable的数据序列
     */
    observable.onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
        @Override
        public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {
            System.out.println("--> apply(4): " + throwable);
            return Observable.just(888);    // 当发生异常的时候继续发射此项Observable
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(4)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(4): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(4): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(4)");
        }
    });

输出:

--> onSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onNext(3): 888
--> onCompleted(3)
-----------------------------------------------
--> onSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> apply(4): java.lang.ArithmeticException: / by zero
--> onNext(4): 888
--> onCompleted(4)

Javadoc: onErrorResumeNext(ObservableSource next)
Javadoc: onErrorResumeNext(Function<Throwable, ObservableSource<T>> resumeFunction)

1.3 onExceptionResumeNext

与 onErrorResumeNext 类似, onExceptionResumeNext 方法返回一个镜像原有Observable行为的新Observable,也使用一个备用的Observable,不同的是,如果 onError 收到的 Throwable 不是一个 Exception ,它会将错误传递给观察者的 onError 方法,不会使用备用的Observable。

解析: onExceptionResumeNext 只会对Exception类型的异常进行处理,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable 。

示例代码:

    // 创建一个可以发射异常的Observable
    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
        //  emitter.onError(new Throwable("This is Throwable!"));  // Throwable类型异常,直接通知观察者
        //  emitter.onError(new Error("This is Error!"));          // Error类型异常,直接通知观察者
            emitter.onError(new Exception("This is Exception!"));  // Exception类型异常,进行处理,发送备用的Observable数据
        //    emitter.onNext(1 / 0);  // 会产生一个ArithmeticException异常,异常会被处理,发送备用的Observable数据
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });
    /**
     * 5. onExceptionResumeNext(ObservableSource next)
     *  如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable
     *  只对Exception类型的异常通知进行备用Observable处理
     */
    observable1.onExceptionResumeNext(Observable.just(888))
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("--> onSubscribe(5)");
                }

                @Override
                public void onNext(Integer integer) {
                    System.out.println("--> onNext(5): " + integer);
                }

                @Override
                public void onError(Throwable e) {
                    System.out.println("--> onError(5): " + e);
                }

                @Override
                public void onComplete() {
                    System.out.println("--> onCompleted(5)");
                }
            });

输出:

--> onSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 888
--> onCompleted(5)

Javadoc: onExceptionResumeNext(ObservableSource next)

2. Retry

如果原始Observable遇到错误,重新订阅它期望它能正常终止。

Retry 操作符不会将原始 Observable 的 onError 通知传递给观察者,它会订阅这个Observable,再给它机会无错误地完成它的数据序列。 Retry 总是传递 onNext 通知给观察者,由于重新订阅,可能会造成数据项重复情况。

2.1 retry()

retry():无论收到多少次 onError 通知,无参数版本的 retry 都会继续订阅并发射原始Observable。


注意: 因为如果遇到异常,将会无条件的重新订阅原始的Observable,知道没有异常的发射全部的数据序列为止。所以如果你的异常发生后重新订阅也不会恢复正常的话,会一直订阅下去,有内存泄露的风险。

2.2 retry(long times)

retry(long times):接受单个 count 参数的 retry 会最多重新订阅指定的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个 onError 通知传递给它的观察者。

2.3 retry(long times, Predicate predicate)

retry(long times, Predicate<Throwable> predicate):遇到异常后最多重新订阅 times 次,每次重新订阅经过函数predicate 最终判断是否继续重新订阅,如果 times 到达上限或者 predicate 返回 false 中任意一个最先满足条件,都会终止重新订阅,retry 会将最新的一个 onError 通知传递给它的观察者。

2.4 retry(Predicate predicate)

retry(Predicate<Throwable> predicate):接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射 onError 通知的 Throwable 。这个函数返回一个布尔值,如果返回 true , retry 应该再次订阅和镜像原始的Observable,如果返回 false , retry 会将最新的一个 onError 通知传递给它的观察者

2.5 retry(BiPredicate predicate)

retry(BiPredicate<Integer, Throwable> predicate):遇到异常时,通过函数 predicate 判断是否重新订阅源Observable,并且通过参数 Integer 传递给 predicate 重新订阅的次数,retry 会将最新的一个 onError 通知传递给它的观察者。

2.6 retryUntil(BooleanSupplier stop)

retryUntil(BooleanSupplier stop):重试重新订阅,直到给定的停止函数 stop 返回 true,retry 会将最新的一个 onError 通知传递给它的观察者。

2.7 retryWhen(Function handler)

retryWhen(Function<Observable<Throwable>, ObservableSource> handler):retryWhen 和 retry 类似,区别是, retryWhen 将 onError 中的 Throwable 传递给一个函数,这个函数产生另一个 Observable, retryWhen 观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是 onError 通知,它就将这个通知传递给观察者然后终止。

实例代码:

    // flag for emitted onError times
    public static int temp = 0;

    // 创建可以发送Error通知的Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            if (temp <= 2) {
                emitter.onError(new Exception("Test Error!"));
                temp++;
            }
            emitter.onNext(3);
            emitter.onNext(4);
        }
    });

    /**
     * 1. retry()
     *  无论收到多少次onError通知, 都会去继续订阅并发射原始Observable。
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(1)");
        }
    }).retry().subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println("--> accept(1): " + integer);
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 2. retry(long times)
     *  遇到异常后,最多重新订阅源Observable times次
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(2)");
        }
    }).retry(1) // 遇到异常后,重复订阅的1次
      .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(2)");
            }

            @Override
            public void onNext(Integer integer) {
                System.out.println("--> onNext(2): " + integer);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(2): " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onCompleted(2)");
            }
      });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 3. retry(long times, Predicate<Throwable> predicate)
     *  遇到异常后最多重新订阅times次,每次重新订阅经过函数predicate最终判断是否继续重新订阅
     *  如果times到达上限或者predicate返回false中任意一个最先满足条件,都会终止重新订阅
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(3)");
        }
    }).retry(2, new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable throwable) throws Exception {
            System.out.println("--> test(3)");
            if(throwable instanceof Exception) {
                return true;    // 遇到异常通知后是否继续继续订阅
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(3)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(3): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(3): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(3)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 4. retry(Predicate<Throwable> predicate)
     *  遇到异常时,通过函数predicate判断是否重新订阅源Observable
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(4)");
        }
    }).retry(new Predicate<Throwable>() {
        @Override
        public boolean test(Throwable throwable) throws Exception {
            if (throwable instanceof Exception) {
                return true;    // 遇到异常通知后是否继续继续订阅
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(4)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(4): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(4): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(4)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 5. retry(BiPredicate<Integer, Throwable> predicate)
     *   遇到异常时,通过函数predicate判断是否重新订阅源Observable,并且通过参数integer传递给predicate重新订阅的次数
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(5)");
        }
    }).retry(new BiPredicate<Integer, Throwable>() {
        @Override
        public boolean test(Integer integer, Throwable throwable) throws Exception {
            System.out.println("--> test(5): " + integer);
            if (throwable instanceof Exception) {
                return true;    // 遇到异常通知后是否继续继续订阅
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(5)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(5): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(5): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(5)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 6. retryUntil(BooleanSupplier stop)
     * 重试重新订阅,直到给定的停止函数stop返回true
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(6)");
        }
    }).retryUntil(new BooleanSupplier() {
        @Override
        public boolean getAsBoolean() throws Exception {
            System.out.println("--> getAsBoolean(6)");
            if(temp == 1){  // 满足条件,停止重新订阅
                return true;
            }
            return false;
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(6)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(6): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(6): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(6)");
        }
    });

    System.out.println("---------------------------------------------");
    temp = 0;
    /**
     * 7. retryWhen(Function<Observable<Throwable>, ObservableSource> handler)
     *  将onError中的Throwable传递给一个函数handler,这个函数产生另一个Observable,
     *  retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。
     *  如果这个Observable发射了一项数据,它就重新订阅,
     *  如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
     */
    observable.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("----> doOnSubscribe(7)");
        }
    }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
            System.out.println("--> apply(7)");
            // 根据产生的Error的Observable是否正常发射数据来进行重新订阅,如果发射Error通知,则直接传递给观察者后终止
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    if (temp == 1) {
                        return Observable.error(throwable); // 满足条件后,传递这个Error,终止重新订阅
                    }
                    return Observable.timer(1, TimeUnit.MILLISECONDS);  // 正常发射数据,可以重新订阅
                }
            });
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe(7)");
        }

        @Override
        public void onNext(Integer integer) {
            System.out.println("--> onNext(7): " + integer);
        }

        @Override
        public void onError(Throwable e) {
            System.out.println("--> onError(7): " + e);
        }

        @Override
        public void onComplete() {
            System.out.println("--> onCompleted(7)");
        }
    });

    System.in.read();

输出:

----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
----> doOnSubscribe(1)
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
---------------------------------------------
--> onSubscribe(2)
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
----> doOnSubscribe(2)
--> onNext(2): 1
--> onNext(2): 2
--> onError(2): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> test(3)
----> doOnSubscribe(3)
--> onNext(3): 1
--> onNext(3): 2
--> onError(3): java.lang.Exception: Test Error!
---------------------------------------------
--> onSubscribe(4)
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
----> doOnSubscribe(4)
--> onNext(4): 1
--> onNext(4): 2
--> onNext(4): 3
--> onNext(4): 4
---------------------------------------------
--> onSubscribe(5)
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 1
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 2
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> test(5): 3
----> doOnSubscribe(5)
--> onNext(5): 1
--> onNext(5): 2
--> onNext(5): 3
--> onNext(5): 4
---------------------------------------------
--> onSubscribe(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
----> doOnSubscribe(6)
--> onNext(6): 1
--> onNext(6): 2
--> getAsBoolean(6)
--> onError(6): java.lang.Exception: Test Error!
---------------------------------------------
--> apply(7)
--> onSubscribe(7)
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
----> doOnSubscribe(7)
--> onNext(7): 1
--> onNext(7): 2
--> onError(7): java.lang.Exception: Test Error!

Javadoc: retry()
Javadoc: retry(long times)
Javadoc: retry(long times, Predicate<Throwable> predicate)
Javadoc: retry(Predicate<Throwable> predicate)
Javadoc: retry(BiPredicate<Integer, Throwable> predicate)
Javadoc: retryUntil(BooleanSupplier stop)
Javadoc: retryWhen(Function<Observable<Throwable>, ObservableSource> handler)

小结

本节主要介绍了 Rxjava 中关于 Error 通知的处理,主要是在遇到异常通知时,无条件或者指定条件的去重新订阅原始 Observable 直到没有异常(正常发射所有数据序列)或者满足指定的条件后终止重新订阅,发射异常通知给观察者。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127311.html

时间: 2024-10-03 02:59:16

Rxjava2 Observable的错误处理操作详解及实例的相关文章

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

目录 简要: 1. ConnectableObservable 2. Publish 3. Connect 4. RefCount 5. Share 6. Replay 小结 简要: 需求了解: Rxjava中的普通的 Observable 在观察者订阅的时候就会发射数据,但是有的时候我们想自己控制数据的发射,比如在有指定的观察者或者全部的观察者订阅后开始发射数据,这个时候我们就要要用到Rxjava中的可连接的Observable来完成这个需求. 这一节主要介绍 ConnectableObser

Rxjava2 Observable的结合操作详解及实例

目录 简要: 1. CombineLatest 2. Join 3. Merge 3.1 merge 3.2 mergeDelayError 4. Zip 5. StartWith 6. SwitchOnNext 6.1 switchOnNext 6.2 switchOnNextDelayError 小结 简要: 需求了解: 在使用 RxJava 开发的过程中,很多时候需要结合多个条件或者数据的逻辑判断,比如登录功能的表单验证,实时数据比对等.这个时候我们就需要使用 RxJava 的结合操作符来

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Windows DIB文件操作详解-4.使用DIB Section

前面讲了为了提高DIB的显示性能和效率,我们将DIB转换成DDB,但是这又遇到一个问题,如果我想操作DIB的数据的话,显然是不能使用DDB:一是因为DIB转DDB时发生了颜色转换,再就是DDB无法直接提取指定像素点的数据.那么我们怎么办呢,Windows使用一种折中的方式来达到这一目标(既提高了显示效率和性能,又可以直接操作像素点). 1.DIB Section存储和显示 Windows使用DIB块(DIB Section)来存储DIB数据,其内存结构示意图如下 其实,和我们自己读入DIB数据到