gateway&reactive(响应式流)函数编程的webflux

  springcloud.gateway是springcloud2的全新项目,该项目提供了一个构建在spring生态之上的API网关,包括spring5,springboot2,projectReactor。gateway旨在提高一种简单而有效的途径来转发请求,并为他们提供横切关注点,如安全性,监控/指标和弹性。在之前springcloud提供的网关是zull,zuul基于servlet2.5,使用阻塞架构,不支持长连接。zuul和negix相似,除了编程语言不同,zuul已经发布了zuul2,支持长连接,非阻塞,但是springcloud还没有整合。

项目构建:

  引入gateway和eureka依赖,gateway的网关提供了路由配置,路由断言,过滤器等功能。

一, 路由断言:

  路由断言接口由函数式接口RouterFunction接口提供支持,通过RouterFunctions的静态route方法来实现,如下:

 1 public static <T extends ServerResponse> RouterFunction<T> route(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
 2         return new RouterFunctions.DefaultRouterFunction(predicate, handlerFunction);
 3     }
 4
 5 private static final class DefaultRouterFunction<T extends ServerResponse> extends RouterFunctions.AbstractRouterFunction<T> {
 6         private final RequestPredicate predicate;
 7         private final HandlerFunction<T> handlerFunction;
 8
 9         public DefaultRouterFunction(RequestPredicate predicate, HandlerFunction<T> handlerFunction) {
10             super(null);
11             Assert.notNull(predicate, "Predicate must not be null");
12             Assert.notNull(handlerFunction, "HandlerFunction must not be null");
13             this.predicate = predicate;
14             this.handlerFunction = handlerFunction;
15         }
16
17         public Mono<HandlerFunction<T>> route(ServerRequest request) {
18             if (this.predicate.test(request)) {
19                 if (RouterFunctions.logger.isTraceEnabled()) {
20                     String logPrefix = request.exchange().getLogPrefix();
21                     RouterFunctions.logger.trace(logPrefix + String.format("Matched %s", this.predicate));
22                 }
23
24                 return Mono.just(this.handlerFunction);
25             } else {
26                 return Mono.empty();
27             }
28         }
29
30         public void accept(RouterFunctions.Visitor visitor) {
31             visitor.route(this.predicate, this.handlerFunction);
32         }
33     }
    private abstract static class AbstractRouterFunction<T extends ServerResponse> implements RouterFunction<T> 

  而生成需要两个参数他们分别是:RequestPredicate  predicate和 HandlerFunction<T>  handlerFunction。

predicate由RequestPredicates.path()产生,handlerFunction是接口HandlerFunction<T>的函数实现类,该类只有一个方法:Mono<T> handle(ServerRequest var1)。

  所以添加路由断言的配置类方法如下:

1 @Bean//路由断言
2     public RouterFunction<ServerResponse> doRouterFunction(){
3         RouterFunction route = RouterFunctions.route(RequestPredicates.path("/test"),
4                 request -> ServerResponse.ok().body(BodyInserters.fromValue("this is ok")));
5         return route;
6     }

  此时路由断言的作用是当请求的路径是/test时,直接返回状态吗200,且响应体为this is ok。

为什么呢?

@FunctionalInterface
public interface HandlerFunction<T extends ServerResponse> {
    Mono<T> handle(ServerRequest var1);
}
下图的类都在org.springframework.web.reactive包下:

从上面可以看出,serverResponse持有serverHttpResponse,而serverHttpResponse持有HttpServletResponse。所以这就是webflux的reactive响应式流编程实现流程。

 二,过滤器:

  网关经常要做的是对路由请求进行过滤,对符合条件的请求进行一些操作,如增加请求头,增加请求参数,增加响应头和断路器等功能。这个功能通过函数接口RouteLocator实现,RouteLocator通过RouteLocatorBuilder来获得:

 1   @Bean//过滤器
 2     public RouteLocator customRouteLocator(RouteLocatorBuilder builder){
 3         return builder.routes().route(
 4
 5                 r -> r.path("/baidu")
 6                         .filters(f -> f.addResponseHeader("X-AnotherHeader","baz"))   7                   .uri("http://www.baidu.com")   8   9      ).build();  10 }

