实战SpringCloud响应式微服务系列教程(第六章)

本章节介绍:Flux和Mono操作符

和其他主流的响应式编程一样,Reactor框架的设计目标也是为了简化相应式流的使用方法。为此Reactor框架提供了大量操作符用于操作Flux和Mono对象。

本节不打算全面详细介绍,我们的思路是将这些操作符分类,然后对每一类中具有代表性的操作符展开讨论。

对于其他没有介绍到的操作符可参考Reactor框架的官方文档。

在本节中我们把Flux和Mono操作符分为以下7大类。

  • 转换 (Transforming)操作符负责对序列中的元素进行转变。
  • 过滤 (Filtering)操作符负责将不需要的数据从序列中进行过滤。
  • 组合 (Combining) 操作符负责将序列中的元素进行合并和连接。
  • 条件 (Conditional) 操作符负责根据特定条件对序列中的元素进行处理。
  • 数学 (Mathematical) 操作符负责对序列中的元素执行各种数学操作。
  • Obserable工具(Utility) 操作符提供的是一些针对流失处理的辅助性工具。
  • 日志和调试(Log&Debug) 操作符提供了针对运行时日志以及如何对序列进行代码调试的工具类。

1. 转换操作符

Reactor框架中常用的转换操作符包括buffer、map、flatMap和window。

(1)buffer

buffer操作符把当前流中的元素收集到集合中,并把集合对象作为流中的新元素。使用buffer操作符在进行元素收集时可以指定集合对象所包含的元素的最大数量。

以下代码先使用range()方法创建了1~50这50个元素,然后演示了使用buffer从包含这50个元素的流中构建集合,每个集合包含10个元素,一共构建5个集合。

Flux.range(1,50).buffer(10).subscribe(System.out::println);

上面代码执行结果如下:

[1,2,3,4,5,6,7,8,9,10]
[11,12,13,14,15,16,17,18,19,20]
[21,22,23,24,25,26,27,28,29,30]
[31,32,33,34,35,36,37,38,39,40]
[41,42,43,44,45,46,47,48,49,50]

buffer操作符的另一种用法是指定收集的时间间隔,由此演变出了bufferTimeout()方法。bufferTimeout()方法可以指定时间间隔为一个Duration对象或者毫秒数,即使用bufferTimeoutMillis()或者bufferMillis()这两个方法。

除了指定元素数量和时间间隔,还可以通过bufferUnitl和bufferWhile操作符进行数据收集。bufferUnitl会一直收集,知道断言(Predicate)条件返回true。使得断言条件返回true的那个元素可以选择添加到当前集合或者下一个集合当中。

而bufferWhile只有当断言条件返回true时才会收集,一旦值为false,会立即开始下一次收集。

代码如下:

Flux.range(1,10).bufferUnitl(i -> i%2 ==0).subscribe(System.out::println);
System.out.println("-----------------------------------");
Flux.range(1,10).bufferWhile(i -> i%2 ==0).subscribe(System.out::println);

以上代码执行结果如下:

[1,2]
[3,4]
[5.6]
[7,8]
[9,10]
--------------------------
[2]
[4]
[6]
[8]
[10]

(2)map

map操作符相当于一种映射操作,他对流中每一个元素映射一个函数,从而达到一个变幻效果。

Flux.just(1, 2, 3, 4)
                .log()
                .map(i -> {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return i * 2;
                })
                .subscribe(e -> log.info("get:{}",e));

以上代码运行结果:

10:53:57.058 [main] INFO reactor.Flux.Array.1 - | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
10:53:57.062 [main] INFO reactor.Flux.Array.1 - | request(unbounded)
10:53:57.063 [main] INFO reactor.Flux.Array.1 - | onNext(1)
10:53:58.067 [main] INFO com.example.demo.FluxTest - get:2
10:53:58.067 [main] INFO reactor.Flux.Array.1 - | onNext(2)
10:53:59.071 [main] INFO com.example.demo.FluxTest - get:4
10:53:59.071 [main] INFO reactor.Flux.Array.1 - | onNext(3)
10:54:00.076 [main] INFO com.example.demo.FluxTest - get:6
10:54:00.076 [main] INFO reactor.Flux.Array.1 - | onNext(4)
10:54:01.080 [main] INFO com.example.demo.FluxTest - get:8
10:54:01.081 [main] INFO reactor.Flux.Array.1 - | onComplete()

(3) flatMap

与map不同,操作符把流中的每一个元素转换成一个流,再把转换之后得到的所有流中的元素进行合并。

flatMap操作符非常实用,代码示例如下:

Flux.just(1,5).flatMap(x -> Mono.just(x*x)).subscribe(System.out::println);

以上代码中我们对1和5这两个元素进行了flatMap操作,操作的结果是返回他们的平方值进行合并,执行结果如下:

1
25

在系统开发过程中我们经常会碰到对数据库中的数据进行逐一处理的场景,这时候可以充分利用flatMap操作符的特性开展相关操作。以下代码展示了如何使用flatMap对数据库中获取的数据进行逐一删除的方法。

