Rxjava2 Observable的创建详解及实例

目录

  • 简要:
  • 1. Create
  • 2. Defer
  • 3. Empty/Never/Error
  • 4. Just
  • 5. From
  • 6. Repeat
  • 7. RepeatWhen
  • 8. RepeatUntil
  • 9. Range
  • 10. interval
  • 11. Timer
  • 小结

简要:

几种主要的需求

  • 直接创建一个Observable(创建操作)
  • 组合多个Observable(组合操作)
  • 对Observable发射的数据执行变换操作(变换操作)
  • 从Observable发射的数据中取特定的值(过滤操作)
  • 转发Observable的部分值(条件/布尔/过滤操作)
  • 对Observable发射的数据序列求值(算术/聚合操作)

创建Observable的各种方式

  • create():使用一个函数从头创建一个Observable
  • defer():只有当订阅者订阅才创建Observable;为每个订阅创建一个新的 Observable
  • empty() :创建一个什么都不做直接通知完成的Observable
  • never():创建一个不发射任何数据的Observable
  • error():—创建一个什么都不做直接通知错误的Observable
  • just():将一个或多个对象转换成发射这个或这些对象的一个Observable
  • from():将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
  • repeat():创建一个重复发射指定数据或数据序列的Observable
  • repeatWhen() :创建一个重复发射指定数据或数据序列的Observable,它依赖于另一 个Observable发射的数据
  • repeatUntil():根据条件(函数BooleanSupplier)判断是否需要继续订阅
  • range():创建一个发射指定范围的整数序列的Observable
  • interval():创建一个按照给定的时间间隔发射整数序列的Observable
  • timer():—创建一个在给定的延时之后发射单个数据的Observable

1. Create

使用 Create 操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数可以调用观察者的 onNextonErroronCompleted 方法,当发生订阅的时候会自动调用观察者的 onSubscribe 方法。

通过 Subscribe 进行Observable 与 Observer 的订阅,其中 subscribe 方法可以接收一个完整通知参数的 Observer 对象,也可以接收部分通知参数的 Consumer(接收数据) 或者 Action (仅接收通知) 对象。

实例代码:

    // 创建Observable(被观察者)
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {

        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("Hello");
            emitter.onNext("World");
            emitter.onComplete();
        }
    });

    // 创建Observer(观察者), 可以接受所有通知
    Observer<String> observer = new Observer<String>() {

        public void onSubscribe(Disposable d) {
            System.out.println("--> onSubscribe");
        }

        public void onNext(String t) {
            System.out.println("--> onNext = " + t);
        }

        public void onError(Throwable e) {
            System.out.println("--> onError");
        }

        public void onComplete() {
            System.out.println("--> onComplete");
        }
    };

    // 创建只接受 onNext(item) 通知的Consumer(观察者)
    Consumer<String> nextConsumer = new Consumer<String>() {

        @Override
        public void accept(String t) throws Exception {
            System.out.println("--> accept nextConsumer: " + t);
        }
    };

    // 创建只接受 onError(Throwable) 通知的Consumer(观察者)
    Consumer<Throwable> errorConsumer = new Consumer<Throwable>() {

        @Override
        public void accept(Throwable t) throws Exception {
            System.out.println("-- accept errorConsumer: " + t);
        }
    };

    // 创建只接受 onComplete() 通知的Action(观察者)
    Action completedAction = new Action() {

        @Override
        public void run() throws Exception {
            System.out.println("--> run completedAction");
        }
    };

    // 创建只接受 onSubscribe 通知的Consumer(观察者)
    Consumer<Disposable> onSubscribeComsumer = new Consumer<Disposable>() {

        @Override
        public void accept(Disposable t) throws Exception {
            System.out.println("--> accept onSubscribeComsumer ");
        }
    };

    // 1. 进行订阅,subscribe(Observer)
    observable.subscribe(observer);

    System.out.println("---------------------------------------------");
    // 2. 进行订阅,subscribe(Consumer onNext)
    observable.subscribe(nextConsumer);

    System.out.println("---------------------------------------------");
    // 3. 进行订阅,subscribe(Consumer onNext, Consumer onError)
    observable.subscribe(nextConsumer, errorConsumer);

    System.out.println("---------------------------------------------");
    // 4. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted)
    observable.subscribe(nextConsumer, errorConsumer, completedAction);

    System.out.println("---------------------------------------------");
    // 5. 进行订阅,subscribe(Consumer onNext, Consumer onError, Action onCompleted, Consumer onSubscribe)
    observable.subscribe(nextConsumer, errorConsumer, completedAction, onSubscribeComsumer);

