Vert.x中EventBus中的使用

注意:使用的是vert.x3.0 仅支持到java8当中有一些lambda表达式。如不明确请自补java8新特性。

The Event Bus

event bus 是vert.x的神经系统。

每个vert.x的实例都有一个单一的event bus 实例。它是使用vertx.eventBus()方法获得的。

event bus 同意程序中的不同语言编写的模块进行通信。不论他们是同样的vert.x实例。还是不同的vert.x实例。

它甚至能够桥接浏览器中执行的Javascript通信。

event bus能够在分布式系统中的多个server节点之间进行点对点通信和多个浏览器。

event bus支持公布/订阅模式。点对点模式,和请求/响应模式。

event bus的API是很easy的。它主要包含注冊消息处理事件。取消处理事件。发送和公布消息。

首先理论

寻址

event bus上的消息被发送到一个地址。

vert.x不包括不论什么花哨的寻址方案。

在vert.x中,一个地址就是一个简单的String字符串。不论什么字符串都是有效的。只是最好的方法是使用某种有计划或者有规则的方案,比方使用一个私有的空间名称。

一些有參考价值的样例:europe.news.feed1,
acme.games.pacman, sausages, and X。

事件-消息的处理程序

收到消息的处理程序,你在一个地址上注冊一个处理程序,来消息后将触发这个处理程序。

同一个消息处理程序能够注冊到不同的地址上,相同同一个地址也能注冊多个处理程序。

公布/订阅模式

event bus 支持公布消息

消息被公布到一个地址。公布意味着将消息交给全部订阅并注冊处理程序的地址来处理。

这跟大家熟悉的公布/订阅模式没有什么不同。

点对点和请求/响应模式

event bus 支持点对点消息传递。

消息被发送到一个地址。

vert.x将发送它到一个注冊消息处理程序的地址。

假设有多个处理程序注冊地址,vert.x将选择一个来处理(採用非严格循环算法)。

强烈不推荐。

当接收到消息的程序处理完毕后,能够决定是否回复。发送程序接到回复后也能够进行响应回复,假设他们这样做应答处理程序将被调用。

当接收方到返回发送方。这样能够无限反复,这又是一种常见的消息传递模式:请求/响应模式

最优传输

vert.x可以做到最优传输。不会有意识的丢失消息。这是很重要的。

然而,event bus的部分或所有失败还是有可能造成消息丢失的。

假设你的应用程序很在乎消息的完整性和时序性。那么你的代码处理应该是幂等的。以便在消息处理程序复苏后又一次发送消息。

消息类型

开箱同意vert.x使用不论什么的原始/简单类型,字符串或者缓冲区发送消息。

然而这里有一个不成文的规定或者说建议。那就是最好使用JSON格式的子串来进行消息的传递。

JSON字串在全部的编程语言中都是很easy创建。读取和解析的。在vert.x下它已经变成一种通用语言了。

假设你不是必需使用JSON或者说你不想。

event bus 很灵活。

它还支持发送随意对象。还能够定义您想要发送的对象的编解码器。

EVENT BUS 的API

让我们跳进event bus的API。

获得event bus 的对象

你能够通过例如以下代码获得event bus的单一对象:

EventBus
eb = vertx.eventBus();

注冊处理事件

使用以下这个简单方法注冊一个消费处理程序:

EventBus
eb = vertx.eventBus();

eb.consumer("news.uk.sport", message -> {

System.out.println("I have received a message: " + message.body());

});

当一个消息到达你的处理事件是。你的事件将被激活。并处理这个消息。

consumer()方法返回一个MessageConsumer的对象实例。这个对象随后用于注销处理程序,或者用处理程序作为流。

然而您也能够使用consumer()返回MessageConsumer没有处理程序,然后单独设置处理程序。比如:

EventBus
eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("news.uk.sport");consumer.handler(message
-> {

System.out.println("I have received a message: " + message.body());

});

当在集群事件总线上注冊一个处理程序时,它能够花一些时间登记到集群的全部节点上。

假设你希望在注冊完毕时得到通知的话,你能够在MessageConsumer上注冊一个注冊完毕的处理程序:

