Java的HTTP服务端响应式编程

为什么要响应式编程?

传统的Servlet模型走到了尽头

传统的Java服务器编程遵循的是J2EE的Servlet规范,是一种基于线程的模型:每一次http请求都由一个线程来处理。

线程模型的缺陷在于,每一条线程都要自行处理套接字的读写操作。对于大部分请求来讲,本地处理请求的速度很快,请求的读取和返回是最耗时间的。也就是说大量的线程浪费在了远程连接上,而没有发挥出计算能力。但是需要注意一点,线程的创建是有开销的,每一条线程都需要独立的内存资源。JVM里的-Xss参数就是用来调整线程堆栈大小的。而JVM堆的总大小局限在了-Xmx参数上,因此一个正在运行的JVM服务器能够同时运行的线程数是固定的。

即便通过调整JVM参数,使其能够运行更多线程。但是JVM的线程会映射成为操作系统的用户线程,而操作系统依然只能调度有限数量的线程。例如,Linux系统可以参考这里的讨论:Maximum number of threads per process in Linux?

此外,大量线程在切换的时候,也会产生上下文加载卸载的开销,同样会降低系统的性能。

可伸缩 IO

Doug Lea大神有一篇很经典的PPTScalable IO in Java讲述了一个更为优秀的服务器模型。

一个可伸缩的网络服务系统应当满足以下条件:

  1. 能够随着计算资源(CPU、内存、磁盘容量、网络带宽等)的增加提高负载能力。
  2. 当网络负载增加超过能力的时候,能够优雅降级,避免直接崩溃。例如,拒绝为超过能力范围的请求提供服务,但对于能力范围内的请求,依然提供服务。当流量洪峰过去之后,依然能够正常运行。
  3. 当然高可用、高性能依然是必须的:例如低响应延迟、随负载变化请求或释放计算资源等。

作者给出的解决方案就是Reactor模式。

Reactor模式将耗时的IO资源封装为handle对象。handle对象注册在操作系统的内核里,当对象满足一定的条件时(可读或者可写),才会处理handle对象。在Reactor模式中,同步多路复用器负责处理handle对象的状态变更,当满足条件时,会调用handle对象注册时提供的回调函数。

同步多路复用器在一个单独的线程里专门处理IO链接。当请求读取完毕之后,任务提交至工作线程池完成请求的解码、处理、编码等工作,最后将由多路复用器负责将结果返回给客户端,而池内线程继续处理下一个任务。相比JDK1.5之前的对每一次请求新建一个线程的方式,线程池能够实现线程复用,降低创建回收线程的开销,在应对密集计算负载的时候有更好的表现。同时,在多个线程上分别部署一个同步多路复用器,也可以更好地利用多核CPU的处理能力。

这样,线程的任务分工就很明确,分别专门处理IO密集任务和专门处理CPU密集任务。

NIO普及艰难

从最早的select到后来Linux的epoll和BSD的Kqueue,操作系统的多路复用性能一直在不断增强。

JDK 1.4引入了NIO模块,可以屏蔽了操作系统层面的细节,将各个系统的多路复用API做了同一封装。JDK的NIO有以下几个核心组件:

  • Buffer,一种容量在创建时被固定的数据容器
  • Charsets,负责数据的编解码工作
  • Channels,对远程连接的抽象
  • Selector,多路复用选择器

在JVM之外的世界里,多路复用通过Nginx、基于V8引擎的Node.js早就大放异彩。但是Java NIO在生产环境里的发展却很慢。例如,Tomcat直到2016年发布8.5版本的时候,才彻底移除BIO连接器,完全拥抱NIO。

JDK NIO主要有这样几个问题比较麻烦:

  1. 首先是NIO为了提高数据收发性能,可以创建DirectBuffer对象。该对象的内存开辟在JVM堆之外,无法通过正常的GC收集器来回收,只能在JVM的老年代触发全量GC的时候回收。而全量GC往往导致系统卡顿,降低响应效率。如果被动等待老年代区域自行触发全量GC,又有可能造成堆外内存溢出。两者之间的矛盾需要在开发的时候小心的平衡。
  2. 其次就是,JDK1.8依然存在的epoll bug:若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%。

