1:引入jar包,这里使用的是maven,只需要引用一个jar包就行了
<DEPENDENCY>
<GROUPID>org.projectreactor</GROUPID>
reactor-spring</ARTIFACTID>
<VERSION>1.0.0.RELEASE</VERSION>
</DEPENDENCY>
2:写一个reactor的配置的bean
@Configuration
@EnableReactor
public class ReactorConfig {
@Bean(name = "rootReactor")
public Reactor rootReactor(Environment env) {
return Reactors.reactor()。env(env)。get();
}
@Bean(name = "reportReactor")
public Reactor reportReactor(Environment env) {
return Reactors.reactor()。env(env)。get();
}
}
3:事件的处理类,一般是以Hander结尾,方便区分:
@Component
public class IndexHandler {
@Autowired
@Qualifier("rootReactor")
private Reactor reactor;
@Selector(value = "hello", reactor = "@rootReactor")
public void handleTestTopic(Event<STRING> evt) throws Exception {
System.out.println("************");
}
}
4:最后就是在controller或者service里面通知新开线程了:
@Controller
public class IndexController {
@Autowired
@Qualifier("rootReactor")
private Reactor r;
@RequestMapping("chen")
@Transactional
public void chen() {
r.notify("hello", Event.wrap("你好"));
}
}
---------------------------------------------------------------------------------------
转自:
Rx (Reactive Extensions)介绍
Reactor - A Foundation for Reactive FastData Appli该文简单介绍了Spring reactor 1.0的基本特性。
目前reactor是作为Spring.io核心包下面项目。
Reactor 是一个基础性库包
–定位在用户级和低级之间的灰色区域的抽象。
– 能够在Reactor上建立组件和应用核心
– 驱动器 服务器和数据整合库,领域整合库,事件驱动架构
Reactor的应用是reactive的。
– 属于Reactive Extensions in .NET
– 类似Netflix RxJava
– 观察者模式Observer pattern
Reactor应用基于一个Selector发送事件。
– 象一个消息系统中routing topic, 但是它是一个对象
– 支持Regex, URI template, Class.isAssingableFrom, 定制逻辑
Reactor Core内部封装了LMAX Disruptor的RingBuffer,再通过Reactor-Spring等支持支持各种Spring应用,如下图:
Reactor演示代码
Environment env = new Environment(); Reactor reactor = Reactors.reactor() .env(env) .dispatcher(RING_BUFFER) .get(); reactor.on($(“topic”), (Event<String> ev) → { System.out.println(“Hello “ + ev.getData()); }); reactor.notify(“topic”, Event.wrap(“John Doe”)); |
RING_BUFFER是Disruptor的RingBuffer操作,熟悉Disruptor的应该知道。
reactor.notify发送一个事件,而reactor.on能够接受到这个事件即时响应。
Reactor 的分发器 Dispatchers 类似Akka的分发器
● 分发器管理任务执行,有下面几种:
– ThreadPoolExecutorDispatcher
● 标准的 ThreadPoolExecutor
– BlockingQueueDispatcher
● 能够进行事件轮询
– RingBufferDispatcher
● LMAX Disruptor RingBuffer
– SynchronousDispatcher
Reactor的 Selectors
● Selectors 是一个等式的左边。
– 一个Selector能够被任何对象使用$(obj)创建
(或者: Selectors.object(obj))
– 一个Selector能够从匹配的key中释放数据
– Predicate<T> Selectors 能够创建匹配特定领域准则
(domain-specific criteria)
比如RegexSelector:
reactor.on(R(“some.(.+)”), (Event<String> ev) → {
// s will be ‘topic‘
String s = ev.getHeaders().get(“group1”);
});
reactor.notify(“some.topic”, Event.wrap(“John Doe”));
其中R(“some.(.*)”)匹配事件发送者“some.topic”。
UriTemplateSelector能够从URI匹配字符串:
reactor.on(U(“/some/{topic}”), (Event<String> ev) → {
// s will be ‘topic‘
String s = ev.getHeaders().get(“topic”);
});
reactor.notify(“/some/topic”, Event.wrap(“John Doe”));
Reactor 的Stream
● Streams允许基于数据的函数组合composition
– Callback++
– 类似Netflix RxJava Observable, JDK 8 Stream
Stream<String> str; str.map(String::toUpperCase) .filter(new Predicate<String>() { public boolean test(String s) { … } }) .consume(s → log.info(“consumed string {}”, s)); |
Reactor 的 Promise
允许在Stream之间分享函数
Promise<String> p; String s = p .onSuccess(s → log.info(“consumed string {}”, s)) .onFailure(t → log.error(t.getMessage(), t)) .onComplete(t → log.info(“complete”)) .await(5, SECONDS); p.map(String::toUpperCase).consume(s → log.info(“UC: {}”, s)); |
Reactor 的 Processor
干脆直接将Disruptor API转为Reactor API
对于#UberFastData有超级快性能
Processor<Buffer> proc; Operation<Buffer> op = proc.prepare(); op.get().append(data).flip(); op.commit(); proc.batch(512, buff → buff.append(data).flip()); |
与Spring整合:
首先使用@EnableReactor 激活reactor
@Configuration @EnableReactor public class ReactorConfiguration { @Bean public Reactor input(Environment env) { return Reactors.reactor().env(env) .dispatcher(RING_BUFFER).get(); } @Bean public Reactor output(Environment env) { return Reactors.reactor().env(env) .dispatcher(RING_BUFFER).get(); } |
然后在监听者或观察者写入:
@Component public class SimpleHandler { @Autowired private Reactor reactor; @Selector(“test.topic”) public void onTestTopic(String s) { // Handle data } } |
reactor的groovy整合:
@CompileStatic def welcome(){ reactor.on(‘greetings‘) { String s -> reply “hello $s” reply “how are you?” } reactor.notify ‘greetings‘, ‘Jon‘ reactor.send(‘greetings‘, ‘Stephane‘){ println it cancel() } } |