MQTT的学习研究(六) MQTT moquette 的 Blocking API 订阅消息客户端使用

* 使用 Java 为 MQ Telemetry Transport 创建订户
 * 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。
 * 提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获
 * 得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前
 * 为同一客户机标识创建的预订的发布。
 * MqttConnectionOptions.cleanSession Boolean 属性将控制是否接收到先前所发送的发布
 *
 *4.创建新的 MqttClient 实例。 
 * MqttClient client = new MqttClient(Example.TCPAddress, Example.clientId);为客户机提供服务器地址,稍后会将此地址用来连接至 WebSphere MQ。设置客户机标识以对客户机命名。
 * 
 * ?(可选)可以提供 MqttClientPersistence 接口的实现以替换缺省实现。缺省 MqttPersistence 实现会将正在等待传递的 QoS 1 和 QoS 2 消息作为文件来存储;请参阅MQTT 客户机中的消息持久性。
 * ?MQTT 的缺省 WebSphere MQ TCP/IP 端口为 1883。对于 SSL,缺省端口为 8883。在此示例中,缺省地址设置为 tcp://localhost:1883。
 * ?通常,能够使用客户机标识来标识特定物理客户机很重要。在与服务器相连的所有客户机中,客户机标识必须是唯一的;请参阅MQTT 客户机标识。如果与前一个实例使用同一个客户机标识,那么表示目前的实例是同一个客户机的实例。如果您在两个正在运行的客户机中重复使用同一个客户机标识,那么这两个客户机中都会抛出异常,并且一个客户机会终止。
 * ?客户机标识的长度不能超过 23 个字节。如果超过了此长度,就会抛出异常。客户机标识中必须只包含队列管理器名称中允许使用的字符;例如,不能包含连字符或空格。
 * ?在您调用 MqttClient.connect 方法之前,不会处理消息。
 * 使用客户机对象来发布和预订主题以及恢复有关尚未传递的发布的信息。
 *
 *
 *6.创建一个 MqttConnectOptions 对象,并设置其 cleanSession 属性。
 *  a.创建一个 MqttConnectOptions 对象。 
 *  MqttConnectOptions conOptions = new MqttConnectOptions();conOptions 是 MqttClient 构造函数的一个选项参数。
 *  
 *  b.设置 clearSession 属性。 
 *   conOptions.setCleanSession(Example.cleanSession);缺省情况下,Example.cleanSession 参数设置为 true,从而与 MqttConnectionOptions.cleanSession 的缺省设置相匹配。
 *  
 *  如果您使用缺省 MqttConnectOptions,或者在连接客户机之前将 MqttConnectOptions.cleanSession 设置为 true,那么在客户机建立连接时,将除去客户机的任何旧预订。当客户机断开连接时,会除去客户机在会话期间创建的任何新预订。
 *  
 *  如果您在连接之前将 MqttConnectOptions.cleanSession 设置为 false,那么客户机创建的任何预订都会被添加至客户机在连接之前就已存在的所有预订。当客户机断开连接时,所有预订仍保持活动状态。
 *  
 *  要了解 cleanSession 属性影响预订的方式,另一种方法就是将它视作模态属性。在其缺省方式 cleanSession=true 下,客户机仅在会话的作用域内创建预订和接收发布。在另一种方式 cleanSession=false 下,预订是持久预订。客户机可以连接和断开连接,而其预订保持活动状态。当客户机重新连接时,它将接收任何未传递的发布。在它连接之后,它可以自己修改处于活动状态的预订集。
 *  
 *  在连接之前,您必须设置 cleanSession 方式;在整个会话期间都将保持此方式。要更改此属性的设置,必须将客户机断开连接,然后再重新连接客户机。如果您将方式从使用 cleanSession=false 更改为 cleanSession=true,那么此客户机先前的所有预订以及尚未接收到的任何发布都将被废弃。
 *

MQTT订阅实现类:

Java代码  

  1. package com.etrip.wsmqtt.client;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;
  4. /**
  5. *
  6. * 使用 Java 为 MQ Telemetry Transport 创建订户
  7. * 在此任务中,您将遵循教程来创建订户应用程序。订户将针对主题创建预订并接收该预订的发布。
  8. *  提供了一个示例订户应用程序 Subscribe。Subscribe 将创建预订主题 MQTT Examples,并等待获
  9. *  得该预订的发布,等待时间为 30 秒。订户可以创建预订并等待获得发布。它还可以接收发送至先前
  10. *  为同一客户机标识创建的预订的发布。
  11. * @author longgangbai
  12. */
  13. public class WSMQTTClientSubscribe {
  14. public static void main(String[] args) {
  15. try {
  16. //创建MQTT客户端对象
  17. MqttClient client = new MqttClient(WSMQTTClientConstants.TCPAddress, WSMQTTClientConstants.clientId);
  18. //创建客户端MQTT回调类
  19. WSMQTTClientCallBack callback = new WSMQTTClientCallBack(WSMQTTClientConstants.clientId);
  20. //设置MQTT回调
  21. client.setCallback(callback);
  22. //创建一个连接对象
  23. MqttConnectOptions conOptions = new MqttConnectOptions();
  24. //设置清除会话信息
  25. conOptions.setCleanSession(WSMQTTClientConstants.cleanSession);
  26. //设置超时时间
  27. conOptions.setConnectionTimeout(10000);
  28. //设置会话心跳时间
  29. conOptions.setKeepAliveInterval(20000);
  30. //设置最终端口的通知消息
  31. conOptions.setWill(client.getTopic("LastWillTopic"), "the client will stop !".getBytes(), 1, false);
  32. //连接broker
  33. client.connect(conOptions);
  34. System.out.println("Subscribing to topic \"" + WSMQTTClientConstants.topicString
  35. + "\" for client instance \"" + client.getClientId()
  36. + "\" using QoS " + WSMQTTClientConstants.QoS + ". Clean session is "
  37. + WSMQTTClientConstants.cleanSession);
  38. //订阅相关的主题信息
  39. client.subscribe(WSMQTTClientConstants.topicString, WSMQTTClientConstants.QoS);
  40. System.out.println("Going to sleep for " + WSMQTTClientConstants.sleepTimeout / 1000
  41. + " seconds");
  42. Thread.sleep(100000000000000l);
  43. //关闭相关的MQTT连接
  44. if(client.isConnected()){
  45. client.disconnect();
  46. }
  47. System.out.println("Finished");
  48. } catch (Exception e) {
  49. e.printStackTrace();
  50. }
  51. }
  52. }

