Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。
关于ActiveMQ5请参考:http://activemq.apache.org,本文只介绍Apollo在windows下安装和应用,Apollo的详细文档请参考官网:http://activemq.apache.org/apollo/documentation/user-manual.html.
Apollo的下载和安装
1.下载并安装
进入http://activemq.apache.org/apollo/download.html,下载windows版本的压缩包,并解压到自己工作目录(如:E:\apache-apollo-1.7),并创建环境变量APOLLO_HOME=E:\apache-apollo-1.7。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。
2.创建broker实例并启动服务
进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create E:\apollo_broker,命令执行成功后,在E盘下会有apollo_broker目录,在其下有个bin目录,其中有两个文件:apollo-broker.cmd和apollo-broker-service.exe,第一个是通过cmd命令启动apollo服务的,第二个是创建window服务的。
cmd命令启动:apollo-broker run,启动成功可以在浏览器中查看运行情况(http://127.0.0.1:61680/,默认用户名/密码:admin/password);
windows服务启动:执行apollo-broker-service.exe,创建windows服务,就可以以windows服务的方式启动apollo服务。
3.MQTT协议的应用
MQTT协议有众多客户端实现,相关请参考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。
本文采用eclipse的paho客户端实现(https://eclipse.org/paho/)。a.javascript客户端:https://eclipse.org/paho/clients/js/
将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,demo代码如下:
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId");
// 61623是ws连接的默认端口,可以在apollo中间件中进行配置(关于apollo的配置请参考:http://activemq.apache.org/apollo/documentation/user-manual.html)
// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
// connect the client
client.connect({userName:‘admin‘,password:‘password‘,onSuccess:onConnect});
// called when the client connects
function onConnect() { // 连接成功后的处理
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/topic/event"); // 订阅消息的主题
var message = new Paho.MQTT.Message("Hello,this is a test");
message.destinationName = "/topic/event";
client.send(message); // 发送消息
}
// called when the client loses its connection
function onConnectionLost(responseObject) { // 连接丢失后的处理
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
// called when a message arrives
function onMessageArrived(message) { // 消息接收成功后的处理
console.log("onMessageArrived:"+message.payloadString);
}b. java客户端实现
paho目前只支持J2SE和安卓,下载地址:https://eclipse.org/paho/clients/java/,我们采用maven方式。
maven库地址:
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots
maven dependency:<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.1</version>
</dependency>
说明:版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。
java端实现:public interface IMessage {
String getHost();
Integer getPort();
Integer getQos();
String getTopic();
String getClientId();
String getContent();
byte[] getContentBytes();
Map<String,Object> getOption();
Object getSender();
Date getSendTime();
}public final class MessageProcessingCenter {
protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class);
protected static final String BROKER_PREFIX="tcp://";
protected static final String BROKER_HOST="localhost";
protected static final int PORT=61613;
protected static final int QOS=2;
protected static final String TOPIC="/topic/event";
protected static final String CLIENT_ID="clientId";
protected static final String MQ_USER="admin";
protected static final String MQ_PASSWORD="password";
public static void send(IMessage message){
String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic();
int qos=null == message.getQos()?QOS: message.getQos();
String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost());
int port=null == message.getPort()?PORT:message.getPort();
broker+=":"+port;
String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId();
Map<String,Object> opts=message.getOption();
String user=MQ_USER;
String password=MQ_PASSWORD;
if(null != opts){
if(null != opts.get("userName")){
user=opts.get("userName").toString();
}
if(null != opts.get("password")){
password=opts.get("password").toString();
}
}
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName(user);
connOpts.setPassword(password.toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
MqttMessage mqm = new MqttMessage(message.getContentBytes());
mqm.setQos(qos);
sampleClient.publish(topic, mqm);
sampleClient.disconnect();
} catch(MqttException me) {
logger.info("********************* send message exception :");
logger.info("********************* reason : " + me.getReasonCode());
logger.info("********************* msg : " + me.getMessage());
logger.info("********************* loc : " + me.getLocalizedMessage());
logger.info("********************* cause : " + me.getCause());
logger.info("********************* excep : " + me);
me.printStackTrace();
}
}
public static void send(Set<IMessage> set){
for(IMessage message:set){
send(message);
}
}
}
小结
至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档:
apollo:http://activemq.apache.org/apollo/documentation/user-manual.html
eclipse paho:https://eclipse.org/paho/