RxJava 方法备忘录

RxJava 方法

过滤相关

RxJava提供了filter()方法来过滤我们观测序列中不想要的值

当我们不需要整个序列时,而是只想取开头或结尾的几个元素,我们可以用take()或takeLast()

一个可观测序列会在出错时重复发射或者被设计成重复发射。distinct()和distinctUntilChanged()函数可以方便的让我们处理这种重复问题。(它会记录以及发射的值来过滤,所以请注意内存)

first()方法和last()方法很容易弄明白。它们从Observable中只发射第一个元素或者最后一个元素。这两个都可以传Func1作为参数,:一个可以确定我们感兴趣的第一个或者最后一个的谓词:

skip()和skipLast()函数与take()和takeLast()相对应。它们用整数N作参数,从本质上来说,它们不让Observable发射前N个或者后N个值。如果我们知道一个序列以没有太多用的“可控”元素开头或结尾时我们可以使用它。

elementAt()函数仅从一个序列中发射第n个元素,如果怕不存在可以使用elementAtOrDefault()

一个温度传感器,它每秒都会发射当前室内的温度。说实话,我们并不认为温度会变化这么快,我们可以使用一个小的发射间隔。在Observable后面加一个sample(),我们将创建一个新的可观测序列,它将在一个指定的时间间隔里由Observable发射最近一次的数值:

假设我们工作的是一个时效性的环境,我们温度传感器每秒都在发射一个温度值。我们想让它每隔两秒至少发射一个,我们可以使用timeout()函数来监听源可观测序列,就是在我们设定的时间间隔内如果没有得到一个值则发射一个错误。我们可以认为timeout()为一个Observable的限时的副本。如果在指定的时间间隔内Observable不发射值的话,它监听的原始的Observable时就会触发onError()函数。

debounce()函数过滤掉由Observable发射的速率过快的数据;如果在一个指定的时间间隔过去了仍旧没有发射一个,那么它将发射最后的那个。

就像sample()和timeout()函数一样,debounce()使用TimeUnit对象指定时间间隔。

变换相关

RxJava提供了几个mapping函数:map(),flatMap(),concatMap(),flatMapIterable()以及switchMap().所有这些函数都作用于一个可观测序列,然后变换它发射的值,最后用一种新的形式返回它们

在复杂的场景中,我们有一个这样的Observable:它发射一个数据序列,这些数据本身也可以发射Observable。RxJava的flatMap()函数提供一种铺平序列的方式,然后合并这些Observables发射的数据,最后将合并后的结果作为最终的Observable。当我们在处理可能有大量的Observables时,重要是记住任何一个Observables发生错误的情况,flatMap()将会触发它自己的onError()函数并放弃整个链。

重要的一点提示是关于合并部分:它允许交叉,所以flatMap()不能够保证在最终生成的Observable中源Observables确切的发射顺序。

concatMap()函数解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的铺平函数,而不是合并它们,如下图所示:

witchMap()和flatMap()很像,除了一点:每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个

scan()函数可以看做是一个累积函数。scan()函数对原始Observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。

groupBy() 将源Observable变换成一个发射Observables的新的Observable。它们中的每一个新的Observable都发射一组指定的数据。

Buffer() 将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射

window()函数和buffer()很像,但是它发射的是Observable而不是列表,数量由count指定,最后发射一个onCompleted()结束。正如buffer()一样,window()也有一个skip变体

cast()函数是map()操作符的特殊版本。它将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。

组合Observables

merge() 我们有多个来源但是又只想有一个结果:多输入,单输出。RxJava的merge()方法将帮助你把两个甚至更多的Observables合并到他们发射的数据项里。每个Observable抛出的错误都将会打断合并。如果你需要避免这种情况,RxJava提供了mergeDelayError(),它能从一个Observable中继续发射数据即便是其中有一个抛出了错误。当所有的Observables都完成时,mergeDelayError()将会发射onError()

zip() 处理多个数据来源时会带来:多从个Observables接收数据,处理它们,然后将它们合并成一个新的可观测序列来使用。RxJava有一个特殊的方法可以完成:zip()合并两个或者多个Observables发射出的数据项,根据指定的函数Func*变换它们,并发射一个新值。