输出:

--> onSubscribe
--> onNext = Hello
--> onNext = World
--> onComplete
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
---------------------------------------------
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction
---------------------------------------------
--> accept onSubscribeComsumer
--> accept nextConsumer: Hello
--> accept nextConsumer: World
--> run completedAction

注意:create 方法默认不在任何特定的调度器上执行。

2. Defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable.

Defer 操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个 Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个 Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。

实例代码:

    // 创建一个Defer类型的Observable
    Observable<Integer> deferObservable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
        public ObservableSource<? extends Integer> call() throws Exception {
            // 创建每个观察者订阅所返回的 Observable
            Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {

                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onNext(4);
                    emitter.onNext(5);
                    emitter.onComplete();
                }
            });
            return observable;
        }
    });

    // 创建第一个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {

        public void accept(Integer t) throws Exception {
            System.out.println("No.1 --> accept = " + t);
        }
    });

    // 创建第二个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {

        public void accept(Integer t) throws Exception {
            System.out.println("No.2 --> accept = " + t);
        }
    });

    // 创建第三个观察者并订阅defer Observable
    deferObservable.subscribe(new Consumer<Integer>() {

        public void accept(Integer t) throws Exception {
            System.out.println("No.3 --> accept = " + t);
        }
    });

输出:

No.1 --> accept = 1
No.1 --> accept = 2
No.1 --> accept = 3
No.1 --> accept = 4
No.1 --> accept = 5
No.2 --> accept = 1
No.2 --> accept = 2
No.2 --> accept = 3
No.2 --> accept = 4
No.2 --> accept = 5
No.3 --> accept = 1
No.3 --> accept = 2
No.3 --> accept = 3
No.3 --> accept = 4
No.3 --> accept = 5

注意:defer 方法默认不在任何特定的调度器上执行。
Javadoc: defer(Func0)

3. Empty/Never/Error

Empty:创建一个不发射任何数据但是正常终止的Observable
Never:创建一个不发射数据也不终止的Observable
Error:创建一个不发射数据以一个错误终止的Observable

这三个操作符生成的 Observable 行为非常特殊和受限,多用于一些特殊的场景(某些操作状态异常后返回一个error、empty、never 的 Observable)。测试的时候很有用,有时候也用于结合其它的 Observables,或者作为其它需要 Observable 的操作符的参数。

