认识一下Rxjava

Rxjava

一.为啥要用它?她能干啥?

  1. 网络请求等耗时操作必须放到子线程。线程切换
  2. reactive 式编程。代码更加健壮

    使用观察者模式

    创建:Rx可以方便的创建事件流和数据流

    组合:Rx使用查询式的操作符组合和变换数据流

    监听:Rx可以订阅任何可观察的数据流并执行操作

    简化代码

    函数式风格:对可观察数据流使用无副作用的输入输出函数,避免了程序里错综复杂的状态

    简化代码:Rx的操作符通通常可以将复杂的难题简化为很少的几行代码

    异步错误处理:传统的try/catch没办法处理异步计算,Rx提供了合适的错误处理机制

    轻松使用并发:Rx的Observables和Schedulers让开发者可以摆脱底层的线程同步和各种并发问题

二.概念

  1. Observable :可观察对象,一旦数据产生或发生变化,会通过某种方式通知观察者(或订阅者)。Single 是它的特殊实现。
  2. 操作符 :对Observable 发射的数据项进行操作。
  3. Observer :观察者对象,监听Observable发射的数据项并做出响应,Subscriber是它的一个特殊实现。

NOTE: 真正强大的地方在于它的操作符,操作符让你可以变换、组合、操纵和处理Observable发射的数据。

三.操作符

“reactive”-某个问题已经发生时或之后执行某些操作。

大多数操作符处理一个Observable并返回一个Observable。这样你就可以在一个链条上一个接着一个地使用这些操作符。在这个链条上的每一个操作符可以“加工”前一个操作符处理后产生的Observable。

一个链条上的Observable操作符并不是孤立地处理产生这个链条的事件源Observable,而是轮流地处理。每一个操作符处理的是紧挨的上一个操作符产生的Observable。

创建操作符

create、just、from、defer、range、interval、timer、empty、never

create:通过调用observabler的一系列方法从零开始组装一个Observable。

from:将一些对象或其他数据结构(集合、Future)转化为Observable。

defer:只有当订阅者订阅才创建Observable;为每个订阅创建一个新的Observable。

empty/never/error:创建一个什么都不做直接通知完成/不发送任何元素/什么都不做直接通知错误的Observable。

just 类似于from,但是from会将数组或Iterable的元素取出然后逐个发射,而just只是简单的原样发射,将数组或Iterable当做单个数据。

注意:如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用empty操作符。

  • create(Observable.OnSubscribe f)