consumer.completionHandler(res
-> {

if (res.succeeded()) {

System.out.println("The handler registration has reached all
nodes");

} else {

System.out.println("Registration failed!");

}

});

注销处理事件

去除处理事件。叫做注销。

假设你是集群事件总线, 假设你想当这个过程完毕时通知注销,你能够使用以下的方法:

consumer.unregister(res
-> {

if (res.succeeded()) {

System.out.println("The handler un-registration has reached
all nodes");

} else {

System.out.println("Un-registration failed!");

}

});

公布消息

公布消息很easy。仅仅须要把它公布到指定地址就可以:

eventBus.publish("news.uk.sport",
"Yay! Someone kicked a ball");

这一消息将被交付全部订阅news.uk.sport地址处理。

发送消息

发送消息将导致仅仅有一个注冊地址的处理程序接收到消息(多个注冊地址也仅仅有一个能收到)。

这就是点对点模式,选择处理程序的方法採用非严格循环方式。

你可用用send()方法发送一条消息。

eventBus.send("news.uk.sport",
"Yay! Someone kicked a ball");

未解决的指令包含在<stdiin>-include::override/eventbus_headers.adoc[]
==== The Message object

你的消息处理程序收到的是一个Message。

消息的body相应着是应该发送还是应该公布。

消息的headers是可用的。

回复消息

有时你发送消息后希望得到接收到消息的人的回复。

这就须要你使用请求-响应模式。

要做到这一点,在消息发送的时候,你能够指定一个回复事件。

当你接收到消息的时候,你能够通过调用reply()方法来应答。

当这一切发生的时候它会导致一个答复发送回发送方,发送方收到应答消息再做处理。

接收方:

MessageConsumer<String> consumer = eventBus.consumer("news.uk.sport");

consumer.handler(message -> {

System.out.println("I have received a message: " + message.body());

message.reply("how interesting!");

});

发送方:

eventBus.send("news.uk.sport", "Yay! Someone kicked a ball across a patch of grass", ar -> {

if (ar.succeeded()) {

		System.out.println("Received reply: " + ar.result().body());
		}
	});

相应答也能够做应答。这样你就能够在两个不同的程序中创建一个包括多个回合的对话。

发送超时

当你发送消息时和指定应答事件时你能够通过DeliveryOptions指定超时时间。

假设应答事件不少于超时时间,这个应答事件将失败。

默认的超时时间是30S。

发送失败

消息发送失败的其它原因,包含:

没有可用的事件去发送消息

接收者已经明白使用失败:失败的消息

在全部情况下。应答事件将回复特定的失败。

未解决的指令包括在<stdin> - include::override/eventbus.adoc[]==== Clustered Event Bus

event bus 不只存在于一个单一的Vert.x实例中,在一个集群中不同的Vert.x实例也能够形成一个单一的,分布的事件总线。

集群编程

假设你创建一个Vert.x实例用于集群编程。你须要的得到一个关于集群事件总线配置

VertxOptions options = new VertxOptions();

Vertx.clusteredVertx(options, res -> {

if (res.succeeded()) {

Vertx vertx = res.result();

EventBus eventBus = vertx.eventBus();

System.out.println("We now have a clustered event bus: " + eventBus);

} else {

System.out.println("Failed: " + res.cause());

}});

你应该确保在你的类路径中实现了一个ClusterManager,比如默认的:HazelcastClusterManager。

使用命令集群

你能够使用命令行执行集群:vertx
run my-verticle.js -cluster

Automatic
clean-up in verticles

If you’re registering event bus handlers from inside verticles, those handlers
will be automatically unregisteredwhen the verticle is undeployed.

时间: 2024-10-07 04:00:53

Vert.x中EventBus中的使用的相关文章

Android 中 EventBus 的使用(3):多线程事件处理

在这一系列教程的最后一篇中,我想谈谈GR的EventBus,在处理多线程异步任务时是多么简单而有效. AsyncTask, Loader和Executor…… 拜托! Android中有很多种执行异步操作的方法(指平行于UI线程的).AsyncTask对于用户来说是最简单的一种机制,并且只需要少量的设置代码即可.然而,它的使用是有局限的,正如Android官方文档中所描述的: AsyncTask被设计成为一个工具类,在它内部包含了Thread和Handler,但它本身并不是通用线程框架的一部分.

Android 中 EventBus 的使用(2):缓存事件

在上一篇文章中,我曾提到我所选择的是Green Robot提供的EventBus(Android平台),而且这并非只是我一个人的选择.在最近一次查看中,我发现选择它的人数已经是Otto(由Jake Wharton和其他大神们在Square上所提供的版本)的两倍之多了.GR的版本显然比Otto有更多的性能提升,但最令我动心的地方在于它添加了很多新功能.今天我就打算谈谈其中的一项:通过sticky事件进行事件缓存. sticky是什么? sticky事件就是指在EventBus内部被缓存的那些事件.

MAC中Django中runserver提示Can&#39;t connect to local MySQL server through socket &#39;/tmp/mysql.sock错误

好像不止遇到一次,直接Google就可以了,在stackoverflow中就有答案,答案就是你没有开MySQL - -. stackoverflow链接见 http://stackoverflow.com/questions/16325607/cant-connect-to-local-mysql-server-through-socket-tmp-mysql-sock 开启MySQL的命令如下: mysql.server start MAC中Django中runserver提示Can't co

【转】Java中字符串中子串的查找共有四种方法(indexof())

原文网址:http://wfly2004.blog.163.com/blog/static/1176427201032692927349/ Java中字符串中子串的查找共有四种方法,如下:1.int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引. 2.int indexOf(String str, int startIndex):从指定的索引处开始,返回第一次出现的指定子字符串在此字符串中的索引. 3.int lastIndexOf(String st

jquery中form中使用submit出现的问题,未解决

$("#login_btn").click(function(){在type为submit的按钮下 if($("#id_password").val().length == 0){//检验密码是否为空 $('form').submit(function(){ alert("1") return false; }); }else if($("#vali").val() != code){//验证码是否正确 $('form').s

ios开发中object-c中UTF-8 和 GBK 的 NSString 相互转化的方法

应用都要遇到一个很头疼的问题:文字编码,汉字的 GBK 和 国际通用的 UTF-8 的互相转化稍一不慎, 就会满屏乱码.下面介绍 UTF-8 和 GBK 的 NSString 相互转化的方法 NSStringEncoding enc = CFStringConvertEncodingToNSStringEncoding(kCFStringEncodingGB_18030_2000); char* c_test = "北京"; int nLen = strlen(c_test); NSS

解决QML开发中ComboBox中一个已选择项没有清除的问题

解决QML开发中ComboBox中一个已选择项没有清除的问题 近期使用QML开发一个项目.须要使用ComboBox进行显示.当进行一个操作时,须要向ComboBox加入一个元素,当进行另外一个操作时.须要清除ComboBox里面的元素. 可是在操作的过程中,出现了一个诡异的现象--ComboBox里面的已选择项并没有清除. 以下是程序的截图,能够看到.ComboBox中已选择项并没有删除.可是ComboBox中的候选项已经删除了. 我在QTCN上进行提问.后面再大家的努力下,最终把这个问题攻克了

iOS开发之获取一段字符串中的中文字和中文字符

#pragma mark -获取一段字符串中的中文字 + (NSArray *)getAStringOfChineseWord:(NSString *)string { if (string == nil || [string isEqual:@""]) { return nil; } NSMutableArray *arr = [[NSMutableArray alloc]init]; for (int i=0; i<[string length]; i++) { int a

java中InputStream中read()与read(byte[] b) 用法介绍

本文章介绍了关于在java中InputStream中read()与read(byte[] b) 用法有需要了解这两个函数的朋友可以看看本文章. 这两个方法在抽象类InputStream中都是作为抽象方法存在的, JDK API中是这样描述两者的: read() :  从输入流中读取数据的下一个字节,返回0到255范围内的int字节值.如果因为已经到达流末尾而没有可用的字节,则返回-1.在输入数据可用.检测到流末尾或者抛出异常前,此方法一直阻塞. read(byte[] b) : 从输入流中读取一