spring reactor多线程配置

 1:引入jar包,这里使用的是maven,只需要引用一个jar包就行了

  <DEPENDENCY>

  <GROUPID>org.projectreactor</GROUPID>

  reactor-spring</ARTIFACTID>

  <VERSION>1.0.0.RELEASE</VERSION>

  </DEPENDENCY>

  2:写一个reactor的配置的bean

  @Configuration

  @EnableReactor

  public class ReactorConfig {

  @Bean(name = "rootReactor")

  public Reactor rootReactor(Environment env) {

  return Reactors.reactor()。env(env)。get();

  }

  @Bean(name = "reportReactor")

  public Reactor reportReactor(Environment env) {

  return Reactors.reactor()。env(env)。get();

  }

  }

  3:事件的处理类,一般是以Hander结尾,方便区分:

  @Component

  public class IndexHandler {

  @Autowired

  @Qualifier("rootReactor")

  private Reactor reactor;

  @Selector(value = "hello", reactor = "@rootReactor")

  public void handleTestTopic(Event<STRING> evt) throws Exception {

  System.out.println("************");

  }

  }

  4:最后就是在controller或者service里面通知新开线程了:

  @Controller

  public class IndexController {

  @Autowired

  @Qualifier("rootReactor")

  private Reactor r;

  @RequestMapping("chen")

  @Transactional

  public void chen() {

  r.notify("hello", Event.wrap("你好"));

  }

  }

---------------------------------------------------------------------------------------

转自:
Rx (Reactive Extensions)介绍

Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。

目前reactor是作为Spring.io核心包下面项目。

Reactor 是一个基础性库包
–定位在用户级和低级之间的灰色区域的抽象。
– 能够在Reactor上建立组件和应用核心
– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构

Reactor的应用是reactive的。
– 属于Reactive Extensions in .NET
– 类似Netflix RxJava
– 观察者模式Observer pattern

Reactor应用基于一个Selector发送事件。
– 象一个消息系统中routing topic, 但是它是一个对象
– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑

Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:

Reactor演示代码

Environment env = new Environment();
Reactor reactor = Reactors.reactor()
                               .env(env)
                               .dispatcher(RING_BUFFER)
                               .get();

reactor.on($(“topic”), (Event<String> ev) → {
                             System.out.println(“Hello “ + ev.getData());
                  });

reactor.notify(“topic”, Event.wrap(“John Doe”));

RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。

reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。

Reactor 的分发器 Dispatchers 类似Akka的分发器
● 分发器管理任务执行,有下面几种:
– ThreadPoolExecutorDispatcher
● 标准的 ThreadPoolExecutor

– BlockingQueueDispatcher
● 能够进行事件轮询

– RingBufferDispatcher
● LMAX Disruptor RingBuffer

– SynchronousDispatcher

Reactor的 Selectors
● Selectors 是一个等式的左边。
– 一个Selector能够被任何对象使用$(obj)创建
(或者: Selectors.object(obj))
– 一个Selector能够从匹配的key中释放数据
– Predicate<T> Selectors 能够创建匹配特定领域准则
(domain-specific criteria)

比如RegexSelector:
reactor.on(R(“some.(.+)”), (Event<String> ev) → {
// s will be ‘topic‘
String s = ev.getHeaders().get(“group1”);
});

reactor.notify(“some.topic”, Event.wrap(“John Doe”));

其中R(“some.(.*)”)匹配事件发送者“some.topic”。

UriTemplateSelector能够从URI匹配字符串:
reactor.on(U(“/some/{topic}”), (Event<String> ev) → {
// s will be ‘topic‘
String s = ev.getHeaders().get(“topic”);
});
reactor.notify(“/some/topic”, Event.wrap(“John Doe”));

Reactor 的Stream
● Streams允许基于数据的函数组合composition 
– Callback++
– 类似Netflix RxJava Observable, JDK 8 Stream

Stream<String> str;
str.map(String::toUpperCase)
     .filter(new Predicate<String>() { public boolean test(String s) { … }
     })
    .consume(s → log.info(“consumed string {}”, s));

Reactor 的 Promise
允许在Stream之间分享函数

Promise<String> p;
String s = p
        .onSuccess(s → log.info(“consumed string {}”, s))
        .onFailure(t → log.error(t.getMessage(), t))
        .onComplete(t → log.info(“complete”))
        .await(5, SECONDS);

p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s));

Reactor 的 Processor
干脆直接将Disruptor API转为Reactor API
对于#UberFastData有超级快性能

Processor<Buffer> proc;
Operation<Buffer> op = proc.prepare();
op.get().append(data).flip();
op.commit();
proc.batch(512, buff → buff.append(data).flip());

与Spring整合:
首先使用@EnableReactor 激活reactor