RouteLocatorBuilder.Builder.routes就是路由表,如下图:

具体类调用流程如下:

 1 public RouteLocatorBuilder.Builder routes() {
 2         return new RouteLocatorBuilder.Builder(this.context);
 3     }
 4
 5 public RouteLocatorBuilder.Builder route(Function<PredicateSpec, AsyncBuilder> fn) {
 6             AsyncBuilder routeBuilder = (AsyncBuilder)fn.apply((new RouteLocatorBuilder.RouteSpec(this)).randomId());
 7             this.add(routeBuilder);
 8             return this;
 9         }
10
11  void add(AsyncBuilder route) {
12             this.routes.add(route);
13         }
14
15 public RouteLocator build() {
16             return () -> {
17                 return Flux.fromIterable(this.routes).map((routeBuilder) -> {
18                     return routeBuilder.build();
19                 });
20             };
21         }

  上面第5行的意思是Builder.route(Function<PredicateSpec, AsyncBuilder> fn)函数需要一个AsyncBuilder,这个AsyncBuilder是org.springframework.cloud.gateway.route.Route.AsyncBuilder,即route的异步构造器,第6行得到了这个异步构造器,第七,八行加到了Builder.routes集合中,并返回自己,然后调用自己的build方法,Flux是Reactive提供的返回结果工具类,原来返回User的话,那现在就返回Mono<User>;原来返回List<User>的话,那现在就返回Flux<User>。第18行调用build构造route。

  我们需要一个asyncbuilder,所以需要实现Function接口,如下:

  r -> r.path("/baidu").filters(f -> f.addRequestHeader("X-AnotherHeader","baz")).uri("http://www.baidu.com")  上述方法体会最终得到一个asyncbuilder,然后开始上述带颜色部分的内容。上面的功能就是当请求路径中有baidu,就在返回头上加上一个属性,然后转到该http://www.baidu.com地址。

 三,网关与服务中心结合

 1 spring:
 2   application:
 3     name: gateway #hystrix服务类型
 4   cloud:
 5     gateway:
 6       locator:
 7         enabled: true #开启gateway和eureka的结合
 8       routes:
 9         - id: service_eureka-service1 #id