Netty才是NIO该有的水准

作为一个第三方框架,Netty做到了JDK本应做到的事情。

Netty的数据容器ByteBuf更为优秀

ByteBuf同时维护两个索引:读索引和写索引。从而保证容器对象能够同时适配读写同时进行的场景。而NIO的Buffer却需要执行一次flip操作来适应读写场景的切换。同时ByteBuf容器使用引用计数来手工管理,可以在引用计数归零时通过反射调用jdk.internal.ref.Cleaner来回收内存,避免泄露。在GC低效的时候,选择使用手工方式来管理内存,完全没问题。

Netty的API封装度更高

观察一下Netty官网Tutorial给出的demo,只要几十行代码就完成了一个具备Reactor模式的服务器。ServerBootstrap的group方法定义了主套接字和子套接字的处理方式,例中使用的NioEventLoopGroup类为Java NIO + 多线程的实现方式。对于NIO的epoll bug,NioEventLoopGroup的解决方案是rebuildSelectors对象方法。这个方法允许在selector失效时重建新的selector,将旧的释放掉。此外,Netty还通过JNI实现了自己的EpollEventLoopGroup,规避了NIO版本的bug。

Netty使用责任链模式实现了对server进出站消息的处理,使得server的代码能够更好的扩展和维护。

Netty在生产领域得到大量应用,Hadoop Avro、Dubbo、RocketMQ、Undertow等广泛应用于生产领域的产品的通信组件都选择了Netty作为基础,并经受住了考验。

Netty是一个优秀的异步通信框架,但是主要应用在基础组件中。因为Netty向开发者暴露出大量的细节,对于业务系统的开发仍然形成了困扰,所以没法得到进一步的普及。

举个例子。Netty使用ChannelFuture来接收传入的请求。相比于其继承的java.util.concurrent.Future来说,ChannelFuture可以添加一组GenericFutureListener来管理对象状态,避免了反复对Future对象状态的询问。这是个进步。但是,这些Listener都带来了另一个问题——Callback hell。而嵌套的回调代码往往难以维护。

对于Callback hell,我们可以做什么

Netty做一个优秀的基础组件就很好了。业务层面的问题就让我们用业务层面的API来解决。

Java API的适应性不佳

JDK7以前的异步代码难以组织

在JDK7以及之前,Java多线程的编程工具主要就是Thread、ExecutorService、Future以及相关的同步工具,实现出来的代码较为繁琐、且性能不高。

Thread

举个例子A,考虑一个场景有X、P、Q三个逻辑需要执行,其中X的执行需要在P、Q一起完成之后才启动执行。

如果使用Thread,那么代码会是这个样子:

/* 创建线程 */
Thread a = new Thread(new Runnable() {
    @Override
    public void run() {
        /* P逻辑 */
    }
});

Thread b = new Thread(new Runnable() {
    @Override
    public void run() {
        /* Q逻辑 */
    }
});

/* 启动线程 */
a.start();
b.start();

/* 等候a、b线程执行结束 */
try {
    a.join();
    b.join();
} catch (InterruptedException e) {
    e.printStackTrace();
}

/* 启动X逻辑的执行 */
Thread c = new Thread(new Runnable() {
    @Override
    public void run() {
        /* X逻辑 */
    }
});
c.start();

...

上面这个代码,先不论线程创建的开销,单从形式上看,线程内部的执行逻辑、线程本身的调度逻辑,还有必须捕获的InterruptedException的异常处理逻辑混杂在一起,整体很混乱。假想一下,当业务逻辑填充在其中的时候,代码更难维护。

ThreadPoolExecutor、Future

ThreadPoolExecutor和Future有助于实现线程复用,但对于代码逻辑的规范没什么帮助。

