MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

参看官方文档:

http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm

*  Java 为 MQ Telemetry Transport 创建异步发布程序
 *在此任务中,您将遵循教程来修改第一个发布程序。通过修改,
 *使应用程序能够发送发布而不等待传递确认信息。传递确认
 *信息由您创建的回调类来接收。
 *
 *
 *
 *4.使客户机断开连接
 *  a.除去其中包含 token.waitForCompletion 表达式的语句。 主线程将继续执行,而不等待传递发布。
 *  b.测试客户机是否已断开连接。 将错误返回到 MqttCallback 中的 lostConnection 方法之后,MQTT 客户机将断开连接,客户机应用程序也可能断开连接。测试是否有打开的连接。
 *  c.使用常量 Example.quiesceTimeout 来设置使客户机停顿的最长时间。
 *  if (client.isConnected())
 *      client.disconnect(Example.quiesceTimeout);
 *当满足下面三种情况的组合形式时,客户机就完成了: 
 *  a.已经对在此会话中(如果重新启动了会话,则是在先前会话中)已发布的所有消息调用了回调。
 *  b.消息未完成,然而停顿时间间隔已到期。缺省情况下,停顿时间间隔为 30 秒。通过将要等待的毫秒数作为 client.disconnect 的一个参数来传递,即可更改停顿超时。
 *  c.在发布了某些消息并由客户机进行排队之后,但是在发送这些消息之前调用了 client.disconnect。已排队的消息尚未处于“未完成”状态。如果会话可重新启动,那么重新启动会话时就会重新发送消息。
 *  缺省情况下,停顿时间间隔为 30 秒。

MQTT的消息发布代码:

Java代码  

  1. package com.etrip.wsmqtt.server;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
  4. import com.ibm.micro.client.mqttv3.MqttMessage;
  5. import com.ibm.micro.client.mqttv3.MqttTopic;
  6. /**
  7. * 使用 Java 为 MQ Telemetry Transport 创建异步发布程序
  8. *
  9. *
  10. *
  11. *
  12. * 消息发布的类的具体的实现
  13. *
  14. * @author longgangbai
  15. *
  16. */
  17. public class WSMQTTServerPubAsync {
  18. public static void main(String[] args) {
  19. try {
  20. //创建MqttClient对象
  21. MqttClient client = new MqttClient(WSMQTTServerCommon.TCPAddress, WSMQTTServerCommon.clientId);
  22. //创建MQTT相关的主题
  23. MqttTopic topic = client.getTopic(WSMQTTServerCommon.topicString);
  24. //创建MQTT的消息体
  25. MqttMessage message = new MqttMessage();
  26. //设置消息传输的类型
  27. message.setQos(2);
  28. //设置是否在服务器中保存消息体
  29. message.setRetained(false);
  30. //设置消息的内容
  31. message.setPayload(WSMQTTServerCommon.publication.getBytes());
  32. //创建一个MQTT的回调类
  33. WSMQTTServerCallBack callback = new WSMQTTServerCallBack(WSMQTTServerCommon.clientId);
  34. //MqttClient绑定
  35. client.setCallback(callback);
  36. //MqttClient连接
  37. client.connect();
  38. System.out.println("Publishing \"" + message.toString()
  39. + "\" on topic \"" + topic.getName() + "\" with QoS = "
  40. + message.getQos());
  41. System.out.println("For client instance \"" + client.getClientId()
  42. + "\" on address " + client.getServerURI() + "\"");
  43. //发送消息并获取回执
  44. MqttDeliveryToken token = topic.publish(message);
  45. System.out.println("With delivery token \"" + token.hashCode()
  46. + " delivered: " + token.isComplete());
  47. Thread.sleep(100000000000000l);
  48. //关闭连接
  49. if (client.isConnected())
  50. client.disconnect(WSMQTTServerCommon.quiesceTimeout);
  51. System.out.println("Disconnected: delivery token \"" + token.hashCode()
  52. + "\" received: " + token.isComplete());
  53. } catch (Exception e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }

MQTT消息发布回调代码:

Java代码  

  1. package com.etrip.wsmqtt.server;
  2. import com.ibm.micro.client.mqttv3.*;
  3. /**
  4. * 发布消息的回调类
  5. *
  6. * 必须实现MqttCallback的接口并实现对应的相关接口方法
  7. *      ?CallBack 类将实现 MqttCallBack。每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。在回调中,将它用来标识已经启动了该回调的哪个实例。
  8. *  ?必须在回调类中实现三个方法:
  9. *
  10. *  public void messageArrived(MqttTopic topic, MqttMessage message)
  11. *  接收已经预订的发布。
  12. *
  13. *  public void connectionLost(Throwable cause)
  14. *  在断开连接时调用。
  15. *
  16. *  public void deliveryComplete(MqttDeliveryToken token))
  17. *      接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。
  18. *
  19. *
  20. *  ?由 MqttClient.connect 激活此回调。
  21. *
  22. * @author longgangbai
  23. */
  24. public class WSMQTTServerCallBack implements MqttCallback {
  25. private String instanceData = "";
  26. public WSMQTTServerCallBack(String instance) {
  27. instanceData = instance;
  28. }
  29. /**
  30. * 接收到消息的回调的方法
  31. */
  32. public void messageArrived(MqttTopic topic, MqttMessage message) {
  33. try {
  34. System.out.println("Message arrived: \"" + message.toString()
  35. + "\" on topic \"" + topic.toString() + "\" for instance \""
  36. + instanceData + "\"");
  37. } catch (Exception e) {
  38. e.printStackTrace();
  39. }
  40. }
  41. /**
  42. * 消息连接丢失
  43. */
  44. public void connectionLost(Throwable cause) {
  45. System.out.println("Connection lost on instance \"" + instanceData
  46. + "\" with cause \"" + cause.getMessage() + "\" Reason code "
  47. + ((MqttException)cause).getReasonCode() + "\" Cause \""
  48. + ((MqttException)cause).getCause() +  "\"");
  49. cause.printStackTrace();
  50. }
  51. /**
  52. *
  53. */
  54. public void deliveryComplete(MqttDeliveryToken token) {
  55. try {
  56. System.out.println("Delivery token \"" + token.hashCode()
  57. + "\" received by instance \"" + instanceData + "\"");
  58. } catch (Exception e) {
  59. e.printStackTrace();
  60. }
  61. }
  62. }

常量类:

Java代码  

  1. package com.etrip.wsmqtt.server;
  2. import java.util.UUID;
  3. /**
  4. *
  5. * 消息发布消息的常量字段
  6. *
  7. * @author longgangbai
  8. */
  9. public final class WSMQTTServerCommon {
  10. //发布broker的ip和端口
  11. public static final String  TCPAddress =System.getProperty("TCPAddress", "tcp://192.168.208.46:1883");
  12. //客户端的Id
  13. public static String clientId =String.format("%-23.23s",  System.getProperty("clientId", (UUID.randomUUID().toString())).trim()).replace(‘-‘, ‘_‘);
  14. //发布消息的主题
  15. public static final String topicString = System.getProperty("topicString", "china/beijing");
  16. //发布的消息
  17. public static final String publication =System.getProperty("publication", "Hello World " + String.format("%tc", System.currentTimeMillis()));
  18. //超时时间
  19. public static final int quiesceTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  20. public static final int  sleepTimeout = Integer.parseInt(System.getProperty("timeout", "10000"));
  21. public static final boolean cleanSession =Boolean.parseBoolean(System.getProperty("cleanSession", "false"));
  22. public static final int QoS =Integer.parseInt(System.getProperty("QoS", "1"));
  23. public static final boolean retained =Boolean.parseBoolean(System.getProperty("retained", "false"));
  24. }
时间: 2024-11-05 18:50:35

MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用的相关文章

SCVMM2008R2学习(五),为虚拟机安装来宾服务

在上一篇博文中我们通过SCVMM2008R2的平台创建了2003 x86一个虚拟机,在本篇博文中我们来为虚拟机安装来宾服务.SCVMM中的来宾服务其实就是Hyper-v中的集成服务,只不过在这里改名为了来宾服务.Hyper-v虚拟机中的集成服务跟vmware workstation 中的安装vmware tools差不多,都是在安装完成虚拟机后为了提高虚拟机的可用性. 目前的网络拓扑如下图 如下图,这是我们上篇博文中创建的test 2003虚拟机 我们右击虚拟机选择"属性" 我们看到现

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt Blocking API客户端订阅并接收主题信息

在上面两篇关于mqtt的broker的启动和mqtt的服务端发布主题信息之后,我们客户端需要订阅相关的信息并接收相关的主题信息. Java代码   package com.etrip.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式.具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker.使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动. 在MQTT moquette 中采用MINA作为底层消息的传递方式  本类的目的启动MQTT moquette Broker 的方式,本文的源代码来自  moquette-broker-0.1-jar-with-de

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

接着上一篇的moquette-mqtt 的使用之broker启动之后,我们需要启动moquette-mqtt 的服务端发布消息. 在moquette-mqtt 的mqtt-client中三种方式实现发布消息的方式: 1.采用阻塞式的连接的(BlockingConnection) 2.采用回调式的连接 (CallbackConnection) 3.采用Future样式的连接(FutureConnection) 本文采用阻塞式作为实验对象. MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(十)【转】mosquitto——一个开源的mqtt代理

MQTT(MQ Telemetry Transport),消息队列遥测传输协议,轻量级的发布/订阅协议,适用于一些条件比较苛刻的环境,进行低带宽.不可靠或间歇性的通信.值得一提的是mqtt提供三种不同质量的消息服务: “至多一次”,消息发布完全依赖底层 TCP/IP 网络.会发生消息丢失或重复.这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送. “至少一次”,确保消息到达,但消息重复可能会发生. “只有一次”,确保消息到达一次.这一级别可用于如下情况,在计

Qt学习心得之网络编程简单的局域网聊天服务端建立

学而不思则罔,思而不学则殆.学习和思考是相辅相成的,通过这几天对网络编程的学习,收获颇丰.接下来我将利用Qt做的一个以TcpIp协议为传输方式的简单的局域网聊天服务端与大家分享下: 首先谈谈我个人对Tcp协议的理解:Tcp就是网上购物,买家和买家之间的物品传递,快递公司的扮演.快递公司将卖家所要寄出的物品进行包装,给予独特的号码,并从卖家获取目的地地址,得知这些明确信息后准确将物品送到买家,买家签收后,卖家通过快递单号查询到买家签收的消息. 其次是这个简单局域网聊天服务器的创建思路.如下图是思路