实例代码:

    System.out.println("--> 1 -----------------------------------");
    // 1.  创建一个不发射任何数据但是正常终止的Observable
    Observable.empty()
        .subscribe(new Observer<Object>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

    System.out.println("--> 2 -----------------------------------");
    // 2.  创建一个不输出数据,并且不会终止的Observable
    Observable.never()
        .subscribe(new Observer<Object>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

    System.out.println("--> 3 -----------------------------------");
    // 3.  创建一个不发射数据以一个错误终止的Observable
    Observable.error(new NullPointerException("error test"))
        .subscribe(new Observer<Object>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(Object t) {
                System.out.println("onNext: " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete");
            }
        });

输出:

--> 1 -----------------------------------
onSubscribe
onComplete
--> 2 -----------------------------------
onSubscribe
--> 3 -----------------------------------
onSubscribe
onError: java.lang.NullPointerException: error test

注意

  • RxJava将这些操作符实现为 empty,never和 error。
  • error 操作符需要一 个 Throwable参数,你的Observable会以此终止。
  • 这些操作符默认不在任何特定的调度器上执行,但是 empty 和 error 有一个可选参数是Scheduler,如果你传递了Scheduler参数,它 们会在这个调度器上发送通知.
    Javadoc: empty()
    Javadoc: never()
    Javadoc: error(java.lang.Throwable)

4. Just

创建一个发射指定值的Observable。

Just 将单个数据转换为发射那个数据的Observable。类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。

注意: 如果你传递 nullJust,它会返回一个发射 null 值的 Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。

实例代码:

    // 单个对象发送
    Observable.just(1)
            .subscribe(new Consumer<Integer>() {

                public void accept(Integer t) throws Exception {
                    System.out.println("--> singe accept: " + t);
                }
            });

    System.out.println("---------------------------------");
    // 多个对象发送,内部实际使用from实现 (接受一至九个参数,返回一个按参数列表顺序发射这些数据的Observable)
    Observable.just(1, 2, 3, 4, 5)
            .subscribe(new Consumer<Integer>() {

                public void accept(Integer t) throws Exception {
                    System.out.println("--> mutil accept: " + t);
                }
            });

输出:

--> singe accept: 1
---------------------------------
--> mutil accept: 1
--> mutil accept: 2
--> mutil accept: 3
--> mutil accept: 4
--> mutil accept: 5

Javadoc: just(item ...)

5. From

将其它种类的对象和数据类型转换为Observable,发射来自对应数据源数据类型的数据,在RxJava中,from 操作符可以转换 FutureIterable数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。

实例代码:

        // 初始化数据
        Integer[] array = { 1, 2, 3, 4, 5, 6 };
        List<String> iterable = new ArrayList<String>();
        iterable.add("A");
        iterable.add("B");
        iterable.add("C");
        iterable.add("D");
        iterable.add("E");

        // 1. fromArray
        Observable.fromArray(array).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept(1):fromArray: " + t);
            }
        });

        System.out.println("---------------------------------------");
        // 2. fromIterable
        Observable.fromIterable(iterable)
            .subscribe(new Consumer<String>() {

                @Override
                public void accept(String t) throws Exception {
                    System.out.println("--> accept(2) fromIterable: " + t);
                }
            });

        System.out.println("---------------------------------------");
        // 3. fromCallable
        Observable.fromCallable(new Callable<Integer>() {

            @Override
            public Integer call() throws Exception {
                return 1;
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept(3): fromCallable: " + t);
            }
        });

        System.out.println("---------------------------------------");
        // 4. fromFuture
        Observable.fromFuture(new Future<String>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public String get() throws InterruptedException, ExecutionException {
                System.out.println("--> fromFutrue: get()");
                return "hello";
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return false;
            }

            @Override
            public String get(long timeout, TimeUnit unit)
                    throws InterruptedException, ExecutionException, TimeoutException {
                return null;
            }
        }).subscribe(new Consumer<String>() {

            @Override
            public void accept(String t) throws Exception {
                System.out.println("--> accept(4): fromFuture: " + t);
            }
        });

输出:

--> accept(1):fromArray: 1
--> accept(1):fromArray: 2
--> accept(1):fromArray: 3
--> accept(1):fromArray: 4
--> accept(1):fromArray: 5
--> accept(1):fromArray: 6
---------------------------------------
--> accept(2) fromIterable: A
--> accept(2) fromIterable: B
--> accept(2) fromIterable: C
--> accept(2) fromIterable: D
--> accept(2) fromIterable: E
---------------------------------------
--> accept(3): fromCallable: 1
---------------------------------------
--> fromFutrue: get()
--> accept(4): fromFuture: hello

注意:from默认不在任何特定的调度器上执行。然而你可以将Scheduler作为可选的第二个参数传递给Observable,它会在那个调度器上管理这个Future。
Javadoc: from(array)
Javadoc: from(Iterable)
Javadoc: from(Callable)
Javadoc: from(Future)
Javadoc: from(Future,Scheduler)
Javadoc: from(Future,timeout,timeUnit)

6. Repeat