10           uri: lb://eureka-service1 #服务
11           predicates:
12           - Path=/service_eureka-service1/**  #匹配路径
13           filters:
14           - StripPrefix=1 #使用路径过滤器去掉第一部分前缀 即去除service_eureka-service1这一部分

下面是service1的controller:

  

启动service1和gateway两个服务,注册到eureka,然后访问:

ok,先到这里,源码后续再补充。

参考文章:https://blog.csdn.net/get_set/article/details/79480233 这兄弟的文章真不错。

原文地址:https://www.cnblogs.com/YsirSun/p/12552151.html

时间: 2024-11-09 01:14:13

gateway&reactive(响应式流)函数编程的webflux的相关文章

Reactive(2) 响应式流与制奶厂业务

再谈响应式 在前一篇文章从Reactive编程到"好莱坞"中,谈到了响应式的一些概念,讲的有些发散. 但仅仅还是停留在概念的层面,对于实战性的东西并没有涉及.所以大家看了后,或许还是有些不痛不痒. 响应式编程强调的是异步化.面向流的处理方式,这两者也并非凭空生出,而是从大量的技术实践中总结提炼出来的概念,就比如: 我们谈异步化,容易联想到 Java 异步IO(Asynchronized IO),而且习惯于将其和 BIO.NIO等概念来做对比. 殊不知,老早出现的 Swing 框架(Ja

JDK9新特性 Reactive Stream 响应式流

JDK9新特性 Reactive Stream 响应式流  本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor.  1.Reactive Stream 概念  Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范.响应式流从2013年开始,作为提供非阻

Reactive Stream 响应式流

初识Reactive Stream Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范.响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议. 它旨在解决处理元素流的问题--如何将元素流从发布者传递到订阅者,而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃.更确切地说,Reactive流目的是"找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流".

JVM平台上的响应式流(Reactive Streams)规范

// Reactive Streams // 响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准.大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上. 注:响应式流其实就是一个规范,本文讲解的正是这个规范,且这个规范已经被引入到JDK9里了. 后压:就是下游出现了问题,得不到解决时,这个问题就会逆流而上,继而影响上游. 如果一个路口红绿灯坏了造成堵车,如果不管的话,用不了太长时间,车就会堵到上一个路口,如果再不管的话,整条路都会被赌满. // JDK9里的j

(2)响应式流——响应式Spring的道法术器

本系列文章索引:<响应式Spring的道法术器>.前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑--为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的.比如如下两个需要面对的问题: Web 应用具有I/O密集的特点,I/O阻塞会带来比较大的性能损失或资源浪费,我们需要一种异步非阻塞的响应式的库,而Java Stream是一种同步API. 假设我们要搭建从数据层到前端的一个变化传递管道,可能会遇到数据层每秒上千次的数据更新,而显然

响应式Spring的道法术器(Spring WebFlux 快速上手 + 全面介绍)

1. Spring WebFlux 2小时快速入门 Spring 5 之使用Spring WebFlux开发响应式应用. lambda与函数式(15min) Reactor 3 响应式编程库(60min) Spring Webflux和Spring Data Reactive开发响应式应用(45min) 通过以上内容相信可以对Spring 5.0 推出的响应式开发有了初步的体会.如果希望有更加深入的了解,欢迎阅读下边的系列文章-- 2. 响应式Spring的道法术器 这个系列的文章是为了记录下自

SpringBoot2.0不容错过的新特性 WebFlux响应式编程

第1章 课程介绍 课程介绍及导学 第2章 函数式编程和lambda表达式 本章介绍函数式编程的概念,和lambda表达式的基础语法,并分析了惰性求值的应用和实现.最后同意反编译字节码,重点剖析了lambda表达式的底层实现原理 第3章 Stream流编程 本章介绍jdk8里面stream流编程的重要知识点,并剖析流的运行机制和实现原理 第4章 reactive stream 响应式流 本章介绍jdk9的响应式流的开发过程,重点讲解响应式流的4个接口,以及背压的概念和jdk实现背压的关键. 第5章

Java响应式编程 Springboot WebFlux基础与实战

第1章 课程介绍课程介绍及导学1-1 导学 第2章 函数式编程和lambda表达式本章介绍函数式编程的概念,和lambda表达式的基础语法,并分析了惰性求值的应用和实现.最后同意反编译字节码,重点剖析了lambda表达式的底层实现原理2-1 概念2-2 为什么要使用函数式编程-12-3 为什么要使用函数式编程-22-4 lambda初接触-12-5 lambda初接触-22-6 jdk8接口新特性-12-7 jdk8接口新特性-22-8 jdk8接口新特性-32-9 函数接口-12-10 函数接

[转]springboot2 webflux 响应式编程学习路径

原文链接 spring官方文档 springboot2 已经发布,其中最亮眼的非webflux响应式编程莫属了!响应式的weblfux可以支持高吞吐量,意味着使用相同的资源可以处理更加多的请求,毫无疑问将会成为未来技术的趋势,是必学的技术!很多人都看过相关的入门教程,但看完之后总觉得很迷糊,知其然不知道其所以然,包括我本人也有相同的疑惑.后面在研究和学习中发现,是我的学习路径不对,很多基本概念不熟悉,之前公司主打的jdk版本还是1.6/1.7,直接跳到运行在jdk8上的webflux,跨度太大,