RxJava操作符(二) __变换操作

RxJava变换操作符

这周真心比较累,一直都在加班,今天才有点自己的时间来学习新的内容,外包工作苦啊! 上周学习了各种创建操作符,像create,from,Just,Defer….等等,这周中也工作中也用了不少,有时间也需要总结一下自己在工作中使用的操作符。好了,现在来开始学习一个变换操作符吧,不知道什么意思没关系,一个一个去试错吧。

map

官方的翻译是对于Observable发射的每一项数据,都会应用一个函数,执行变换操作,然后返回一个发射这些结果的Observable。 还是举个例子吧,Observable会发射0-5之间的数字,我需要判断这些数字中,哪些是奇数,哪些是偶数。

照例子写一个全局函数,来打印和输出Observer:

    private static void print(Object obj) {
        System.out.println(obj);
    }

    private <T> Observer<T> getDefaultObserver() {
        return new Observer<T>() {

            @Override
            public void onCompleted() {
                print("onCompleted method");
            }

            @Override
            public void onError(Throwable e) {
                print("onError method : " + e);
            }

            @Override
            public void onNext(T t) {
                print("onNext:" + t) ;
            }
        };
    }

那么我们计算0-5之间是否为偶数的代码为:

        Observable.range(0, 5)
        .map(new Func1<Integer, Boolean>() {

            @Override
            public Boolean call(Integer t) {
                //在这里判断每一个发射值 是否为偶数
                return (t & 1) == 0;
            }
        }).subscribe(getDefaultObserver()) ;

结果为:

可以看出,map函数非常简洁,利用map操作符可以完成我们平时用一般方法较难完成的作业。举个例子,在我自己的项目中,服务器返回的数据中,我需要计算每个item中money是否大于某个值,从而决定是否进行某项操作(比如标红,标绿,标蓝色),在涉及到网络请求线程转换中,map这个函数非常好的解决了这个问题。

当然还存在一个特殊的操作符cast,是map的特殊版本,意思是将发射的数据都转换成特定的类型,个人觉得用的比较少,举个例子:Student类继承自Person类,那么下面代码为:

@Test
    public void castFunction() {
        Student s1 = new Student("tom" , 11 , "90") ;
        Student s2 = new Student("merry" , 12 , "92") ;

        Observable.just(s1, s2).
        cast(Person.class).
        subscribe(getDefaultObserver());

    }

flatMap

flatMap是将一个发射数据的Observable变换为多个Observables,然后将他们发射的数据合并后放进一个单独的Observable,举个例子:

    @Test
    public void flatMapFunction() {
        Observable.
        range(1, 5).
        flatMap(new Func1<Integer, Observable<String>>() {

            @Override
            public Observable<String> call(Integer t) {
                print("create func1 method : " + t);
                return Observable.just(String.valueOf(t));
            }
        }).subscribe(getDefaultObserver());
    }

Observable发射5个从1开始的连续数据,每一个发射的数据都将被转化为一个Observable,然后分发给订阅者,那么结果为:

可以看到我们的每一个数据的分发中,都会被转化为一个新的Observable了。

那么,这个map和flatMap到底有什么区别呢? 也不先急着说明它们之间的不同,用个项目中的例子,来说说它们之间的不同吧。

这是项目中一个很平常的例子,我们需要从服务器和本地分别获取歌曲信息,如上图,此时我们需要遍历这两种数据来满足某项要求,那么就意味着我们需要合并这两种数据,就像下图:

代码表现如下:

@Test
    public void flatMapFunction2() {
        List<String> networkList = new ArrayList<String>();
        networkList.add("network:演员");
        networkList.add("network:暧昧");

        List<String> localList = new ArrayList<String>() ;
        localList.add("local:等一分钟");
        localList.add("local:最浪漫的事");

        Observable.
        just(networkList,localList).
        flatMap(new Func1<List<String>, Observable<String>>() {
            @Override
            public Observable<String> call(List<String> t) {
                return Observable.from(t);
            }
        }).subscribe(getDefaultObserver()) ;

    }