ExecutorService pool = Executors.newCachedThreadPool();
Future<?> a = pool.submit(new Runnable() {
    @Override
    public void run() {
        /* P逻辑 */
    }
});
Future<?> b = pool.submit(new Runnable() {
    @Override
    public void run() {
        /* Q逻辑 */
    }
});

/* 获取线程执行结果
 * 依然要捕获异常,处理逻辑
 */
try {
    a.get();
    b.get();
    Future<?> c = pool.submit(new Runnable() {
        @Override
        public void run() {
            /* X逻辑 */
        }
    });
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}

JDK8代码可读性有了显著提高

JDK8借鉴了相当多的函数式编程的特点,提供了几样很称手的工具。

CompleteableFuture和ForkJoinPool

如果要用CompleteableFuture实现上一个例子,可以这样写。

CompletableFuture<?> a = CompletableFuture.runAsync(() -> {
    /* P逻辑 */
}).exceptionally(ex -> {
    /* 异常处理逻辑 */
    return ...;
});
CompletableFuture<?> b = CompletableFuture.runAsync(() -> {
    /* Q逻辑 */
});
CompletableFuture<?> c = CompletableFuture.allOf(a, b).thenRun(() -> {
    /* X逻辑 */
});

有了lambda表达式的加持,例中的代码整体以线程内部逻辑为主,调度逻辑通过allOf()、thenRun()等方法名直观地展示出来。特别是可选的异常捕获逻辑,更是使得代码可读性得到了极大的提高。

需要注意的是,CompletableFuture是可以使用指定ExecutorService来执行的。如果像上例那样没有指定ExecutorService对象,那么会默认使用ForkJoinPool里的静态对象commonPool来执行。而ForkJoinPool.commonPool作为一个JVM实例中唯一的对象,也是Stream并发流的执行器,因此应当尽量保证CompletableFuture里的逻辑不会阻塞线程。如果无法规避,可以使用ManagedBlocker来降低影响。

ForkJoinPool是JDK1.7提供的并发线程池,可以很好地应对计算密集型并发任务,特别适用于可以“分-治”的任务。传统的ThreadPoolExecutor需要指定线程池里的线程数量,而ForkJoinPool使用了一个相似但更有弹性的概念——“并发度”。并发度指的是池内的活跃线程数。对于可能的阻塞任务,ForkJoinPool设计了一个ManagedBlocker接口。当池内线程执行到ForkJoinPool.managedBlock(ForkJoinPool.ManagedBlocker blocker)方法时,线程池会新建一个线程去执行队列里的其他任务,并轮询该对象的isReleasable方法,决定是否恢复线程继续运行。JDK1.7里的Phaser类源码用到了这个方法。

关于CompleteableFuture的用法,推荐看看这篇博客:理解CompletableFuture,总结的很好。

而对于ForkJoinPool,可以看看这篇博客:Java 并发编程笔记:如何使用 ForkJoinPool 以及原理

Stream

Stream流也是JDK8引入的一个很好的编程工具。

Stream对象通常通过Iterator、Collection来构造。也可以用StreamSupport的stream静态方法来创建自定义行为的实例。

Stream流对象采用链式编程风格,可以制定一系列对流的定制行为,例如过滤、排序、转化、迭代,最后产生结果。看个例子。

List<Integer> intList = List.of(1, 2, 3);

List<String> strList = intList.stream()
        .filter(k -> k>1)
        .map(String::valueOf)
        .collect(Collectors.toList());

上面这段代码中,intList通过stream方法获取到流对象,然后筛选出大于1的元素,并通过String的valueOf静态方法生成String对象,最后将各个String对象收集为一个列表strList。就像CompletableFuture的方法名一样,Stream的方法名都是自描述的,使得代码可读性极佳。

除此之外,Stream流的计算还是惰性的。Stream流对象的方法大致分为两种:

  • 中间方法,例如filter、map等对流的改变
  • 终结方法,例如collect、forEach等可以结束流