MQTT订阅回调类:

Java代码  

  1. package com.etrip.wsmqtt.client;
  2. import com.ibm.micro.client.mqttv3.*;
  3. /**
  4. *
  5. * 消息订阅相关的回调类使用
  6. *
  7. * 必须实现MqttCallback的接口并实现对应的相关接口方法
  8. *
  9. * @author longgangbai
  10. */
  11. public class WSMQTTClientCallBack implements MqttCallback {
  12. private String instanceData = "";
  13. public WSMQTTClientCallBack(String instance) {
  14. instanceData = instance;
  15. }
  16. public void messageArrived(MqttTopic topic, MqttMessage message) {
  17. try {
  18. System.out.println("Message arrived: \"" + message.toString()
  19. + "\" on topic \"" + topic.toString() + "\" for instance \""
  20. + instanceData + "\"");
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void connectionLost(Throwable cause) {
  26. System.out.println("Connection lost on instance \"" + instanceData
  27. + "\" with cause \"" + cause.getMessage() + "\" Reason code "
  28. + ((MqttException)cause).getReasonCode() + "\" Cause \""
  29. + ((MqttException)cause).getCause() +  "\"");
  30. cause.printStackTrace();
  31. }
  32. public void deliveryComplete(MqttDeliveryToken token) {
  33. try {
  34. System.out.println("Delivery token \"" + token.hashCode()
  35. + "\" received by instance \"" + instanceData + "\"");
  36. } catch (Exception e) {
  37. e.printStackTrace();
  38. }
  39. }
  40. }

常量类:

Java代码  

  1. package com.etrip.wsmqtt.client;
  2. /**
  3. *
  4. * 消息订阅消息的常量字段
  5. *
  6. * @author longgangbai
  7. */
  8. public final class WSMQTTClientConstants {
  9. public static final String TCPAddress = System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
  10. public static String  clientId = String.format("%-23.23s",(System.getProperty("user.name") + "_" + (System.getProperty("clientId", "Subscribe."))).trim()).replace(‘-‘, ‘_‘);
  11. public static final String  topicString = System.getProperty("topicString", "china/beijing");
  12. public static final String  publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
  13. public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  14. public static final int sleepTimeout =Integer.parseInt(System.getProperty("timeout", "10000000"));
  15. public static final boolean cleanSession = Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
  16. public static final int  QoS = Integer.parseInt(System.getProperty("QoS", "1"));
  17. public static final boolean  retained = Boolean.parseBoolean(System.getProperty("retained", "false"));
  18. }
时间: 2024-07-30 10:27:10

MQTT的学习研究(六) MQTT moquette 的 Blocking API 订阅消息客户端使用的相关文章

MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

使用IBM MQTTv3实现相关的发布订阅功能 MQTTv3的发布消息的实现: Java代码   package com.etrip.mqttv3; import com.ibm.micro.client.mqttv3.MqttClient; import com.ibm.micro.client.mqttv3.MqttDeliveryToken; import com.ibm.micro.client.mqttv3.MqttMessage; import com.ibm.micro.clien

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式.具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker.使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动. 在MQTT moquette 中采用MINA作为底层消息的传递方式  本类的目的启动MQTT moquette Broker 的方式,本文的源代码来自  moquette-broker-0.1-jar-with-de

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

接着上一篇的moquette-mqtt 的使用之broker启动之后,我们需要启动moquette-mqtt 的服务端发布消息. 在moquette-mqtt 的mqtt-client中三种方式实现发布消息的方式: 1.采用阻塞式的连接的(BlockingConnection) 2.采用回调式的连接 (CallbackConnection) 3.采用Future样式的连接(FutureConnection) 本文采用阻塞式作为实验对象. MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(十)【转】mosquitto——一个开源的mqtt代理

MQTT(MQ Telemetry Transport),消息队列遥测传输协议,轻量级的发布/订阅协议,适用于一些条件比较苛刻的环境,进行低带宽.不可靠或间歇性的通信.值得一提的是mqtt提供三种不同质量的消息服务: “至多一次”,消息发布完全依赖底层 TCP/IP 网络.会发生消息丢失或重复.这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送. “至少一次”,确保消息到达,但消息重复可能会发生. “只有一次”,确保消息到达一次.这一级别可用于如下情况,在计

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

参看官方文档: http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm *  Java 为 MQ Telemetry Transport 创建异步发布程序 *在此任务中,您将遵循教程来修改第一个发布程序.通过修改, *使应用程序能够发送发布而不等待传递确认信息.传递确认 *信息由您创建的回调类来接收. * * * *4.使客户机断开连接 *  a

MQTT的学习研究(十六) MQTT的Mosquitto的window安装部署

在mqtt的官方网站,有许多mqtt,其中:MosquittoAn Open Source MQTT server with C, C++, Python and Javascript clients. A public, hosted test server is also available (more information) MoquetteA Java MQTT broker based on an eventing model with Apache Mina. Mosquitto的