RxJava- 操作符之转换Observable

RxJava- 操作符之过滤Observables了解到,RxJava过滤操作符的基本使用,主要是针对Observable列表。本篇主要针对Observable列表中对象操作符的使用。用于变换可观测序列来创建一个能够更好的满足我们需求的序列。

map

map操作符通过指定一个的Func对象,将Observables转换为一个新的Observable对象并发射,观察者将收到新的Observable处理。

map操作符的流程图如下:

示例代码:

    Observable.from(mLists)
        .map(new Func1<Student, Student>() {

            @Override
            public Student call(Student student) {
                StringBuilder sb = new StringBuilder();
                String age = student.getAge();
                sb.append("AGE - ");
                sb.append(age);
                student.setAge(sb.toString());
                return student;
            }
        })
        .subscribe(new Observer<Student>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(Student student) {
                mAdaStu.addData(student);
            }
        });

正如你看到的,像往常一样创建我们发射的Observable之后,追加一个map调用,在其回调call方法中,将age字段添加“AGE - ”, 更新对象后发射,观察者接收到的是更新后的Observable。

flatMap

flatMap操作符用于发射的一个数据序列,而这些数据同时本身拥有发射Observable。flatMap是以铺平序列的方式,然后合并这些Observables发射的数据,最后将合并后的结果作为最终的Observable。但是,flatMap()可能交错的发送事件,最终结果的顺序可能并是不原始Observable发送时的顺序。

flatMap操作符的流程图如下:

示例代码:

    private void doFlatMap() {
        List<String> lists = new ArrayList<>();
        lists.add("Flat - A");
        lists.add("Flat - B");
        lists.add("Flat - C");
        lists.add("Flat - D");

        Observable.from(lists)
                .flatMap(new Func1<String, Observable<Student>>() {
                    @Override
                    public Observable<Student> call(String s) {
                        return getStudentData(s);
                    }
                })
                .subscribe(new Observer<Student>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Student student) {
                        mAdaStu.addData(student);
                    }
                });

    }

    // 针对不同的s发射不同的Observable序列
    private Observable<Student> getStudentData(final String s) {
        List<Student> lists_0 = new ArrayList<>();
        if (TextUtils.equals("Flat - A", s) || TextUtils.equals("Concat - A", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "12", "20", "1102"));
            lists_0.add(new Student(s + "13", "20", "1103"));
        } else if (TextUtils.equals("Flat - B", s) || TextUtils.equals("Concat - B", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
        } else if (TextUtils.equals("Flat - C", s) || TextUtils.equals("Concat - C", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "12", "20", "1102"));
        } else if (TextUtils.equals("Flat - D", s) || TextUtils.equals("Concat - D", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "12", "20", "1102"));
            lists_0.add(new Student(s + "13", "20", "1103"));
            lists_0.add(new Student(s + "13", "20", "1104"));
        }

        return Observable.from(lists_0);
    }

注:

1.flatMap允许交叉。如流程图所示,这意味着flatMap()不能够保证在最终生成的Observable中源Observables确切的发射顺序。

2.当我们在处理可能有大量的Observables时,重要是记住任何一个Observables发生错误的情况,flatMap()将会触发它自己的onError()函数并放弃整个链。

concatMap

concatMap操作符功能与flatMap操作符一致,不过,它解决了flatMap()的交叉问题,提供了一种能够把发射的值连续在一起的

铺平函数,而不是合并它们,

concatMap操作符的流程图如下:

示例代码:

    List<String> lists = new ArrayList<>();
    lists.add("Concat - A");
    lists.add("Concat - B");
    lists.add("Concat - C");
    lists.add("Concat - D");

    Observable.from(lists)
            .concatMap(new Func1<String, Observable<Student>>() {
                @Override
                public Observable<Student> call(String s) {
                    return getStudentData(s);
                }
            })
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });

flatMapIterable