只有在执行终结方法的时候,流的计算才会真正执行。之前的中间方法,都作为步骤记录下来,但没有实时地执行修改操作。

如果将例子里的stream方法修改为parallelStream,那么得到的流对象就是一个并发流,而且总在ForkJoinPool.commonPool中执行。

关于Stream,极力推荐Brian Goetz大神的系列文章Java Streams

还有一点问题

ForkJoinPool是一款强大的线程池组件,只要使用的得当,线程池总会保持一个合理的并发度,充分利用计算资源。

但是,CompleteableFuture也好,Stream也好,他们都存在一个相同的问题:无法通过后端线程池的负载变化,来调整前端的调用压力。打比方说,当后端的ForkJoinPool.commonPool在全力运算而且队列里有大量的任务排队时,新提交的任务很可能会有很高的响应延迟,但是前端的CompleteableFuture或者Stream没有途径去获取这样一个状态,来延缓任务的提交。这种情况就违背了“响应式系统”的“灵敏性”要求。

来自第三方API的福音

Reactive Streams

Reactive Streams是一套标准,定义了一个运行于JVM平台上的响应式编程框架实现所应该具备的行

为。

Reactive Streams规范衍生自“观察者模式”。将前后依赖的逻辑流,拆解为事件和订阅者。只有当事件发生变更时,感兴趣的观察者才随之执行随后的逻辑。所以说,Reactive Stream和JDK的Stream的理念有点接近,两者都是注重对数据流的控制。将紧耦合的逻辑流拆分为“订阅-发布”方式其实是一大进步。代码变得维护性更强,而且很容易随着业务的需要使用消息驱动模式拆解。

Reactive Streams规范定义了四种接口:

  • Publisher,负责生产数据流,每一个订阅者都会调用subscribe方法来订阅消息。
  • Subscriber,就是订阅者。
  • Subscription,其实就是一个订单选项,相当于饭馆里的菜单,由发布者传递给订阅者。
  • Processor,处于数据流的中间位置,即是订阅者,也是新数据流的生产者。

当Subscriber调用Publisher.subscribe方法订阅消息时,Publisher就会调用Subscriber的onSubscribe方法,回传一个Subscription菜单。

Subscription菜单包含两个选择:

  1. 一个是request方法,对数据流的请求,参数为所请求的数据流的数量,最大为Long.MAX_VALUE;
  2. 另一个是cancel方法,对数据流订阅的取消,需要注意的是数据流或许会继续发送一段时间,以满足之前的请求调用。

一个Subscription对象只能由同一个Subscriber调用,所以不存在对象共享的问题。因此即便Subscription对象有状态,也不会危及逻辑链路的线程安全。

订阅者Subscriber还需要定义三种行为:

  1. onNext,接受到数据流之后的执行逻辑;
  2. onError,当发布出现错误的时候如何应对;
  3. onComplete,当订阅的数据流发送完毕之后的行为。

相比于Future、Thread那样将业务逻辑和异常处理逻辑混杂在一起,Subscriber将其分别定义在三个方法里,代码显得更为清晰。java.util.Observer(在JDK9中开始废弃)只定义了update方法,相当于这里的onNext方法,缺少对流整体的管理和对异常的处理。特别是,异常如果随着调用链传递出去,调试定位会非常麻烦。因此要重视onError方法,尽可能在订阅者内部就处理这个异常。

尽管Reactive Streams规范和Stream都关注数据流,但两者有一个显著的区别。那就是Stream是基于生产一方的,生产者有多大能力,Stream就制造多少数据流。而Reactive Streams规范是基于消费者的。逻辑链下游可以通过对request方法参数的变更,通知上游调整生产数据流的速度。从而实现了“响应式系统”的“灵敏性”要求。这在响应式编程中,用术语“背压”来描述。