创建一个发射特定数据重复多次的Observable,它不是创建一个Observable,而是重复发射原始 Observable的数据序列,这个序列或者是无限的,或者通过 repeat(n) 指定重复次数。

实例代码:

    // 1. repeat(): 一直重复发射原始 Observable的数据序列
    Observable.range(1, 5)
            .repeat()
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            });

    System.out.println("----------------------------------------");
    // 2. repeat(n): 重复执行5次
    Observable.range(1, 2)
            .repeat(3)
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
......
----------------------------------------
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2
--> accept(2): 1
--> accept(2): 2

注意: repeat 操作符默认在 trampoline 调度器上执行。有一个变体可以通过可选参数指定 Scheduler。
Javadoc: repeat()
Javadoc: repeat(long)
Javadoc: repeat(Scheduler)
Javadoc: repeat(long,Scheduler)

7. RepeatWhen

repeatWhen的操作符,它不是缓存和重放原始 Observable 的数据序列,接收到原始 Observable 终止通知后,有条件的决定是否重新订阅原来的 Observable 。

将原始 Observable 的终止通知(完成或错误)当做一个 void 数据传递给一个通知处理器,它以此来决定是否要重新订阅和发射原来的 Observable。这个通知处理器就像一个 Observable 操作符,接受一个发射 void通知的 Observable为输入,返回一个发射 void 数据(意思是,重新订阅和发射原始 Observable)或者直接终止(意思是,使用 repeatWhen 终止发射数据)的 Observable。

实例代码:

    // repeatWhen(Func1()):接收到终止通知后,在函数中决定是否重新订阅原来的Observable
    // 需要注意的是repeatWhen的objectObservable处理(也可以单独自定义Observable返回),这里使用flathMap进行处理,
    // 让它延时发出onNext,这里onNext发出什么数据都不重要,它只是仅仅用来处理重订阅的通知,如果发出的是onComplete/onError,则不会触发重订阅
    Observable.range(1, 2)
            .doOnComplete(new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("-----------> 完成一次订阅");
                }
            }).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                private int n = 0;

                @Override
                public ObservableSource<?> apply(Observable<Object> t) throws Exception {
                    // 接收到原始Observable的终止通知,决定是否重新订阅
                    System.out.println("--> apply repeat ");
                    return t.flatMap(new Function<Object, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Object t) throws Exception {
                            if(n < 3) { // 重新订阅3次
                                n ++;
                                return Observable.just(0);
                            } else {
                                return Observable.empty();
                            }
                        }
                    });
                    // return Observable.timer(1, TimeUnit.SECONDS);        // 间隔一秒后重新订阅一次
                    // return Observable.interval(1, TimeUnit.SECONDS); // 每间隔一秒重新订阅一次
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> apply repeat
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
--> accept: 1
--> accept: 2
-----------> 完成一次订阅

注意:repeatWhen操作符默认在 trampoline 调度器上执行。

Javadoc: repeatWhen(Func1)

8. RepeatUntil

根据条件(函数BooleanSupplier)判断是否需要继续订阅: false:继续订阅; true:取消订阅

实例代码:

    // repeatUntil 根据条件(BooleanSupplier)判断是否需要继续订阅
    Observable.range(1, 2)
            .doOnComplete(new Action() {

                @Override
                public void run() throws Exception {
                    System.out.println("-----------> 完成一次订阅");
                }
            }).repeatUntil(new BooleanSupplier() {

                private int n = 0;

                @Override
                public boolean getAsBoolean() throws Exception {
                    System.out.println("getAsBoolean = " + (n < 3? false:true) );
                    // 是否需要终止
                    if (n < 3) {
                        n++;
                        return false;   // 继续重新订阅
                    }
                    return true;        // 终止重新订阅
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = false
--> accept: 1
--> accept: 2
-----------> 完成一次订阅
getAsBoolean = true

Javadoc: repeatWhen(Func1)

9. Range

创建一个发射特定整数序列的Observable。

Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。

RxJava将这个操作符实现为 range 函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置 为负数,会抛异常)。

实例代码:

    // 1. range(n,m) 发射从n开始的m个整数序列,序列区间[n,n+m-1)
    Observable.range(0, 5)
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("-- accept(range): " + t);
                }
            });

    System.out.println("------------------------------");
    // 2. rangeLong(n,m) 发射从n开始的m个长整型序列,序列区间[n,n+m-1)
    Observable.rangeLong(1, 5)
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("-- accept(rangeLong): " + t);
                }
            });