结果为:

很显然,使用map操作符不能满足我们的要求,因为map操作符此时只能操作List,并不能操作List的中的所有Item,所以flatmap操作符比map操作符控制粒度更精细,更加牛逼,而且flatmap相当于数据结构的降维,讲复杂的数据结构变成统一的有效的处理结构。嗯,这个函数我用得真心比较多,这主要是我司的PM经常改需求有关,经常Observable.zip请求多个接口,然后使用这个flatmap统一处理某些业务,所以啊上班难混啊。

当然它还有一个升级版,使用flatmap操作符对发射操作的每一个操作[onNext,onError,onCompleted]进行处理,如下:

@Test
    public void flatMapFunction3() {
        Observable.
        just(1, 2, 3).
        flatMap(new Func1<Integer,Observable<String>>(){  //onNext

            @Override
            public Observable<String> call(Integer t) {
                return Observable.just("call onNext : " + String.valueOf(t));
            }

        },new Func1<Throwable, Observable<String>>() {  //onError

            @Override
            public Observable<String> call(Throwable t) {
                return Observable.just("call onError " + t);
            }
        },new Func0<Observable<String>>(){  //onCompleted

            @Override
            public Observable<String> call() {
                return Observable.just("call completed ...");
            }

        }).subscribe(getDefaultObserver());
    }

concatMap

ReactiveX文档中介绍,它类似最简单版本的flatmap,但是它是按照次序连接而不是合并那些生成的Observables,然后产生自己的数据序列。所以根据此说明,写了一个例子,来试图说明一下flatMap与concatMap的区别:

先使用线程创建一个Obervable,使用多线程的例子就是为了试图解决flatMap是否为合并数据,concatMap是否为连接数据:

private Observable<String> getRandomObservable(int index) {
        return  Observable.create(new OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> t) {
                new Thread(){
                    Random random = new Random() ;

                    public void run() {
                        for(int i = 0 ; i < 3 ; i++) {
                            try {
                                //线程休息随机秒数
                                sleep(random.nextInt(1000)) ;
                                t.onNext(index + "--" + i);

                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        //onCompleted()方法必须调用,因为conactMap主要知道Observable是否发射完成
                        //才能去连接下一个Observable的数据
                        t.onCompleted();
                    };
                }.start();
            }
        }) ;
    }

那么在主程序中,先看flatMap的表现:

@Test
    public void concatMapFunction2() throws IOException {
        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        flatMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;
    }

再使用concatMap:

    @Test
    public void concatMapFunction2() throws IOException {
        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        concatMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;
    }

那么我们看一下结果,上图的是flatmap,下图的是concatMap:

很显然在多线程的情况下,flatmap的数据是混合的,那么就意味着其处理数据的方式是随机合并的[因为采用的是random方式,所以得出的结果也可能是随机的,这里我只是说明flatmap与concatmap之间的区别,但是它们也有相同结果的表现形式,比如它们是同一线程从上到下的执行顺序],而我们的conactMap是按照顺序的,先执行getRandomObservable(1),然再去执行getRandomObservable(2)。那么从这里我们就能非常清晰的知道了flatmap鱼concatmap的区别了。

switchMap

使用方式和flatmap一致,除了一点:当原始的Observable发射一个新的数据(Observable)时,它将取消订阅并停止监视之前的那个Observable,只监视这一个,好吧,还是上面的代码,重新贴一下:

        Observable.
        just(getRandomObservable(1), getRandomObservable(2)).
        switchMap(new Func1<Observable<String>, Observable<String>>() {
            @Override
            public Observable<String> call(Observable<String> t) {
                return t;
            }
        }).subscribe(getDefaultObserver()) ;

        System.in.read() ;

结果为:

可以看出,它只执行了onNext2,并没有执行onNext1方法,就执行了onCompleted方法,那么就说明switchMap放弃监视getRandomObservable(1),转而监视getRandomObservable(2)方法。

scan

连续地对数据列的每一项应用一个函数,然后连续发射结果

不扯了,还是来个例子吧,小时候我们数学课上经常计算这个结果为5050的表达式:

1 + 2 + 3 + 4 + 5 + ... + 100 = 5050;

那么使用scan操作符计算,就是这样的:

    @Test
    public void scanFunction() {
        Observable.
        range(1, 100).
        scan(new Func2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer t1, Integer t2) {
                return t1 + t2;
            }
        }).subscribe(getDefaultObserver()) ;
    }

来看看结果吧,比较好玩:

好了我们看到了最终的结果是 5050了,那么看看上面的除了5050之外的onNext方法,再看一下下面的解释过程,就可以知道了scan的计算方式了:

scan操作符对原始的Observable发射的第一项数据应用一个函数,然后将那个函数的结果作为第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据,持续这个过程,直到发射数据的结束。

GroupBy

将一个Observable分拆为Observables的集合,当然这个分拆的规则是我们自己通过函数去定义的。通过groupby操作符,返回的是一个GroupedObservable。GroupObservable继承了Observable,并且拥有一个额外的方法getKey,这个key值指定了数据分组指定的Observable。

好了,咋们来试一下这个操作符吧。现在我们想从tom,tommy,merry各位小学生里面取出名字第一个字母为m的,那么我们先需要分组,然后再输出,来,给个代码证明一下啊:

@Test
    public void groupByFunction() {
        Observable.
        just("tom", "tommy" , "merry").
        groupBy(new Func1<String, Integer>() {
            @Override
            public Integer call(String t) {
                //我这里是计算字符 ‘m‘的数值
                return t.charAt(0) - ‘a‘;
            }
        }).flatMap(new Func1<GroupedObservable<Integer, String>, Observable<String>>() {

            @Override
            public Observable<String> call(GroupedObservable<Integer, String> t) {
                //分组完成之后,在这里判断根据key去判断我们的目标值
                //当key==19 时,输出此时分组的Observable

                if(t.getKey() == 19) {
                    return t ;
                }

                //当key为其它值时,我们可以根据自己的需要 这里直接输入never【就是什么都不操作的意思】
                return Observable.never();
            }

        }).subscribe(getDefaultObserver()) ;
    }

好了,我们来看一下结果吧:

好了,我们的目标完成了,这个grouby操作符,基本也就差不多扯到这里了,具体项目中自己用吧,据说这玩意还有个内存泄漏的什么bug,估计是我们操作不好造成的吧。

Buffer

定期收集Observable的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值。

1. 发射1-15这15个数字,每4个数字发送一次:

@Test
    public void bufferFunction1() {
        Observable.
        range(1, 15).
        buffer(4).
        subscribe(getDefaultObserver());
    }

得到的结果为:

可以看到结果是以List的形式,每次为4位的形式展现出来的。

  1. 发射1-15这15个数字,每次取值时,剔除8个发射值,然后取4个数字发送一次:
@Test
    public void bufferFunction2() {
        Observable.
        range(1, 15).
        buffer(4,8).
        subscribe(getDefaultObserver());
    }

那么结果为:

可以看出第二项onNext中1-8的数据被剔除了。

  1. buffer除了定量取值之外,还可以定时取值,下面的例子,buffer定时3秒获取发射值:
@Test
    public void bufferFunction3() {
        Observable.create(new Observable.OnSubscribe<String>() {
               @Override
               public void call(Subscriber<? super String> subscriber) {
                   if (subscriber.isUnsubscribed()) return;
                   while (true) {
                       subscriber.onNext("消息" + System.currentTimeMillis());
                       try {
                         //每隔2s发送消息
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                   }
               }
           }).
        //每隔3秒 取出消息
        buffer(3, TimeUnit.SECONDS).
        subscribe(getDefaultObserver());
    }

那么得出的结果为:

每3秒获取一次数据,然后将获取的所有数据统一发射出来。

好了,基本上能说能写的,就写成这样了,有新的东西需要及时补上啊,写得真心时间长啊。。。。

时间: 2024-10-12 04:05:18

RxJava操作符(二) __变换操作的相关文章

RxJava操作符(09-算术/聚合操作&amp;连接操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51692493 本文出自:[openXu的博客] 目录: 算术聚合 Count Concat Reduce 连接操作 Publish Connect RefCount Replay 源码下载 算术&聚合 1. Count ??Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量. ?? 如果原始Observa

Linux(二)__基础操作

centos7 字符界面操作基础 1.字符界面的使用方法 2.本地登录和远程登录 3.学会使用putty 4.理解系统运行级别及其切换方法 5.掌握常用的系统关机和重启命令 为什么使用字符工作方式? 1.在字符操作方式下可以高效的完成所有的任务,尤其是系统管理任务. 2.系统管理任务通常都是远程进行,而远程登录后进入字符工作模式可以提高运行效率,占用带宽也非常小. 3.由于使用字符界面不用启动图形工作环境,这样可以大节省系统资源开销 进入字符工作方式的方法 1.在图形环境下直接开启终端窗口.图形

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

2_Matlab图像的空间域变换操作

1. 目的:为了达到某种视觉效果,变换输入图像的像素位置,通过把输入图像的像素位置映射到一个新的位置以达到改变原图像显示效果的目的. 2. 操作包括: ? 图像插值(Interpolation) ? 图像缩放(Resizing) ? 图像旋转(Rotation) ? 图像剪切(Cropping) 3.图像差值操作 1)原因:在处理图像的过程中,比如对图像进行缩放及旋转,这时图像中每个像素的值都要发生变化.数字图像的坐标是整数,经过这些变换之后的坐标不一定是整数,使得输入图像的像素点经过空间域变换

排序(二)__冒泡排序、简单选择排序和直接插入排序

前面<排序(一)__综述>提到按照算法的复杂度分为简单算法和改进算法两大类,本文主要就简单算法中的冒泡排序.简单选择排序和直接插入排序进行通俗详细的解析. 一.冒泡排序 1.基本概念 冒泡排序是一种交换排序,它的基本思想是:两两比较相邻记录的关键字,如果反序则交换,直到没有反序的记录为止.(动态图来自维基百科) 2.关键代码(优化之后) void BubbleSort(SqList *L) { int i,j; Status flag=TRUE;            //flag用作标记,避

JAVA编程思想(2) - 操作符(二)

5. 直接常量 -一般来说,如果程序里使用了"直接常量",编译器可以准确的知道要生成什么样的类型,但有时候却是模棱两可的.这时候需要我们对编译器进行适当的"指导" -直接常量后面的后缀字符标示了它的类型. -指数记数法:e代表"10的幂次" -注意如果编译器能够正确的识别类型,就不必在数值后附加字符,例如语句: float f4 = 200; 不存在含糊不清的地方,所以200后面不需要加L,但是对于:float f4 = 1e-43f; 编译器通

DotNet二维码操作组件ThoughtWorks.QRCode

DotNet二维码操作组件ThoughtWorks.QRCode 在生活中有一种东西几乎已经快要成为我们的另一个电子"身份证",那就是二维码.无论是在软件开发的过程中,还是在普通用户的日常中,几乎都离不开二维码.二维码 (dimensional barcode) ,又称二维条码,是在一维条码的基础上扩展出的一种具有可读性的条码.设备扫描二维条码,通过识别条码的长度和宽度中所记载的二进制数据,可获取其中所包含的信息.相比一维条码,二维码记载更复杂的数据,比如图片.网络链接等. 今天介绍一