@Configuration
@EnableReactor public class ReactorConfiguration {

  @Bean public Reactor input(Environment env) { return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
   }

   @Bean public Reactor output(Environment env) { return Reactors.reactor().env(env)
                  .dispatcher(RING_BUFFER).get();
}

然后在监听者或观察者写入:

@Component public class SimpleHandler {
    @Autowired private Reactor reactor;

    @Selector(“test.topic”) public void onTestTopic(String s) { // Handle data }
} 

reactor的groovy整合:

@CompileStatic
def welcome(){
    reactor.on(‘greetings‘) { String s ->
            reply “hello $s”
            reply “how are you?”
}
reactor.notify ‘greetings‘, ‘Jon‘
           reactor.send(‘greetings‘, ‘Stephane‘){
                  println it
            cancel()
           }
}

时间: 2024-12-30 21:04:57

spring reactor多线程配置的相关文章

spring定时任务.线程池,自定义多线程配置

定时任务及多线程配置xml <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springfra

spring(16)------spring的数据源配置

在spring中,通过XML的形式实现数据源的注入有三种形式. 一,使用spring自带的DriverManagerDataSource 使用DriverManagerDataSource配置数据源与直接使用jdbc在效率上没有多大的区别,使用DriverManagerDataSource配置数据源 的代码实例如下,这里重点研究spring的数据源配置,就采用spring编程式事务处理来来研究数据源的配置. 所需要的jar包和spring编程式配置:http://blog.csdn.net/yh

Spring的bean配置

IOC其实是从我们平常new一个对象的对立面来说的,我们平常使用的对象一般直接使用关键字类new一个对象,患处很显然,使用new那么就表示当前模块已经不知不觉和new出的对象耦合了,而我们通常都是更高层次的抽象模块调用底层实现模块,这样就产生模块依赖于具体的实现,这与我们JAVA中提倡的面向接口面向抽象编程是相冲突的,而且这样做也带来系统的模块架构问题.很简单的例子,在进行数据库操作的时候,总是业务层调用DAO层,DAO一般采用接口开发,这在一定程度上满足了松耦合,使业务逻辑层不依赖于具体的DA

Spring之——c3p0配置详解

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/51162560 今天,我们就来详细谈谈Spring中的c3p0配置问题,好了,不耽搁大家的时间,我们直接进入主题,请看下面的具体配置文件信息: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/

基于注解的Spring多数据源配置和使用

前一段时间研究了一下spring多数据源的配置和使用,因为后期需要从多个数据源拉取数据,定时进行数据分析和报表统计.由于之前做过的项目都是单数据源的,没有遇到这种场景,所以也一直没有去了解过如何配置多数据源.后来发现其实基于spring来配置和使用多数据源还是比较简单的,因为spring框架已经预留了这样的接口可以方便数据源的切换.先看一下spring获取数据源的源码: 可以看到AbstractRoutingDataSource获取数据源之前会先调用determineCurrentLookupK

spring线程池配置

源自:http://zjriso.iteye.com/blog/771706 1.了解 TaskExecutor接口 Spring的TaskExecutor接口等同于java.util.concurrent.Executor接口. 实际上,它存在的主要原因是为了在使用线程池的时候,将对Java 5的依赖抽象出来. 这个接口只有一个方法execute(Runnable task),它根据线程池的语义和配置,来接受一个执行任务.最初创建TaskExecutor是为了在需要时给其他Spring组件提供

struts+spring action应配置为scope=&quot;prototype&quot;

truts+spring action应配置为scope="prototype" <bean id="personAction" scope="prototype" class="quickstart.action.PersonAction"> <constructor-arg ref="personService" /></bean> 在配置文件中,bean默认是单例模

Spring Boot自动配置原理(转)

第3章 Spring Boot自动配置原理 3.1 SpringBoot的核心组件模块 首先,我们来简单统计一下SpringBoot核心工程的源码java文件数量: 我们cd到spring-boot-autoconfigure工程根目录下.执行 $ tree | grep -c .java$ 模块 java文件数 spring-boot 551 spring-boot-actuator 423 spring-boot-autoconfigure 783 spring-boot-devtools

4、Spring Boot 自动配置原理

1.4 Spring Boot 自动配置原理 简介 spring boot自动配置功能可以根据不同情况来决定spring配置应该用哪个,不应该用哪个,举个例子: Spring的JdbcTemplate是不是在Classpath里面?如果是,并且DataSource也存在,就自动配置一个JdbcTemplate的Bean Thymeleaf是不是在Classpath里面?如果是,则自动配置Thymeleaf的模板解析器.视图解析器.模板引擎 那个这个是怎么实现的呢?原因就在于它利用了Spring的