输出:

-- accept(range): 0
-- accept(range): 1
-- accept(range): 2
-- accept(range): 3
-- accept(range): 4
------------------------------
-- accept(rangeLong): 1
-- accept(rangeLong): 2
-- accept(rangeLong): 3
-- accept(rangeLong): 4
-- accept(rangeLong): 5

Javadoc: range(int start,int count)
Javadoc: rangeLong(long start, long count)

10. interval

创建一个按固定时间间隔发射整数序列的Observable,它按固定的时间间隔发射一个无限递增的整数序列。
RxJava将这个操作符实现为 interval 方法。它接受一个表示时间间隔的参数和一个表示时间单位的参数。

实例代码:

    // [1] interval(long period, TimeUnit unit)
    // 每间隔period时间单位,发射一次整数序列
    Observable.interval(1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long l) throws Exception {
                    System.out.println("--> accept(1): " + l);
                }
            });

    System.out.println("------------------------------------");
    // [2] interval(long initialDelay, long period, TimeUnit unit)
    // 在延迟initialDelay秒后每隔period时间单位发射一个整数序列
    Observable.interval(0, 1, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            });

    System.out.println("------------------------------------");
    // [3] intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    // 延迟initialDelay秒后从起始数据start开始,每隔period秒发送一个数字序列,一共发送count个数据
    Observable.intervalRange(1, 5, 3, 2, TimeUnit.SECONDS)
            .subscribe(new Consumer<Long>() {

                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            });

注意:interval 默认在 computation 调度器上执行, 有一个变体可以通过可选参数指定 Scheduler。
Javadoc: interval(long period, TimeUnit unit)
Javadoc: interval(long period, TimeUnit unit, Scheduler scheduler)
Javadoc: interval(long initialDelay, long period, TimeUnit unit)
Javadoc: interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
Javadoc: intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

输出:

--> accept(1): 0
--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
...
------------------------------------
--> accept(2): 0
--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
...
------------------------------------
--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5

11. Timer

创建一个给定的延迟后发射一个特殊的值的Observable。

RxJava将这个操作符实现为 timer 函数。timer 返回一个Observable,它在延迟一段给定的时间后发射一个简单的数字0

实例代码:

    // timer(long delay, TimeUnit unit, Scheduler scheduler)
    // 定时delay时间 单位后发送数字0,指定可选参数Schedule调度器为trampoline(当前线程排队执行)
    Observable.timer(5, TimeUnit.SECONDS, Schedulers.trampoline())
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept: " + t);
                }
            });

输出:

--> accept: 0

注意:timer 操作符默认在 computation 调度器上执行。有一个变体可以通过可选参数指定 Scheduler。
Javadoc: timer(long delay, TimeUnit unit)
Javadoc: timer(long delay, TimeUnit unit, Scheduler scheduler)

小结

根据实际情况,使用不同的方式创建不同种类的Observable,这个在开发中非常有用,可以减少很多重复、复杂、冗余的操作,可以快速的创建一个符合要求的Observable,一定程度上提高了开发的效率。

提示:以上使用的Rxjava2版本: 2.2.12

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

原文地址:https://www.cnblogs.com/jiangming-blogs/p/12127246.html

时间: 2024-10-29 01:07:35

Rxjava2 Observable的创建详解及实例的相关文章

Rxjava2 Observable的辅助操作详解及实例(二)

目录 8. TimeInterval 9. Timeout 9.1 timeout(timeout, timeUnit) 9.2 timeout(timeout, timeUnit, scheduler, other) 9.3 timeout(Function itemTimeoutIndicator, ObservableSource other) 10. Timestamp 11. Using 12. To 小结 接续上篇: Rxjava2 Observable的辅助操作详解及实例(一) 8

