### 摘要
>Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。
关于ActiveMQ5请参考:http://activemq.apache.org,本文只介绍Apollo在windows下安装和应用,Apollo的详细文档请参考官网:http://activemq.apache.org/apollo/documentation/user-manual.html.
### Apollo的下载和安装
#### 1.下载并安装
>进入http://activemq.apache.org/apollo/download.html,下载windows版本的压缩包,并解压到自己工作目录(如:E:\apache-apollo-1.7),并创建环境变量APOLLO_HOME=E:\apache-apollo-1.7。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable (64位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=14632;32位JVM:http://www.microsoft.com/en-us/download/details.aspx?id=5555)。
#### 2.创建broker实例并启动服务
>进入E:\apache-apollo-1.7之下的bin目录,打开cmd窗口,执行命令:apollo create E:\apollo_broker,命令执行成功后,在E盘下会有apollo_broker目录,在其下有个bin目录,其中有两个文件:apollo-broker.cmd和apollo-broker-service.exe,第一个是通过cmd命令启动apollo服务的,第二个是创建window服务的。
> ###### cmd命令启动
在cmd命令行执行apollo-broker run;
等启动成功可以在浏览器中查看运行情况(http://127.0.0.1:61680/,默认用户名/密码:admin/password)。
#### 3.MQTT协议的应用
>MQTT协议有众多客户端实现,相关请参考:http://activemq.apache.org/apollo/versions/1.7/website/documentation/mqtt-manual.html。
本文采用eclipse的paho客户端实现(https://eclipse.org/paho/)。
> ##### a.javascript客户端:https://eclipse.org/paho/clients/js/
将javascript客户端项目下载下来,并在其项目根目录下执行mvn命令,进行编译,生成target目录,其下生成mqttws31.js、mqttws31-min.js两个js文件,将其拷贝到自己项目相关目录下,并在页面中引用,即可实现javascript客户端的消息订阅和发布,demo代码如下:
`var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); `
`// 61623是ws连接的默认端口,可以在apollo中间件中进行配置(关于apollo的配置请参考: http://activemq.apache.org/apollo/documentation/user-manual.html)`
`// set callback handlers`
`client.onConnectionLost = onConnectionLost;`
`client.onMessageArrived = onMessageArrived;`
`// connect the client`
`client.connect({userName:‘admin‘,password:‘password‘,onSuccess:onConnect});`
`// called when the client connects`
`function onConnect() { // 连接成功后的处理·`
`// Once a connection has been made, make a subscription and send a message.`
`console.log("onConnect");`
`client.subscribe("/topic/event"); // 订阅消息的主题`
`var message = new Paho.MQTT.Message("Hello,this is a test");`
`message.destinationName = "/topic/event";`
`client.send(message); // 发送消息`
`}`
`// called when the client loses its connection`
`function onConnectionLost(responseObject) { // 连接丢失后的处理`
`if (responseObject.errorCode !== 0) {`
` console.log("onConnectionLost:"+responseObject.errorMessage);`
` }`
` }`
` // called when a message arrives`
` function onMessageArrived(message) { // 消息接收成功后的处理`
` console.log("onMessageArrived:"+message.payloadString);`
` }`
>##### b. java客户端实现
paho目前只支持J2SE和安卓,下载地址:https://eclipse.org/paho/clients/java/,我们采用maven方式。
**maven库地址:**
https://repo.eclipse.org/content/repositories/paho-releases/ - Official Releases
https://repo.eclipse.org/content/repositories/paho-snapshots/ - Nightly Snapshots
**maven dependency:**
`<dependency>`
` <groupId>org.eclipse.paho</groupId>`
` <artifactId>org.eclipse.paho.client.mqttv3</artifactId>`
` <version>1.0.1</version>`
` </dependency>`
说明:版本为1.0.0或0.9.0时,其jar包根本加载不进来,最后搜到1.0.1版本才可以正常使用。
**java端实现:**
`public interface IMessage {`
` String getHost();`
` Integer getPort();`
` Integer getQos();`
`String getTopic();`
`String getClientId();`
`String getContent();`
`byte[] getContentBytes();`
`Map<String,Object> getOption();`
`Object getSender();`
`Date getSendTime();`
`}`
`public final class MessageProcessingCenter {`
`protected static Logger logger=LoggerFactory.getLogger(MessageProcessingCenter.class);`
`protected static final String BROKER_PREFIX="tcp://";`
`protected static final String BROKER_HOST="localhost";`
`protected static final int PORT=61613;`
`protected static final int QOS=2;`
`protected static final String TOPIC="/topic/event";`
`protected static final String CLIENT_ID="easypm";`
`protected static final String MQ_USER="admin";`
`protected static final String MQ_PASSWORD="password";`
`public static void send(IMessage message){`
`String topic= StringUtils.isEmpty(message.getTopic())?TOPIC: message.getTopic();`
`int qos=null == message.getQos()?QOS: message.getQos();`
`String broker=BROKER_PREFIX+ (StringUtils.isEmpty(message.getHost())?BROKER_HOST:message.getHost());`
`int port=null == message.getPort()?PORT:message.getPort();`
`broker+=":"+port;`
`String clientId = StringUtils.isEmpty(message.getClientId())?CLIENT_ID:message.getClientId();`
`Map<String,Object> opts=message.getOption();`
`String user=MQ_USER;`
`String password=MQ_PASSWORD;`
`if(null != opts){`
`if(null != opts.get("userName")){`
`user=opts.get("userName").toString();`
`}`
`if(null != opts.get("password")){`
`password=opts.get("password").toString();`
`}`
`}`
`MemoryPersistence persistence = new MemoryPersistence();`
`try {`
`MqttClient sampleClient = new MqttClient(broker, clientId, persistence);`
`MqttConnectOptions connOpts = new MqttConnectOptions();`
`connOpts.setUserName(user);`
`connOpts.setPassword(password.toCharArray());`
`connOpts.setCleanSession(true);`
`sampleClient.connect(connOpts);`
`MqttMessage mqm = new MqttMessage(message.getContentBytes());`
`mqm.setQos(qos);`
`sampleClient.publish(topic, mqm);`
`sampleClient.disconnect();`
`} catch(MqttException me) {`
`logger.info("********************* send message exception :");`
`logger.info("********************* reason : " + me.getReasonCode());`
`logger.info("********************* msg : " + me.getMessage());`
`logger.info("********************* loc : " + me.getLocalizedMessage());`
`logger.info("********************* cause : " + me.getCause());`
`logger.info("********************* excep : " + me);`
`me.printStackTrace();`
`}`
`}`
`public static void send(Set<IMessage> set){`
`for(IMessage message:set){`
`send(message);`
`}`
` }`
`}`
### 小结
>至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档:
**apollo:**http://activemq.apache.org/apollo/documentation/user-manual.html
**eclipse paho:**https://eclipse.org/paho/