Observable.create(new Observable.OnSubscribe<PersonObj>() {
            @Override
            public void call(Subscriber<? super PersonObj> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        ArrayList<PersonObj> dataList = getDataList();
                        int size = dataList.size();
                        System.out.println("dataListSize = [" + size + "]");
                        for (int i = 0; i < size; i++) {
                            subscriber.onNext(dataList.get(i));
                            //if(i==3)subscriber.onCompleted();
                        }
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribe(new Subscriber<PersonObj>() {
            @Override
            public void onCompleted() {
                System.out.println("completed");
            }
            @Override
            public void onError(Throwable e) {
                System.out.println("e = [" + e.getMessage() + "]");
            }
            @Override
            public void onNext(PersonObj personObj) {
                System.out.println("personObj = [" + personObj + "]");
            }
        });
  • `defer(rx.functions.Func0<Observable<T>> observableFactory)

    可以延迟发射Observable的数据项,直到观察者Observer订阅了这个Observable,这样可以使一个Observer获取最新的数据。

public class PersonObj {
    ....
    public Observable<String> getAddressObservable() {
//      return Observable.just(address);
        return Observable.defer(() -> Observable.just(address));
    }
    ...
}

改变这个人的地址后

public void defer() {
        PersonObj personObj = new PersonObj("hefei", 19);
        Observable<String> addressObservable = personObj.getAddressObservable();
        personObj.setAddress("beijing");
        addressObservable.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("s = [" + s + "]");
            }
        });
    }
  • from vs just :逐个发射 from(@NotNull java.lang.Iterable<? extends T> iterable);整体发射 just(T value...)
    public void justVsFrom() {
        ArrayList<String> strings = new ArrayList<>();
        strings.add("12");
        strings.add("13");
        strings.add("14");
     // Observable.from(strings)
        Observable.just(strings)
                .subscribe(new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        System.out.println("o = [" + o + "]");
                    }
                });
/*        Observable.just(null)
                .subscribe(o -> {
                    System.out.println("o = [" + o + "]");
                });*/
    }
  • timer vs interval vs range

    看一下下面的结果:

//      Observable.timer(1, TimeUnit.SECONDS)
//      Observable.interval(1, TimeUnit.SECONDS)
        Observable.range(1, 60)
                .subscribe(aLong -> {
                    System.out.println("aLong = [" + aLong + "]");
                });

下面是一个获取验证码的倒计时例子:

    public static final int DELAY_SECONDS = 60;
    private int timer = 60;
    ...
    private void startTimer() {
        Subscription s = Observable.timer(1, TimeUnit.SECONDS) //延迟1s
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .repeat(DELAY_SECONDS)
                .subscribe(aLong -> {
                    timer--;
                    if (timer > 0) {
                        btnSend.setText("重新发送(" + timer + "s)");
                    } else {
                        timer = DELAY_SECONDS;
                        btnSend.setText("重新发送");
                        btnSend.setEnabled(true);
                    }
                });
        addSubscription(s);
    }

变换操作符

map、buffer、window、flatMap、groupBy

  • map(?) — map(rx.functions.Func1<? super T, ? extends R> func) 对Observable发射的每一个数据项,都应用一个函数来变换。
   Observable.from(getDataList())
        //.map(PersonObj::getName)
        //入参和出参。上一个操作符传递给它的类型,它传递给下一个操作符的类型
                .map(new Func1<PersonObj, String>() {
                    @Override
                    public String call(PersonObj personObj) {
                        //进行数据处理,返回给下一个操作符的相应类型的数据项
                        return personObj.getName();
                    }
                })
                .subscribe(s -> {
                    System.out.println("name = [" + s + "]");
                });
  • buffer() —Observable<List<T>> buffer(long timespan, TimeUnit unit,int count)(有多个重载方法) 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个。
  Observable.from(getDataList())//20个
                .buffer(4)//每次打包的数据项的最大个数
                .subscribe(new Action1<List<PersonObj>>() {
                    @Override
                    public void call(List<PersonObj> personObjs) {
                        System.out.println("personObjs = [" + personObjs + "]");//打印5次
                    }
                });
  • window() — 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。
 Observable.from(getDataList())//20个
                .window(4)//每个Observable里发射的数据项最大个数
                .subscribe(new Action1<Observable<PersonObj>>() {
                    @Override
                    public void call(Observable<PersonObj> personObjObservable) {
                        System.out.println("personObjObservable = [" + personObjObservable + "]");//5个Observable
                        personObjObservable.subscribe(new Action1<PersonObj>() {
                            @Override
                            public void call(PersonObj personObj) {
                            //每个Observable里发射的数据项
                                System.out.println("personObj = [" + personObj + "]");
                            }
                        });
                    }
                });
  • flatMap — 将接收到的数据项变换为Observable。

    1.flatMap(rx.functions.Func1<? super T, ? extends Observable<? extends R>> func)

 Observable.from(getDataList())
                .flatMap(new Func1<PersonObj, Observable<String>>() {
                    @Override
                    public Observable<String> call(PersonObj personObj) {
                    //将接收到的数据项转换为一个Observable,返回给下一个操作符
                        return personObj.getAddressObservable();
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        System.out.println("s = [" + s + "]");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("throwable = [" + throwable.getMessage() + "]");
                    }
                });

2.flatMap(Func1<? super T, ? extends Observable<? extends U>> collectionSelector,Func2<? super T, ? super U, ? extends R> resultSelector)

 Observable.from(getDataList())
                .flatMap(new Func1<PersonObj, Observable<String>>() {
                             @Override
                             public Observable<String> call(PersonObj personObj) {
                                 return personObj.getAddressObservable();
                             }
                         },
                        //接收到的数据项的类型、Fuc1处理后的Observable里的数据类型、要发射给下一个操作的数据类型
                        new Func2<PersonObj, String, Integer>() {

                            @Override
                            public Integer call(PersonObj personObj, String str) {
                             //personObj,是flatMap接收到的数据项,str是Func1处理后的Observable里发射的数据项
                                System.out.println("personObj = [" + personObj + "], s = [" + str + "]");
                                return personObj.getAge();
                            }
                        })
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer s) {
                        System.out.println("s = [" + s + "]");
                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {
                        System.out.println("throwable = [" + throwable.getMessage() + "]");
                    }
                });

flatMap将接收到数据项进行映射转换时,任何一个数据项发生异常时,这个Observable将立刻被放弃,并调用观察者的onError方法。

  • concatMap —— flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。为了防止交错的发生,可以使用与之类似的concatMap()操作符。
  • groupBy —— 将原始Observable按照指定的key进行分拆。它们中的每一个发射原始Observable数据序列的一个子序列。哪个数据项由哪一个Observable发射是由一个函数判定的,这个函数给每一组指定一个Key,Key相同的数据会被同一个Observable发射。

    1.groupBy(Func1<? super T, ? extends K> keySelector)

        Observable.from(getBookDataList())
                .groupBy(new Func1<BookObj, Integer>() {
                    @Override
                    public Integer call(BookObj bookObj) {
                        return bookObj.getType();
                    }
                })
                .subscribe(new Action1<GroupedObservable<Integer, BookObj>>() {
                    @Override
                    public void call(GroupedObservable<Integer, BookObj> groupedObservable) {
                        System.out.println("groupedObservable = [" + groupedObservable + "]");
                        Integer key = groupedObservable.getKey();
                        if (key == BOOK_TYPE_CIRCLR) {
                            setupCircleBooks(groupedObservable);
                        } else {
                            setupTimeBooks(groupedObservable);
                        }
                    }
                });

2.groupBy(Func1<? super T, ? extends K> keySelector,Func1<? super T, ? extends R> elementSelector)

        Observable.from(getBookDataList())
                .groupBy(new Func1<BookObj, Integer>() {
                             @Override
                             public Integer call(BookObj bookObj) {
                                 return bookObj.getType();//分组的依据
                             }
                         },
                        new Func1<BookObj, String>() {//类似于map的一个变换
                            @Override
                            public String call(BookObj bookObj) {
                                return bookObj.getName();//在这里可以对接收到的数据项进行处理。
                            }
                        })
                .subscribe(new Action1<GroupedObservable<Integer, String>>() {
                    @Override
                    public void call(GroupedObservable<Integer, String> groupedObservable) {
                        //....
                    }
                });

注意:groupBy将原始Observable分解为一个发射多个GroupedObservable的Observable,一旦有订阅,每个GroupedObservable就开始缓存数据。因此,如果你忽略这些GroupedObservable中的任何一个,这个缓存可能形成一个潜在的内存泄露。因此,如果你不想观察,也不要忽略GroupedObservable。你应该使用像take(0)这样会丢弃自己的缓存的操作符。

结合操作符

startWith、merge、concatWith、zip、join

  • startWith 在发射数据之前先发射一个指定的数据序列 observableA.startWith(observableB):
  • concatWith 发射的数据末尾追加一个数据序列observableA.concatWith(observableB):
  • merge 将多个Observables的输出合并,就好像它们是一个单个的Observable一样observableA.merge(observableB):
  • zipWith 使用一个函数组合多个Observable发射的数据集合,然后再发射这个结果。

    1.zipWith(Iterable<? extends T2> other,Func2<? super T, ? super T2, ? extends R> zipFunction)

    2.zipWith(Observable<? extends T2> other,Func2<? super T, ? super T2, ? extends R> zipFunction)

Observable.from(getDataList())//ObservableA
                .zipWith(
                        Observable.from(getBookDataList()),//ObservableB
                        //ObservableA发射的数据项类型,ObservableB发射的数据项的类型、返回给下一操作符的数据类型
                        new Func2<PersonObj, BookObj, PersonObj>() {
                            @Override
                            public PersonObj call(PersonObj personObj, BookObj bookObj) {
                            //对ObservableA和ObservableB发射的数据项进行处理
                                personObj.setBookObj(bookObj);
                                return personObj;//给下一个操作符处理的数据项
                            }
                        })
                .subscribe(new Action1<PersonObj>() {
                    @Override
                    public void call(PersonObj personObj) {
                        System.out.println("s = [" + personObj + "]");
                    }
                });

其他类别的操作符:

过滤 filter、 debounce

条件 contains、all

连接 connect、publish、refCount、replay

转换 toList、toMap

阻塞 forEach、first

字符串操作 byLine/decode/encode/from/join/split/stringConcat

算术操作 average/concat/count/max/min/sum/reduce

错误处理 catch/retry/retryWhen

Subject

class Subject<T, R> extends Observable<R> implements Observer<T>

Represents an object that is both an Observable and an Observer.

AsyncSubject、BehaviorSubject、PublishSubject、ReplaySubject

  1. AsyncSubject 数据源观察者* 发射完成之后 *,仅向它的每个订阅者发射最后一个数据项。
        AsyncSubject<Object> asyncSubject = AsyncSubject.create();
        asyncSubject.subscribe(//这个订阅方法也有多个重载方法
                new Action1<Object>() {
                    @Override
                    public void call(Object o) {
                        System.out.println("o = [" + o + "]");//只打印一个`时光`
                    }
                },
                throwable -> {
                    System.out.println(throwable.getMessage());
                });
        asyncSubject.onNext(12);
        Model model = new Model();
        model.age = 18;
        model.name = "江一燕";
        asyncSubject.onNext(model);
        asyncSubject.onNext("时光");
        asyncSubject.onCompleted();//completed后,订阅者会接收到最后一个发射的数据
  1. BehaviorSubject 订阅者可以接收到它订阅时BehaviorSubject发射的最近的一个和订阅后BehaviorSubject发射的所有数据项。
  // observer will receive all events.
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.subscribe(observer);
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive only onCompleted
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.onCompleted();
  subject.subscribe(observer);

  // observer will receive only onError
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.onError(new RuntimeException("error"));
  subject.subscribe(observer);
  1. PublishSubject 从那里订阅就从那里开始接收数据。
 PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onCompleted events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");
  // observer2 will only receive "three" and onCompleted
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onCompleted();
  1. ReplaySubject 无论何时订阅,都会接收到PublishSubject 发射过的所有数据。
ReplaySubject<Object> subject = ReplaySubject.create();
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");
  subject.onCompleted();
  // both of the following will get the onNext/onCompleted calls from above
  subject.subscribe(observer1);
  subject.subscribe(observer2);
        ReplaySubject<Object> replaySubject = ReplaySubject.create();
        replaySubject.onNext("one");
        replaySubject.onNext("two");
        replaySubject.onNext("three");
        replaySubject.subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onNext(Object o) {
                System.out.println("o = [" + o + "]");
            }
        });
//        replaySubject.onCompleted();//注意这个的位置
          replaySubject.onNext("four");
          replaySubject.subscribe(observer1);
          replaySubject.subscribe(observer2);

调度器 Scheduler

Schedulers.computation(?) 用于计算任务,如事件循环或和回调处理,不要用于IO操作,IO操作请使用Schedulers.io());默认线程数等于处理器的数量 。



Schedulers.from(executor) 使用指定的Executor作为调度器。



Schedulers.immediate(?) 在当前线程立即开始执行任务。



Schedulers.io(?) 用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用 Schedulers.computation();Schedulers.io(?)默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器。



Schedulers.newThread(?) 为每个任务创建一个新线程。



Schedulers.trampoline(?) 当其它排队的任务完成后,在当前线程排队开始执行 。



除了将这些调度器传递给RxJava的Observable操作符,也可以用它们调度你自己的任务。

  • 调度自己的任务

    下面的示例展示了Scheduler.Worker的用法:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        yourWork();
    }
});
// some time later...
worker.unsubscribe();
  • 递归调度器

要调度递归的方法调用,可以使用schedule,然后再用schedule(this),示例:

worker = Schedulers.newThread().createWorker();
worker.schedule(new Action0() {
    @Override
    public void call() {
        yourWork();
        // recurse until unsubscribed (schedule will do nothing if unsubscribed)
        worker.schedule(this);
    }
});
// some time later...
worker.unsubscribe();
  • 检查或设置取消订阅状态

Worker类的对象实现了Subscription接口,使用它的isUnsubscribed和unsubscribe方法,所以你可以在订阅取消时停止任务,或者从正在调度的任务内部取消订阅,示例:

Worker worker = Schedulers.newThread().createWorker();
Subscription mySubscription = worker.schedule(new Action0() {
    @Override
    public void call() {
        while(!worker.isUnsubscribed()) {
            status = yourWork();
            if(QUIT == status) { worker.unsubscribe(); }
        }
    }
});

Worker同时是Subscription,因此你可以(通常也应该)调用它的unsubscribe方法通知可以挂起任务和释放资源了。

  • 延时和周期调度器

你可以使用schedule(action,delayTime,timeUnit)在指定的调度器上延时执行你的任务,下面例子中的任务将在500毫秒之后开始执行:

someScheduler.schedule(someAction, 500, TimeUnit.MILLISECONDS);

使用另一个版本的schedule,schedulePeriodically(action,initialDelay,period,timeUnit)方法让你可以安排一个定期执行的任务,下面例子的任务将在500毫秒之后执行,然后每250毫秒执行一次:

someScheduler.schedulePeriodically(someAction, 500, 250, TimeUnit.MILLISECONDS);

  • 测试调度器

TestScheduler让你可以对调度器的时钟表现进行手动微调。这对依赖精确时间安排的任务的测试很有用处。这个调度器有三个额外的方法:

advanceTimeTo(time,unit) 向前波动调度器的时钟到一个指定的时间点

advanceTimeBy(time,unit) 将调度器的时钟向前拨动一个指定的时间段

triggerActions(?) 开始执行任何计划中的但是未启动的任务,如果它们的计划时间等于或者早于调度器时钟的当前时间

线程变换

  • 在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread())

subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。

observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

让UI线程与工作线程间的跳转变得简单

NOTE:

subscribeOn() 的位置放在哪里都可以,但它是只能调用一次的。

一个事件流应该只有一个subscribeOn()起作用;可以有多个observeOn().

compose: 对 Observable 整体的变换。 compose() 是针对 Observable 自身进行变换。举个例子,假设在程序中有多个 Observable ,并且他们都需要应用一组相同的 lift() 变换。

  • doOnSubscribe()

    它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程。

Observable.create(onSubscribe)
    .subscribeOn(Schedulers.io())
    .doOnSubscribe(new Action0() {
        @Override
        public void call() {
            progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行
        }
    })
    .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(subscriber);

Backpressure

生产者-消费者(producer-consumer)模式中,一定要小心Backpressure(背压,反压)的出现。一个宽泛的解释就是:事件产生的速度比消费快。一旦发生overproducing,当你的链式结构不能承受数据压力的时候,就会抛出MissingBackpressureException异常。 在Android中最容易出现的Backpressure就是连续快速点击跳转界面、数据库查询、键盘输入,甚至联网等操作都有可能造成Backpressure,可能有些情况并不会导致程序崩溃,但是会造成一些我们不想见到的小麻烦。


https://www.gitbook.com/book/mcxiaoke/rxdocs/details

https://github.com/ReactiveX/RxJava/wiki

http://gank.io/post/560e15be2dca930e00da1083

http://www.open-open.com/lib/view/open1452698649605.html

时间: 2024-10-09 08:16:22

认识一下Rxjava的相关文章

一起来造一个RxJava,揭秘RxJava的实现原理

RxJava是一个神奇的框架,用法很简单,但内部实现有点复杂,代码逻辑有点绕.我读源码时,确实有点似懂非懂的感觉.网上关于RxJava源码分析的文章,源码贴了一大堆,代码逻辑绕来绕去的,让人看得云里雾里的.既然用拆轮子的方式来分析源码比较难啃,不如换种方式,以造轮子的方式,将源码中与性能.兼容性.扩展性有关的代码剔除,留下核心代码带大家揭秘RxJava的实现原理. 什么是响应式编程 首先,我们需要明确,RxJava是Reactive Programming在Java中的一种实现.什么是响应式编程

RxJava概叙

给Android开发者的 RxJava 详解:http://gank.io/post/560e15be2dca930e00da1083 响应式编程是一种异步数据流交互的编程范式,而RxJava就是基于事件操作异步数据流在Java上实现的库 核心的理念是将一切都当做数据流来看待,各种变量,用户输入,数据结构,缓存等等 而Rx库提供了高度抽象的函数来操作流,创建.流入流出.过滤.合并.映射等等各种变换 不仅如此,Rx库还使得异步操作,和错误处理变得非常简洁. 使用了RxJava后明显的好处就是 1解

RxJava从入门到放弃---关于RxJava-入门必看

RxJava 到底是什么 RxJava 好在哪 API 介绍和原理简析 1. 概念:扩展的观察者模式 观察者模式 RxJava 的观察者模式 2. 基本实现 1) 创建 Observer 2) 创建 Observable 3) Subscribe (订阅) 4) 场景示例 a. 打印字符串数组 b. 由 id 取得图片并显示 3. 线程控制 -- Scheduler (一) 1) Scheduler 的 API (一) 2) Scheduler 的原理 (一) 4. 变换 1) API 2) 变

Rxjava+ReTrofit+okHttp深入浅出-终极封装

Rxjava+ReTrofit+okHttp深入浅出-终极封装 背景: 学习Rxjava和retrofit已经很长时间了,功能确实很强大,但是使用起来还是有点复杂,代码的重复性太高,所以决定把基于retrofit和rxjava的处理统一封装起来,实现的功能: 1.Retrofit+Rxjava+okhttp基本使用方法 2.统一处理请求数据格式 3.统一的ProgressDialog和回调Subscriber处理 4.取消http请求 5.预处理http请求 5.返回数据的统一判断 效果: 封装

RxJava开发精要6 – Observables组合

原文出自<RxJava Essentials> 原文作者 : Ivan Morgillo 译文出自 : 开发技术前线 www.devtf.cn 转载声明: 本译文已授权开发者头条享有独家转载权,未经允许,不得转载! 译者 : yuxingxin 项目地址 : RxJava-Essentials-CN 上一章中,我们学到如何转换可观测序列.我们也看到了map(),scan(),groupBY(),以及更多有用的函数的实际例子,它们帮助我们操作Observable来创建我们想要的Observabl

RxJava & RxAndroid备忘

"你问我要去向何方,我指着大海的方向" 今天在刷G+的时候看到Dave Smith推荐了一个视频 <Learning RxJava (for Android) by example> 点进去看了一下,原来是位熟悉的"阿三哥",视频封面如下:(没有歧视的意思,不要喷我啊~,为什么感到熟悉?接着往下看) 几乎同时也看到了JetBrains在G+也推荐了篇在Medium上的博文 <RxAndroid And Kotlin (Part 1)> ,然后

给 Android 开发者的 RxJava 详解

作者:扔物线 前言 我从去年开始使用 RxJava ,到现在一年多了.今年加入了 Flipboard 后,看到 Flipboard 的 Android 项目也在使用 RxJava ,并且使用的场景越来越多 .而最近这几个月,我也发现国内越来越多的人开始提及 RxJava .有人说『RxJava 真是太好用了』,有人说『RxJava 真是太难用了』,另外更多的人表示:我真的百度了也谷歌了,但我还是想问: RxJava 到底是什么? 鉴于 RxJava 目前这种既火爆又神秘的现状,而我又在一年的使用

最适合使用RxJava处理的四种场景

下面我们开始介绍RxJava最适合使用的四种场景,代码示例基于RxJava1 场景一: 单请求异步处理 由于在Android UI线程中不能做一些耗时操作,比如网络请求,大文件保存等,所以在开发中经常会碰到异步处理的情况,我们最典型的使用场景是RxJava+Retrofit处理网络请求 MyService myService = retrofit.create(MyService.class);myService.getSomething() .subscribeOn(Schedulers.io

转:深入浅出RxJava(二:操作符)

原文地址:http://blog.csdn.net/lzyzsd/article/details/44094895#comments 在第一篇blog中,我介绍了RxJava的一些基础知识,同时也介绍了map()操作符.当然如果你并没有意愿去使用RxJava我一点都不诧异,毕竟才接触了这么点.看完这篇blog,我相信你肯定想立即在你的项目中使用RxJava了,这篇blog将介绍许多RxJava中的操作符,RxJava的强大性就来自于它所定义的操作符. 首先先看一个例子: 准备工作 假设我有这样一

RxJava 与 Retrofit

RxJava 与 Retrofit