Join() 前面的zip()和merge()方法作用在发射数据的范畴内,在决定如何操作值之前有些场景我们需要考虑时间的。RxJava的join()函数基于时间窗口将两个Observables发射的数据结合在一起

combineLatest() 有点像zip()函数的特殊形式。zip()作用于最近未打包的两个Observables。相反,combineLatest()作用于最近发射的数据项:如果Observable1发射了A并且Observable2发射了B和C,combineLatest()将会分组处理AB和AC.

在将来还有一些zip()满足不了的场景。如复杂的架构,或者是仅仅为了个人爱好,你可以使用And/Then/When解决方

switch 有这样一个复杂的场景就是在一个subscribe-unsubscribe的序列里我们能够从一个Observable自动取消订阅来订阅一个新的Observable。

RxJava的switch(),正如定义的,将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射那些Observables最近发射的数据项。

给出一个发射多个Observables序列的源Observable,switch()订阅到源Observable然后开始发射由第一个发射的Observable发射的一样的数据。当源Observable发射一个新的Observable时,switch()立即取消订阅前一个发射数据的Observable(因此打断了从它那里发射的数据流)然后订阅一个新的Observable,并开始发射它的数据。

startWith()是concat()的对应部分。正如concat()向发射数据的Observable追加数据那样,在Observable开始发射他们的数据之前, startWith()通过传递一个参数来先发射一个数据序列

调用线程

StrictMode帮助我们侦测敏感的活动,如我们无意的在主线程执行磁盘访问或者网络调用。正如你所知道的,在主线程执行繁重的或者长时的任务是不可取的。因为Android应用的主线程时UI线程,它被用来处理和UI相关的操作:这也是获得更平滑的动画体验和响应式App的唯一方法。

为了在我们的App中激活StrictMode,我们只需要在MainActivity中添加几行代码,即onCreate()方法

@Override
public void onCreate() {
    super.onCreate();
    if (BuildConfig.DEBUG) {
        StrictMode.setThreadPolicy(new StrictMode.ThreadPolicy.Builder().detectAll().penaltyLog().build());
        StrictMode.setVmPolicy(new StrictMode.VmPolicy.Builder().detectAll().penaltyLog().build());
    }
}

我们并不想它总是激活着,因此我们只在debug构建时使用。这种配置将报告每一种关于主线程用法的违规做法,并且这些做法都可能与内存泄露有关:Activities、BroadcastReceivers、Sqlite等对象。+

选择了penaltyLog(),当违规做法发生时,StrictMode将会在logcat打印一条信息。

  阻塞I/O的操作会导致App必须等待结果返回(阻塞结束)才能进行下一步操作。在UI线程上执行一个阻塞操作会将UI强行卡住,直接造成很糟糕的用户体验。

我们激活StrictMode后,我们开始收到了关于我们的App错误操作磁盘I/O的不良信息。

D/StrictMode  StrictMode policy violation; ~duration=998 ms: android.os.StrictMode$StrictModeDiskReadViolation: policy=31 violation=2
at android.os.StrictMode$AndroidBlockGuardPolicy.onReadFromDisk (StrictMode.java:1135)
at libcore.io.BlockGuardOs.open(BlockGuardOs.java:106) at libcore.io.IoBridge.open(IoBridge.java:393)
at java.io.FileOutputStream.<init>(FileOutputStream.java:88)
at android.app.ContextImpl.openFileOutput(ContextImpl.java:918)
at android.content.ContextWrapper.openFileOutput(ContextWrapper. java:185)
at com.packtpub.apps.rxjava_essentials.Utils.storeBitmap (Utils.java:30)

上一条信息告诉我们Utils.storeBitmap()函数执行完耗时998ms:在UI线程上近1秒的不必要的工作和App上近1秒不必要的迟钝。这是因为我们以阻塞的方式访问磁盘。我们的storeBitmap()函数包含了:

[code]FileOutputStream fOut = context.openFileOutput(filename, Context.MODE_PRIVATE);