Reactive Streams规范仅仅是一个标准,其实现又依赖其他组织的成果。其意义在于各家实现能够通过这样一个统一的接口,相互调用,有助于响应式框架生态的良性发展。Reactive Streams规范虽然是Netflix、Pivatol、RedHat等第三方大厂合作推出的,但已经随着JDK9的发布收编为官方API,位于java.util.concurrent.Flow之内。JDK8也可以在项目中直接集成相应的模块调用。

顺便吐槽一下,JDK9官方文档给出的demo里的数据流居然从Subscription里生产出来,吓得我反复确认了一下Reactive Streams官方规范。

RxJava2

RxJava由Netfilx维护,实现了ReactiveX API规范。该规范有很多语言实现,生态很丰富。

Rx范式最先是微软在.NET平台上实现的。2014年11月,Netfilx将Rx移植到JVM平台,发布了1.0稳定版本。而Reactive Streams规范是在2015年首次发布,2017年才形成稳定版本。所以RxJava 1.x和Reactive Streams规范有很大出入。1.x版本迭代至2018年3月的1.3.8版本时,宣布停止维护。

Netflix在2016年11月发布2.0.1稳定版本,实现了和Reactive Streams规范的兼容。2.x如今是官方的推荐版本。

RxJava框架里主要有这些概念:

  • Observable与Observer。RxJava直接复用了“观察者模式”里的概念,有助于更快地被开发社区接受。Observeble和Publisher有一点差异:前者有“冷热”的区分,“冷”表示只有订阅的时候才发布消息流,“热”表示消息流的发布与时候有对象订阅无关。Publisher更像是“冷”的Observeble。
  • Operators,也就是操作符。RxJava和JDK Stream类似,但设计了更多的自描述的函数方法,并同样实现了链式编程。这些方法包括但不限于转换、过滤、结合等等。
  • Single,是一种特殊的Observable。一般的Observable能够产生数据流,而Single只能产生一个数据。所以Single不需要onNext、onComplete方法,而是用一个onSuccess取而代之。
  • Subject,注意这个不是事件,而是介于Observable与Observer之间的中介对象,类似于Reactive Streams规范里的Processor。
  • Scheduler,是一类线程池,用于处理并发任务。RxJava默认执行在主线程上,可以通过observeOn/subscribeOn方法来异步调用阻塞式任务。

RxJava 2.x在Zuul 2、Hystrix、Jersey等项目都有使用,在生产领域已经得到了应用。

Reactor3

Reactor3有Pivotal来开发维护,也就是Spring的同门师弟。

整体上,Reactor3框架里的概念和RxJava都是类似的。Mono和Flux都等同于RxJava的Single和Observable。Reactor3也使用自描述的操作符函数实现链式编程。

RxJava 2.x支持JVM 6+平台,对老旧项目很友好;而Reactor3要求必须是JVM8+。所以说,如果是新项目,使用Reactor3更好,因为它使用了很多新的API,支撑很多函数式接口,代码可读性维护性都更好。

背靠Spring大树,Reactor3的设计目标是服务器端的Java项目。Reactor社区针对服务器端,不断推出新产品,例如Reactor Netty、Reactor Kafka等等。但如果是Android项目,RxJava2更为合适(来自Reactor3官方文档的建议)。

老实讲,Reactor3的文档内容更丰富。

什么是响应式系统

响应式宣言里面说的很清楚,一个响应式系统应当是:

  • 灵敏的:能够及时响应
  • 有回复性的:即使遇到故障,也能够自行恢复、并产生回复
  • 可伸缩的:能够随着工作负载的变化,自行调用或释放计算资源;也能够随着计算资源的变化,相应的调整工作负载能力
  • 消息驱动的:显式的消息传递能够实现系统各组件解耦,各类组件自行管理资源调度。

构建响应式Web系统

Vert.X

Vert.X目前由Eclipse基金会维护,打造了一整套响应式Web系统开发环境,包括数据库管理、消息队列、微服务、权限认证、集群管理器、Devops等等,生态很丰富。