flatMapIterable操作符和flatMap类似,仅有的本质不同是它将源数据两两结成对并生成Iterable,而不是原始数据项和生成的Observables。

flatMapIterable操作符的流程图如下:

示例代码:

     private void doFlatMapIterable() {
        List<String> lists = new ArrayList<>();
        lists.add("FlatMapIterable - A");
        lists.add("FlatMapIterable - B");
        lists.add("FlatMapIterable - C");
        lists.add("FlatMapIterable - D");

        Observable.from(lists)
                .flatMapIterable(new Func1<String, Iterable<Student>>() {
                    @Override
                    public Iterable<Student> call(String s) {
                        return getStudentList(s);
                    }
                })
                .subscribe(new Observer<Student>() {
                    @Override
                    public void onCompleted() {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Student student) {
                        mAdaStu.addData(student);
                    }
                });
    }

    // 针对不同的s发射不同的Observable序列
    private ArrayList<Student> getStudentList(final String s) {
        ArrayList<Student> lists_0 = new ArrayList<>();
        if (TextUtils.equals("FlatMapIterable - A", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "12", "20", "1102"));
            lists_0.add(new Student(s + "13", "20", "1103"));
        } else if (TextUtils.equals("FlatMapIterable - B", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
        } else if (TextUtils.equals("FlatMapIterable - C", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "11", "20", "1102"));
        } else if (TextUtils.equals("FlatMapIterable - D", s)) {
            lists_0.add(new Student(s + "11", "20", "1101"));
            lists_0.add(new Student(s + "12", "20", "1102"));
            lists_0.add(new Student(s + "13", "20", "1103"));
            lists_0.add(new Student(s + "13", "20", "1104"));
        }

        return lists_0;
    }

switchMap

switchMap操作符flatMap类似,有一点不同,每当源Observable发射一个新的数据项(Observable)时,它将取消订阅并停止监视之前那个数据项产生的Observable,并开始监视当前发射的这一个。

switchMap操作符的流程图如下:

scan

scan操作符可以认为是一个sum函数,scan()函数对原始Observable发射的每一项数据都应用一个函数,计算出函数的结果值,并将该值填充回可观测序列,等待和下一次发射的数据一起使用。

scan操作符的流程图如下:

示例代码:

    Observable.just(1,2,3,4,5)
        .scan((sum,item) -> sum + item)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {
                Log.d("RXJAVA", "Sequence completed.");
            }

            @Override
            public void onError(Throwable e) {
                Log.e("RXJAVA", "Something went south!");
            }

            @Override
            public void onNext(Integer item) {
                Log.d("RXJAVA", "item is: " + item);
            }
        });

goupBy

goupBy操作符用于分组元素,将源Observable变换成一个发射Observables的新的Observable(分组后的)。它们中的每一个新的Observable都发射一组指定的数据。

goupBy操作符的流程图如下:

示例代码:

    Observable<GroupedObservable<String,Student>> groupedItems = Observable.from(mLists)
        .groupBy(new Func1<Student,String>(){
            @Override
            public String call(Student student){
                return student.getAge();
            }
        });
    Observable.concat(groupedItems)
            .subscribe(new Observer<Student>() {
                @Override
                public void onCompleted() {

                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onNext(Student student) {
                    mAdaStu.addData(student);
                }
            });

buffer

buffer操作符将源Observable变换一个新的Observable,这个新的Observable每次发射一组列表值而不是一个一个发射。

buffer操作符的流程图如下:

示例代码:

仅仅指定cout

    Observable.from(mLists)
        .buffer(2)
        .subscribe(new Action1<List<Student>>() {
            @Override
            public void call(List<Student> list) {
                cout++;
                Log.i("123", "cout = " + cout);
                Log.i("123", list.toString());
                mAdaStu.addDatdLists(list);
            }
        });
    log打印
    cout = 1
    [Student(name=A11, age=26, no=1101), Student(name=A12, age=21, no=1102)]
    cout = 2
    [Student(name=A13, age=20, no=1103), Student(name=B11, age=22, no=1201)]
    cout = 3
    [Student(name=B12, age=30, no=1202), Student(name=B13, age=20, no=1203)]
    cout = 4
    [Student(name=S10, age=28, no=1301), Student(name=E10, age=30, no=1401)]
    cout = 5
    [Student(name=F10, age=26, no=1501)]

查看上方的Log,可以看出将源Observable量量分组,变换为一个新的Observable,新的Observable每次发射一组列表值。

设定count及skip

查看上方的流程图,如果设定skip值以后,将源Observable按照长度为skip分组,但是又将每组前count个添入新的Observable并发射。

示例代码:

    Observable.from(mLists)
        .buffer(2,3)
        .subscribe(new Action1<List<Student>>() {
            @Override
            public void call(List<Student> list) {
                cout++;
                Log.i("123", "cout = " + cout);
                Log.i("123", list.toString());
                mAdaStu.addDatdLists(list);
            }
        });
    //Log打印
    cout = 1
    [Student(name=A11, age=26, no=1101), Student(name=A12, age=21, no=1102)]
    cout = 2
    [Student(name=B11, age=22, no=1201), Student(name=B12, age=30, no=1202)]
    cout = 3
    [Student(name=S10, age=28, no=1301), Student(name=E10, age=30, no=1401)]

设定count及timespan

如果设定buffer的timespan的参数,会创建一个每隔timespan时间段就会发射一个列表的Observable。

window

window操作符与Buffer操作符类似,但是它发射的是Observable而不是列表。看下方流程图,winow是缓存3个数据项并把它们作为一个新的Observable发射出去。

window操作符的流程图如下:

示例代码:

仅仅设定count

    Observable.from(mLists)
        .window(2)
        .subscribe(new Action1<Observable<Student>>() {
            @Override
            public void call(Observable<Student> studentObservable) {
                studentObservable.subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        mAdaStu.addData(student);
                    }
                });
            }
        });
    //Log打印
    I: cout = 1
    I: Student(name=A11, age=26, no=1101)
    I: cout = 2
    I: Student(name=A12, age=21, no=1102)
    I: cout = 3
    I: Student(name=A13, age=20, no=1103)
    I: cout = 4
    I: Student(name=B11, age=22, no=1201)
    I: cout = 5
    I: Student(name=B12, age=30, no=1202)
    I: cout = 6
    I: Student(name=B13, age=20, no=1203)
    I: cout = 7
    I: Student(name=S10, age=28, no=1301)
    I: cout = 8
    I: Student(name=E10, age=30, no=1401)
    I: cout = 9
    I: Student(name=F10, age=26, no=1501)

细看上方Log,window将源Observable按count缓存后,然后一一发射出去。

设定count及Skip

    Observable.from(mLists)
        .window(2,3)
        .subscribe(new Action1<Observable<Student>>() {
            @Override
            public void call(Observable<Student> studentObservable) {
                studentObservable.subscribe(new Action1<Student>() {
                    @Override
                    public void call(Student student) {
                        mAdaStu.addData(student);
                    }
                });
            }
        });
    I: cout = 1
    I: Student(name=A11, age=26, no=1101)
    I: cout = 2
    I: Student(name=A12, age=21, no=1102)
    I: cout = 3
    I: Student(name=B11, age=22, no=1201)
    I: cout = 4
    I: Student(name=B12, age=30, no=1202)
    I: cout = 5
    I: Student(name=S10, age=28, no=1301)
    I: cout = 6
    I: Student(name=E10, age=30, no=1401)

细看上方Log,window将源Observable按skip值进行缓存后,但新Observable仅仅将分组中的前count个一一发射出去。

cast

cast操作符是是map()操作符的特殊版本,它将源Observable中的每一项数据都转换为新的类型,把它变成了不同的Class。