它直接访问智能手机的固态存储然后就慢了。我们该如何提高访问速度呢?storeBitmap()函数保存了已安装App的图标。他的返回值类型为void,因此在执行下一个操作前我们毫无理由去等待直到它完成。我们可以启动它并让它执行在不同的线程。近几年来Android的线程管理发生了许多变化,导致App出现诡异的行为。我们可以使用AsyncTask,但是我们要避免掉入前几章里的onPre… onPost…doInBackGround地狱。下面我们将换用RxJava的方式。调度器万岁!

    *

Schedulers.io() 这个调度器时用于I/O操作。它基于根据需要,增长或缩减来自适应的线程池。我们将使用它来修复我们之前看到的StrictMode违规做法。由于它专用于I/O操作,所以并不是RxJava的默认方法;正确的使用它是由开发者决定的。

重点需要注意的是线程池是无限制的,大量的I/O调度操作将创建许多个线程并占用内存。一如既往的是,我们需要在性能和简捷两者之间找到一个有效的平衡点。

Schedulers.computation() 这个是计算工作默认的调度器,它与I/O操作无关。它也是许多RxJava方法的默认调度器:buffer(),debounce(),delay(),interval(),sample(),skip()。

Schedulers.immediate()

这个调度器允许你立即在当前线程执行你指定的工作。它是timeout(),timeInterval(),以及timestamp()方法默认的调度器。

Schedulers.newThread()

这个调度器正如它所看起来的那样:它为指定任务启动一个新的线程。

Schedulers.trampoline()

当我们想在当前线程执行一个任务时,并不是立即,我们可以用.trampoline()将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是repeat()和retry()方法默认的调度器。

非阻塞I/O操作