Vert.X Core框架基于Netty开发,是一种事件驱动框架:每当事件可行时都会调用其对应的handler。在Vert.X里,有专门的线程负责调用handler,被称作eventloop。每一个Vert.X实例都维护了多个eventloop。

Vert.X Core框架有两个重要的概念:Verticle和Event Bus。

Verticle

Verticle类似于Actor模型的Actor角色。

Actor是什么?

这里泛泛的说一下吧。

Actor模型主要针对于分布式计算系统。Actor是其中最基本的计算单元。每一个Actor都有一个私有的消息队列。Actor之间的通信依靠发送消息。每一个Actor都可以并发地做到:

  1. 向其他Actor发送消息
  2. 创建新的Actor
  3. 指定当接收到下一个消息时的行为

Verticle之间的消息传递依赖于下面要说的Event Bus。

Vert.X为Verticle的部署提供了高可用特性:在Vert.X集群中,如果一个节点的上运行的Veticle实例失效,其他节点就会重新部署一份新的Verticle实例。

Verticle只是Vert.X提供的一种方案,并非强制使用。

Event Bus

Event Bus是Vert.X框架的中枢系统,能够实现系统中各组件的消息传递和handler的注册与注销。其消息传递既支持“订阅-发布”模式,也支持“请求-响应”模式。

当多个Vert.X实例组成集群的时候,各系统的Event Bus能够组成一个统一的分布式Event Bus。各Event Bus节点相互之间通过TCP协议通信,没有依赖Cluster Manager。这是一种可以实现节点发现,提供了分布式基础组件(锁、计数器、map)等的组件。

Spring WebFlux

Spring5的亮点之一就是响应式框架Spring WebFlux,使用自家的Reactor3开发,但同样支持RxJava。

Spring WebFlux的默认服务端容器是Reactor Netty,也可以使用Undertow或者Tomcat、Jetty的实现了Servlet 3.1 非阻塞API接口的版本。Spring WebFlux分别为这些容器实现了与Reactive Streams规范实现框架交互的适配器(Adapter),没有向用户层暴露Servlet API。

Spring WebFlux的注解方式和Spring MVC很像。这有助于开发团队快速适应新框架。而且Spring WebFlux兼容Tomcat、Jetty,有助于项目运维工作的稳定性。

但如果是新的项目、新的团队,给我,我大概会选Vert.X,因为Event Bus确实很吸引人。

参考资料

延伸阅读

原文地址:https://www.cnblogs.com/rim99/p/9026500.html

时间: 2024-10-21 07:03:21

Java的HTTP服务端响应式编程的相关文章

Spring5.0响应式编程入门

引言? 响应式编程是一种面向数据流和变化传播的编程范式.使用它可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播.我们可以使用声明的方式构建应用程序的能力,形成更加敏感和有弹性的应用,所以Spring 5在其核心框架中增加了反应系统,已经开始向声明式编程的范式转变. 响应式编程的优势 提高了代码的可读性,因此开发人员只需要关注定义业务逻辑. 在高并发环境中,可以自然的处理消息. 可以控制生产者和消费者之间的流量,避免内存不足. 对于一个或多个线程,

什么是函数响应式编程(Java&amp;Android版本)

什么是函数响应式编程(Java&Android版本) 原文链接:http://www.bignerdranch.com/blog/what-is-functional-reactive-programming/ 函数响应式编程(FRP)为解决现代编程问题提供了全新的视角.一旦理解它,可以极大地简化你的项目,特别是处理嵌套回调的异步事件,复杂的列表过滤和变换,或者时间相关问题. 我将尽量跳过对函数响应式编程学院式的解释(网络上已经有很多),并重点从实用的角度帮你理解什么是函数响应式编程,以及工作中

Java响应式编程Springboot WebFlux基础与实战