Mono<void> deleteFiles =
fileRepository.findByname(flieName).flatMap(fileRepository::delete);

(4)window

window操作符类似于buffer,所不同的是,window操作符是把当前流中的元素收集到另一个Flux序列中。因此它的返回值类型是Flux<flux>,而不是简单的Flux。</flux

示例代码:

Flux.range(1,5).window(2).toIterable().forEach(w -> {
   w.subscribe(System.out::println);
   System.out.println("---------------------------")
})

以上代码执行结果如下。这里生成了5个元素,然后通过window操作符把这个5个元素转变成3个Flux对象,并通过forEach()工具把这些对象打印出来。

1
2
-----------------
3
4
-----------------
5

2.过滤操作符

Reactor中 常用过操作符包括filter、first、last、skip/skipLast、take/takeLast等。

(1)filter

filter操作符的含义与普通的过滤器类似,就是对流中包含的元素进行过滤,只是留下满足指定条件的元素。

例如,我们想对1~10这10个元素进行过滤,只获取能被2取余的元素,可以使用如下代码。

Flux.range(1,10).filter(i -> i % 2 ==0).subscribe(System.out::println);

(2)first

first操作符的执行效果即为返回流中的第一个元素。

(3)last

last操作符与first类似,返回流中的最后一个元素。

(4)skip/skipLast

如果使用skip操作符,将会忽略数据流中的前n个元素。类似的如果使用skipLast将会忽略数据流中的后n个元素。

(5)take/takeLast

take系列操作符用来从当前流中提取元素。我们可以按照指定的数量来提取元素,对应的方法是take(long n);同时,也可以按照指定的时间间隔来提取元素,分别使用take(Duration time) 和takeMillis(long time)。类似的takeLast操作符用来从当前流中尾部提取元素。

take和takeLast操作符示例代码如下:

Flux.range(1,100).take(10).subscribe(System.out::println);
Flux.range(1,100).takeLast(10).subscribe(System.out::println);

3. 组合操作符

Reactor中常用的组合操作符有then/when、merge、starWith和zip

(1)then/when

then操作符的含义是等到上一个操作符完成时在做下一个。

when操作符的含义是等到多个操作仪器完成。

如下代码很好的展示了when操作符的实际应用场景。

public Mono<Void> updateFiles(Flux<FilePart> files){
        return files.flatMap(file ->{
            Mono<Void> copyFileToFileServer =...;
            Mono<Void> saFilePathToDataBase = ...;
            return Mono.when(copyFileToFileServer);
        });
    }

(2)starWith

starWith操作符的含义是在数据元素序列的开头插入指定的元素项。

(3)merge

merge操作符用来把多个流合并成一个Flux序列,该操作符按照所有流中的元素实际生产顺序合并。

merge操作符示例代码如下:

Flux.merge(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

请注意这里两个Flux.intervalMillis()方法都是在限制10ms内生产一个新元素。

运行结果如下:

0
0
1
1
2
2

不同于merge,mergeSequeetial操作符则是按照所有流被订阅的顺序以流为单位进行合并。

请看如下代码:

Flux.mergeSequeetial(Flux.intervalMillis(0,10).take(3),
   Flux.intervalMillis(5,10).take(3)).toStream()
   .forEach(System.out::println);

我们仅仅只是将merge换成了mergeSequeetial。

运行结果如下:

0
1
2
0
1
2

(4)zipWith

zipWith把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。使用zipWith
操作符在合并时可以不做任何处理,如此得到一个元素类型为Tuple2的流,示例代码如下:

Flux.just("a","b").zipWith(Flux.just("c","b")).subscribe(System.out::println);

运行结果如下:

[a,c]
[b,d]

另外,我们还可以通过一个BiFunction函数对合并的元素进行处理,所得到的流的元素类型为该函数的返回值。

代码如下:

Flux.just("a","b").zipWith(Flux.just("c","b"),(s1,s2) -> String.format("%+%",s1,s2))
   .subscribe(System.out::println);

运行结果如下:

a+c
b+d

本章节完!历史章节

实战SpringCloud响应式微服务系列教程(第一章)

实战SpringCloud响应式微服务系列教程(第二章)

实战SpringCloud响应式微服务系列教程(第三章)

实战SpringCloud响应式微服务系列教程(第四章)

实战SpringCloud响应式微服务系列教程(第五章)

原文地址:https://www.cnblogs.com/javazhiyin/p/11597839.html

时间: 2024-11-10 18:14:59

实战SpringCloud响应式微服务系列教程(第六章)的相关文章

架构新说之响应式微服务架构-分布式系统设计原则

O'Reilly的电子书<Reactive Microservices Architecture>讲述了微服务/分布式系统的一些设计原则,本文是笔者阅读完此书后的理解. 微服务相比传统的单体应用能够带来快速的响应,以小的系统产生大的影响.而随着网络加速.磁盘成本降低.RAM成本降低.多核技术的发展.云架构技术的爆发,微服务不再受这些客观条件的限制,已经开始大规模的应用. 与SOA架构,微服务和它都具有相同的初衷:解耦.隔离.组合.集成.分散以及自主,但是SOA经常被误解和误用,尤其是使用ESB

SpringCloud分布式微服务云架构 第五篇: 路由网关(zuul)(Finchley版本)

SpringCloud分布式微服务云架构 第五篇: 路由网关(zuul)(Finchley版本)在微服务架构中,需要几个基础的服务治理组件,包括服务注册与发现.服务消费.负载均衡.断路器.智能路由.配置管理等,了解springcloud架构可以加求求:三五三六二四七二五九,由这几个基础组件相互协作,共同组建了一个简单的微服务系统.一个简答的微服务系统如下图: 注意:A服务和B服务是可以相互调用的,并且配置服务也是注册到服务注册中心的. 在Spring Cloud微服务系统中,一种常见的负载均衡方

Unity3D脚本中文系列教程(十六)

Unity3D脚本中文系列教程(十五) ◆ function OnPostprocessAudio (clip:AudioClip):void 描述:◆  function OnPostprocessGameObjectWithUserProperties (root : GameObject, propNames : string[], values : object[]) : void 描述:在导入文件时,为每个至少附加了一个用户属性的游戏物体调用propNames是一个string[ ],

HttpClient 4.3教程 第六章 HTTP缓存

HttpClient 4.3教程 第六章 HTTP缓存 Posted on 2013 年 10 月 28 日 6.1.基本概念 HttpClient的缓存机制提供一个与HTTP/1.1标准兼容的缓存层 – 相当于Java的浏览器缓存.HttpClient缓存机制的实现遵循责任链(Chain of Responsibility)设计原则,默认的HttpClient是没有缓存的,有缓存机制的HttpClient可以用来临时替代默认的HttpClient,如果开启了缓存,我们的请求结果就会从缓存中获取

《Entity Framework 6 Recipes》中文翻译系列 (30) ------ 第六章 继承与建模高级应用之多对多关联

翻译的初衷以及为什么选择<Entity Framework 6 Recipes>来学习,请看本系列开篇 第六章  继承与建模高级应用 现在,你应该对实体框架中基本的建模有了一定的了解,本章将帮助你解决许多常见的.复杂的建模问题,并解决你可能在现实中遇到的建模问题. 本章以多对多关系开始,这个类型的关系,无论是在现存系统还是新项目的建模中都非常普遍.接下来,我们会了解自引用关系,并探索获取嵌套对象图的各种策略.最后,本章以继承的高级建模和实体条件结束. 6-1  获取多对多关联中的链接表 问题

SpringCloud分布式微服务b2b2c电子商务(一)构建第一个SpringBoot工程

spring boot 它的设计目的就是为例简化开发,开启了各种自动装配,你不想写各种配置文件,引入相关的依赖就能迅速搭建起一个web工程.它采用的是建立生产就绪的应用程序观点,优先于配置的惯例. 可能你有很多理由不放弃SSM,SSH,但是当你一旦使用了springboot ,你会觉得一切变得简单了,配置变的简单了.编码变的简单了,部署变的简单了,感觉自己健步如飞,开发速度大大提高了.了解springcloud架构可以加求求:三五三六二四七二五九,就好比,当你用了IDEA,你会觉得再也回不到Ec

Springcloud分布式微服务多用户商城系统b2b2c-Spring Cloud常见问题

在使用Spring Cloud的过程中,难免会遇到一些问题.所以对Spring Cloud的常用问题做一些总结.需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码:三五三六二四七二五九 一.Eureka常见问题 Eureka 注册服务慢 默认情况下,服务注册到Eureka Server的过程较慢.在开发或测试时,常常希望能够加速这一过程,从而提升工作效率. 该问题的原因及解决方案: 服务的注册涉及周期性心跳,默认30秒一次(通过客户端配置的service

angular2系列教程(六)升级装备、pipe

今天,我们要讲的是angualr2的pipe这个知识点,但是在这之前我们需要升级一下我们的装备,因为之前的装备太“寒酸”了. 例子 这个例子包含两个pipe,一个是stateful,一个是stateless,是直接复制官方的例子.本例子还包含了我对AngularClass/angular2-webpack-starter这个牛逼starter的改写,我会详细讲解配置. 源代码 没有测试 AngularClass/angular2-webpack-starter 这里面包含了Angular 2 (

《Entity Framework 6 Recipes》中文翻译系列 (37) ------ 第六章 继承与建模高级应用之独立关联与外键关联

翻译的初衷以及为什么选择<Entity Framework 6 Recipes>来学习,请看本系列开篇 6-13  在基类中应用条件 问题 你想从一个已存在的模型中的实体派生一个新的实体,允许基类被实例化. 解决方案 假设你有如图6-20所示的模型. 图6-20 包含Invoice实体的模型 这个模型只包含一个单独的实体Invoice(发货单).我们想从Invoice派生一个新的实体,它表示删除掉的发货单.这将允许我们以更清晰的业务逻辑来分别对有效的发货单和已删除掉的发货进行不同的操作.按下面