Rxjava2 Observable的辅助操作详解及实例(一)

目录 简要: 1. Delay 2. Do 3. SubscribeOn 4. ObserverOn 5. Serialize 6. Materialize 7. Dematerialize 接续: 简要: 需求了解: Rxjava中有一些方便的辅助操作符,来更方便我们的函数式的编程.比如延迟.定时.指定操作的监听.数据类型转换等一系列的操作. 下面列出了一些用于Observable的辅助操作符: Delay:延时发射Observable的结果. Do:注册一个动作作为原始Observable生

Rxjava2 Observable的数据变换详解及实例(二)

目录 1. Window 1.1 window(closingSelector) 1.2 window(openingIndicator, closingIndicator) 1.3 window(count) 1.4 window(count, skip) 1.5 window(timespan, TimeUnit) 1.6 window(timespan, TimeUnit, count) 1.7 window(timespan, timeskip, TimeUnit) 2. GroupBy

Rxjava2 Observable的数据变换详解及实例(一)

目录 简要: 1.1 buffer(count) 1.2 buffer(boundary) 1.3 buffer(count, skip) 1.4 buffer(timespan, TimeUnit) 1.5 buffer(timespan, TimeUnit, count) 1.6 buffer(timespan, timeskip, TimeUnit) 1.7 buffer(bufferClosingSelector) 2. Map 3. FlatMap 3.1 flatMap(mapper

Rxjava2 Observable的数据过滤详解及实例(一)

目录 简要: 1. Debounce 1.1 debounce(timeout, unit) 1.2 debounce(debounceSelector) 2. Throttle 2.1 throttleFirst(windowDuration, unit) 2.2 throttleLast(intervalDuration, unit) 2.3 throttleWithTimeout(timeout, unit) 3. Sample 3.1 sample(period, unit) 3.2 s

Rxjava2 Observable的数据过滤详解及实例(二)

目录 6. Filter 7. Frist 7.1 firstElement() 7.2 first(defaultItem) 7.3 firstOrError() 8. Single 8.1 singleElement() 8.2 single(defaultItem) 8.3 singleOrError() 9. ElementAt 9.1 elementAt(index) 9.2 elementAt(index, defaultItem) 9.3 elementAtOrError(inde

RxJava2.0的使用详解

RxJava2.0的使用详解 1,初识RxJava RxJava就是一种用Java语言实现的响应式编程,来创建基于事件的异步程序 RxJava是一个基于事件订阅的异步执行的一个类库,目前比较火的一些技术框架! 参考资料: Github上RxJava的项目地址: https://github.com/ReactiveX/RxJava 技术文档Api: http://reactivex.io/RxJava/javadoc/ RxAndroid,用于 Android 开发: https://githu

红帽Linux故障定位技术详解与实例(3)

红帽Linux故障定位技术详解与实例(3) 在线故障定位就是在故障发生时, 故障所处的操作系统环境仍然可以访问,故障处理人员可通过console, ssh等方式登录到操作系统上,在shell上执行各种操作命令或测试程序的方式对故障环境进行观察,分析,测试,以定位出故障发生的原因. AD:2014WOT全球软件技术峰会北京站 课程视频发布 5.用kdump工具内核故障定位实例 A) 部署Kdump 部署 kdump 收集故障信息的步骤如下: (1)设置好相关的内核启动参数 在 /boot/grub

CvMat、Mat、IplImage之间的转换详解及实例

见原博客:http://blog.sina.com.cn/s/blog_74a459380101obhm.html OpenCV学习之CvMat的用法详解及实例 CvMat是OpenCV比较基础的函数.初学者应该掌握并熟练应用.但是我认为计算机专业学习的方法是,不断的总结并且提炼,同时还要做大量的实践,如编码,才能记忆深刻,体会深刻,从而引导自己想更高层次迈进. 1.初始化矩阵: 方式一.逐点赋值式: CvMat* mat = cvCreateMat( 2, 2, CV_64FC1 ); cvZ