现在我们知道如何在一个指定I/O调度器上来调度一个任务,我们可以修改storeBitmap()函数并再次检查StrictMode的不合规做法。为了这个例子,我们可以在新的blockingStoreBitmap()函数中重排代码。

    FileOutputStream fOut = null;
    try {
        fOut = context.openFileOutput(filename, Context.MODE_PRIVATE);
        bitmap.compress(Bitmap.CompressFormat.PNG, 100, fOut);
        fOut.flush();
        fOut.close();
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        try {
            if (fOut != null) {
                fOut.close();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

现在我们可以使用Schedulers.io()创建非阻塞的版本:

    Schedulers.io().createWorker().schedule(() -> {
        blockingStoreBitmap(context, bitmap, filename);
    });
}

每次我们调用storeBitmap(),RxJava处理创建所有它需要从I / O线程池一个特定的I/ O线程执行我们的任务。所有要执行的操作都避免在UI线程执行并且我们的App比之前要快上1秒:logcat上也不再有StrictMode的不合规做法。

SubscribeOn and ObserveOn

subscribeOn()方法来用于每个Observable对象。subscribeOn()方法用Scheduler来作为参数并在这个Scheduler上执行Observable调用

observeOn()方法将会在指定的调度器上返回结果

总结

  RxJava是一个正在不断发展和扩大的世界。还有许多方法我们还没有去探索。有些方法甚至还没有,通过RxJava,你可以创建你自己的操作符并把他们发展地更远。+

  Android是一个好玩的地方,但是它也有局限性。作为一个Android开发者,你可以用RxJava和RxAndroid克服其中的许多。我们用AndroidScheduler只简单提了下RxAndroid,除了在最后一章,你了解了ViewObservable。RxAndroid给了你许多:例如,WidgetObservable,LifecycleObservable。往后将它发展地更长远的任务就取决于你了。

谨记可观测序列就像一条河:它们是流动的。你可以“过滤”(filter)一条河,你可以“转换”(transform)一条河,你可以将两条河合并(combine)成一个,然后依然畅流如初。最后,它就成了你想要的那条河。

参考:http://www.aichengxu.com/view/2473291

时间: 2024-10-15 17:52:51

RxJava 方法备忘录的相关文章

拦截js方法备忘录

很明显,以下代码拦截了fusion2.dialog.invite,然后在页面执行fusion2.dialog.invite方法的时候修改了参数中的img. <script> var oldFunction; function editinvite() { oldFunction = fusion2.dialog.invite; fusion2.dialog.invite= function (jsonstr) { console.log("拦截图片地址:"+jsonstr.

访问域名时直接访问默认的工程,执行默认的方法—备忘录《二》

众所周知,访问域名或者某个IP时,如果不配置默认工程,是只显示tomcat界面.下面讲解直接访问域名或IP时,如何访问到默认工程 1.开发环境:springMVC+tomcat 2.下面正式开始配置,主要分为以下几个步骤 1)配置tomcat默认工程,配置tomcat/conf/server.xml文件(记得把默认端口号改成80,linux中可能会设置失败,可执行命令:iptables -t nat -A PREROUTING -p tcp --dport 80 -j REDIRECT --to

最适合使用RxJava处理的四种场景

下面我们开始介绍RxJava最适合使用的四种场景,代码示例基于RxJava1 场景一: 单请求异步处理 由于在Android UI线程中不能做一些耗时操作,比如网络请求,大文件保存等,所以在开发中经常会碰到异步处理的情况,我们最典型的使用场景是RxJava+Retrofit处理网络请求 MyService myService = retrofit.create(MyService.class);myService.getSomething() .subscribeOn(Schedulers.io

理解 RxJava 的线程模型

来源:鸟窝, colobu.com/2016/07/25/understanding-rxjava-thread-model/ 如有好文章投稿,请点击 → 这里了解详情 ReactiveX是Reactive Extensions的缩写,一般简写为Rx,最初是LINQ的一个扩展,由微软的架构师Erik Meijer领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持.NET.JavaScript和C++,Rx近几年越来越

RxJava2 方法总结

RxJava2 方法总结 看了许多讲解RxJava的文章,有些文章讲解的内容是基于第一个版本的,有些文章的讲解是通过比较常用的一些API和基础的概念进行讲解的. 但是每次看到RxJava的类中的几十个方法的时候,总是感觉心里没底.所以,我打算自己去专门写篇文章来从API的角度系统地梳理一下RxJava的各种方法和用法. 1.RxJava 基本 1.1 RxJava 简介 RxJava是一个在Java VM上使用可观测的序列来组成异步的.基于事件的程序的库. 虽然,在Android中,我们可以使用

Volley 解析

Volley Request处理流程 RequestQueue类中有三个基本的队列.调用RequestQueue.add(request)增加的请求会先增加mCacheQueue(优先级堵塞队列)由CacheDispatcher( 循环读取队列中的请求,当没有请求处理时线程堵塞)线程处理.假设该请求之前已经被缓存.读取缓存返回给主线程结果.否则将请求增加mNetworkQueue由NetworkDispatcher线程处理. 因为处理网络请求比較耗时.NetworkDispatcher线程默认开

[CLRS][CH 15.3] 动态规划基础

摘要 究竟什么时候才需要动态规划?这里介绍两个要素:最优子结构,重叠子问题.另外,还要分析一种方法——备忘录,充分利用重叠子问题性质. 最优子结构 利用动态规划求解时第一步是描述最优解的结构.当一个问题具有最优子结构时,提示我们动态规划可能会适用. 在寻找最优子结构时,可以遵循一种共同的模式:1)问题的一个解可以是一个选择:2)假设对于一个给定问题,已知的是一个可以导致最优解的选择:3)在已知这个选择后,要确定哪些子问题会随之发生. 最优子结构在问题域中以两种形式变化:1)有多少个子问题被使用在

对动态规划算法的理解及相关题目分析

1.对动态规划算法的理解 (1)基本思想: 动态规划算法的基本思想与分治法类似:将待求解的问题分解成若干个子问题,先求解子问题,然后从这些子问题的解中得到原问题的解.但是,与分治法不同的是,为了避免重复多次计算子问题,动态规划算法用一个表记录所有已解决的子问题的答案,不管该子问题以后是否被利用,只要它被计算过,就将其结果填入表中. (2)设计动态规划算法的步骤: ①找出最优解的性质,并刻画其结构特征 ②递归地定义最优值 ③以自底向上的方式计算最优值 ④根据计算最优值时得到的信息构造最优解 (3)

矩阵连乘(备忘录方法:自顶向下递归)

#include<iostream> #include<vector> #include<iterator> #include<algorithm> using namespace std; /* *矩阵连乘(备忘录方法:自顶向下递归) */ vector<vector<int>> m;//m[i][j]表示矩阵Ai连乘到Aj的最少运算次数 vector<vector<int>> s;//s[i][j]记录矩阵