在这里我个人推荐的学习途径如下:先学习jdk8的lambda表达式和stream流编程,了解函数式编程的知识点和思想,接着学习jdk9的响应式流flux,理解响应式流概念,理解背压和实现机制.这2者学好之后,很容易理解webflux的基石reactor,再学习webflux就水到渠成了! 这里我记录了自己的学习之路,列出了每一块的学习重点,除了API的知识点学习之外,更加重要的了解底层运行机制和实现原理.对于我个人来说,学习技术如果不了解原理,知识点需要死记硬背,而了解了底层机制之后,不但不需要

响应式编程,是明智的选择

相信你们在学习响应式编程这个新技术的时候都会充满了好奇,特别是它的一些变体,例如:Rx系列.Bacon.js.RAC等等…… 在缺乏优秀资料的前提下,响应式编程的学习过程将满是荆棘.起初,我试图寻找一些教程,却只找到少量的实践指南,而且它们讲的都非常浅显,从来没人接受围绕响应式编程建立一个完整知识体系的挑战.此外,官方文档通常也不能很好地帮助你理解某些函数,因为它们通常看起来很绕,不信请看这里: Rx.Observable.prototype.flatMapLatest(selector, [t

[译] Swift 的响应式编程

原文  https://github.com/bboyfeiyu/iOS-tech-frontier/blob/master/issue-3/Swift的响应式编程.md 原文链接 : Reactive Swift 原文作者 : Agnes Vasarhelyi 译文出自 : 开发技术前线 www.devtf.cn 译者 :Mr.Simple 校对者:Lollypo 状态 : 完成 让我们首先回到Apple刚推出Objective-C的继任者-Swift的时候,那真是一个非比寻常的时刻. Sir

Android 响应式编程 RxJava2 完全解析

转载请注明出处:http://blog.csdn.net/smartbetter/article/details/68941859 使用了 RxJava2 有一段时间了,深深感受到了其"牛逼"之处.下面,就从 RxJava2 的基础开始,一步步与大家分享一下这个强大的异步库的用法!RxJava 是 一个在 Java VM 上使用可观测的序列来组成异步的.基于事件的程序的库,也就是用于实现异步操作的库. 1.RxJava2 基础 RxJava可以浓缩为异步两个字,其核心的东西不外乎两个,

(10)响应式宣言、响应式系统与响应式编程——响应式Spring的道法术器

本系列文章索引<响应式Spring的道法术器>前情提要 响应式编程 | 响应式流 1.5 响应式系统 1.5.1 响应式宣言 关注"响应式"的朋友不难搜索到关于"响应式宣言"的介绍,先上图: 这张图凝聚了许多大神的智慧和经验,见官网,中文版官网,如果你认可这个宣言的内容,还可以签下你的大名.虽然这些内容多概念而少实战,让人感觉是看教科书,但是字字千金,不时看一看都会有新的体会和收获. 这也是新时代男朋友的行为准则: Responsive,要及时响应,24

SpringBoot使用WebFlux响应式编程操作数据库

这一篇文章介绍SpringBoot使用WebFlux响应式编程操作MongoDb数据库. 前言 在之前一篇简单介绍了WebFlux响应式编程的操作,我们在来看一下下图,可以看到,在目前的Spring WebFlux还没有支持类似Mysql这样的关系型数据库,所以本文以MongoDb数据库为例. SpringBoot使用WebFlux响应式编程操作数据库 接下来介绍SpringBoot使用WebFlux响应式编程操作MongoDb数据库. 新建项目 pom文件 新建项目,在项目中加入webflux

Spring Boot (十四): 响应式编程以及 Spring Boot Webflux 快速入门

1. 什么是响应式编程 在计算机中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的编程范式.这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播. 例如,在命令式编程环境中,a=b+c 表示将表达式的结果赋给 a,而之后改变 b 或 c 的值不会影响 a .但在响应式编程中,a 的值会随着 b 或 c 的更新而更新. 响应式编程是基于异步和事件驱动的非阻塞程序,只需要在程序内启动少量线程扩