cast操作符是是map操作符的流程图如下:

时间: 2024-12-09 17:36:41

RxJava- 操作符之转换Observable的相关文章

RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)

RxJava系列教程: 1. RxJava使用介绍 [视频教程] 2. RxJava操作符 ? Creating Observables(Observable的创建操作符) [视频教程] ? Transforming Observables(Observable的转换操作符) [视频教程] ? Filtering Observables(Observable的过滤操作符) [视频教程] ? Combining Observables(Observable的组合操作符) [视频教程] ? Erro

RxJava操作符 -创建型

操作符类型 创建操作 变换操作 过滤操作 组合操作 错误处理 辅助操作 条件和布尔操作 算术和聚合操作 连接操作 转换操作 创建操作 create 你可以使用create操作符从头开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,编写这个函数让它的行为表现为一个Observable–恰当的调用观察者的onNext,onError和onCompleted方法. 一个形式正确的有限Observable必须尝试调用观察者的onCompleted正好一次或者它的onErro

Android RxJava操作符一览

前言 把现在接触到的操作符全部整理进来,方便查阅,遇到新的也会添加进来.和RxJavaLearn 的README.md同步更新. 操作符决策树 直接创建一个Observable(创建操作) 组合多个Observable(组合操作) 对Observable发射的数据执行变换操作(变换操作) 从Observable发射的数据中取特定的值(过滤操作) 转发Observable的部分值(条件/布尔/过滤操作) 对Observable发射的数据序列求值(算术/聚合操作) 创建操作 用于创建Observab

RxJava操作符(05-结合操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656736 本文出自:[openXu的博客] 目录: CombineLatest Join Merge StartWith Switch Zip 源码下载 结合操作就是将多个Observable发射的数据按照一定规则组合后发射出去,接下来看看RxJava中的结合操作符: 1. CombineLatest ??当两个Observables中的任何一个发射数据时,使用一个函数结合每个Obse

RxJava操作符(02-创建操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51645348 本文出自:[openXu的博客] 目录: Create Defer EmptyNeverThrow From Interval Just Range Repeat Timer 源码下载 ??在上一篇博客中我们初步体验了Rxjava的使用,领略了RxJava对于异步操作流编码的简洁风格,接下来的一系列博客,我们主要学习RxJava中的操作符,也就是RxJava的一些API,由于

RxJava操作符(03-变换操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51649975 本文出自:[openXu的博客] 目录: Buffer FlatMap flatMapIterable concatMap switchMap GroupBy Map cast Scan Window 源码下载 ??变换操作符的作用是对Observable发射的数据按照一定规则做一些变换操作,然后将变换后的数据发射出去,接下来看看RxJava中主要有哪些变换操作符: 1. B

RxJava操作符总结之过滤

RxJava操作符总结之过滤 jsut() just(T t1, T t2, T t3 ....) ,just能够传入多个同样类型的參数,并将当前參数一个接着一个的发送. Observable.just("1","2","3") .subscribe(new Action1<String>() { @Override public void call(String s) { System.out.println(s); } }); 1

RxJava操作符(09-算术/聚合操作&amp;连接操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51692493 本文出自:[openXu的博客] 目录: 算术聚合 Count Concat Reduce 连接操作 Publish Connect RefCount Replay 源码下载 算术&聚合 1. Count ??Count操作符将一个Observable转换成一个发射单个值的Observable,这个值表示原始Observable发射的数据的数量. ?? 如果原始Observa

RxJava操作符(04-过滤操作)

转载请标明出处: http://blog.csdn.net/xmxkf/article/details/51656494 本文出自:[openXu的博客] 目录: Debounce Distinct ElementAt Filter First Last IgnoreElements SampleThrottleFirst SkipSkipLast TakeTakeLast 源码下载 "过滤操作",顾名思义,就是过滤掉Observable发射的一些数据,不让他发射出去,也就是忽略丢弃掉