MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现

callback方式接收没有成功。所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因。

采用Callback式 发布主题

Java代码  

  1. package com.etrip.mqtt.callback;
  2. import java.net.URISyntaxException;
  3. import org.fusesource.hawtbuf.Buffer;
  4. import org.fusesource.hawtbuf.UTF8Buffer;
  5. import org.fusesource.mqtt.client.Callback;
  6. import org.fusesource.mqtt.client.CallbackConnection;
  7. import org.fusesource.mqtt.client.Listener;
  8. import org.fusesource.mqtt.client.MQTT;
  9. import org.fusesource.mqtt.client.QoS;
  10. import org.fusesource.mqtt.client.Topic;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. *
  15. * MQTT moquette 的Server 段用于并发布主题信息
  16. *
  17. * 采用Callback式 发布主题
  18. *
  19. * @author longgangbai
  20. */
  21. public class MQTTCallbackServer {
  22. private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackServer.class);
  23. private final static String CONNECTION_STRING = "tcp://localhost:1883";
  24. private final static boolean CLEAN_START = true;
  25. private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
  26. public  static Topic[] topics = {
  27. new Topic("china/beijing", QoS.EXACTLY_ONCE),
  28. new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
  29. new Topic("china/henan", QoS.AT_MOST_ONCE)};
  30. public final  static long RECONNECTION_ATTEMPT_MAX=6;
  31. public final  static long RECONNECTION_DELAY=2000;
  32. public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
  33. public static void main(String[] args)   {
  34. //创建MQTT对象
  35. MQTT mqtt = new MQTT();
  36. try {
  37. //设置mqtt broker的ip和端口
  38. mqtt.setHost(CONNECTION_STRING);
  39. //连接前清空会话信息
  40. mqtt.setCleanSession(CLEAN_START);
  41. //设置重新连接的次数
  42. mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
  43. //设置重连的间隔时间
  44. mqtt.setReconnectDelay(RECONNECTION_DELAY);
  45. //设置心跳时间
  46. mqtt.setKeepAlive(KEEP_ALIVE);
  47. //设置缓冲的大小
  48. mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
  49. //获取mqtt的连接对象BlockingConnection
  50. final CallbackConnection connection = mqtt.callbackConnection();
  51. //添加连接的监听事件
  52. connection.listener(new Listener() {
  53. public void onDisconnected() {
  54. }
  55. public void onConnected() {
  56. }
  57. public void onPublish(UTF8Buffer topic, Buffer payload, Runnable ack) {
  58. // You can now process a received message from a topic.
  59. // Once process execute the ack runnable.
  60. ack.run();
  61. System.out.println("topic"+topic.toString()+"="+new String(payload.getData()));
  62. }
  63. public void onFailure(Throwable value) {
  64. }
  65. });
  66. //添加连接事件
  67. connection.connect(new Callback<Void>() {
  68. /**
  69. * 连接失败的操作
  70. */
  71. public void onFailure(Throwable value) {
  72. // If we could not connect to the server.
  73. System.out.println("MQTTCallbackServer.CallbackConnection.connect.onFailure"+"连接失败......"+value.getMessage());
  74. value.printStackTrace();
  75. }
  76. /**
  77. * 连接成功的操作
  78. * @param v
  79. */
  80. public void onSuccess(Void v) {
  81. int count=1;
  82. while(true){
  83. count++;
  84. // 用于发布消息,目前手机段不需要向服务端发送消息
  85. //主题的内容
  86. final String message="hello "+count+"chinese people !";
  87. final String topic = "china/beijing";
  88. System.out.println("MQTTCallbackServer  publish  topic="+topic+" message :"+message);
  89. connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false, new Callback<Void>() {
  90. public void onSuccess(Void v) {
  91. // the pubish operation completed successfully.
  92. }
  93. public void onFailure(Throwable value) {
  94. value.printStackTrace();
  95. }
  96. });
  97. try {
  98. Thread.sleep(2000);
  99. } catch (InterruptedException e) {
  100. // TODO Auto-generated catch block
  101. e.printStackTrace();
  102. }
  103. }
  104. //                  //连接断开
  105. //                  connection.disconnect(new Callback<Void>() {
  106. //                      public void onSuccess(Void v) {
  107. //                        // called once the connection is disconnected.
  108. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess", "called once the connection is disconnected.");
  109. //                      }
  110. //                      public void onFailure(Throwable value) {
  111. //                        // Disconnects never fail.
  112. //                          System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure", "Disconnects never fail."+value.getMessage());
  113. //                          value.printStackTrace();
  114. //                      }
  115. //                  });
  116. }
  117. });
  118. Thread.sleep(10000000000L);
  119. } catch (URISyntaxException e) {
  120. // TODO Auto-generated catch block
  121. e.printStackTrace();
  122. } catch (Exception e) {
  123. // TODO Auto-generated catch block
  124. e.printStackTrace();
  125. }finally{
  126. }
  127. }
  128. }

采用Callback式 订阅主题

Java代码  

  1. package com.etrip.mqtt.callback;
  2. import java.net.URISyntaxException;
  3. import org.fusesource.hawtbuf.Buffer;
  4. import org.fusesource.hawtbuf.UTF8Buffer;
  5. import org.fusesource.mqtt.client.Callback;
  6. import org.fusesource.mqtt.client.CallbackConnection;
  7. import org.fusesource.mqtt.client.Listener;
  8. import org.fusesource.mqtt.client.MQTT;
  9. import org.fusesource.mqtt.client.QoS;
  10. import org.fusesource.mqtt.client.Topic;
  11. import org.slf4j.Logger;
  12. import org.slf4j.LoggerFactory;
  13. /**
  14. *
  15. * MQTT moquette 的Client 段用于订阅主题,并接收主题信息
  16. *
  17. * 采用Callback式 订阅主题
  18. *
  19. * @author longgangbai
  20. */
  21. public class MQTTCallbackClient {
  22. private static final Logger LOG = LoggerFactory.getLogger(MQTTCallbackClient.class);
  23. private final static String CONNECTION_STRING = "tcp://localhost:1883";
  24. private final static boolean CLEAN_START = true;
  25. private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
  26. public  static Topic[] topics = {
  27. new Topic("china/beijing", QoS.AT_MOST_ONCE),
  28. new Topic("china/tianjin", QoS.AT_LEAST_ONCE),
  29. new Topic("china/henan", QoS.AT_MOST_ONCE)};
  30. public final  static long RECONNECTION_ATTEMPT_MAX=6;
  31. public final  static long RECONNECTION_DELAY=2000;
  32. final String topic = "china/beijing";
  33. public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
  34. public static void main(String[] args)   {
  35. //创建MQTT对象
  36. MQTT mqtt = new MQTT();
  37. //设置mqtt broker的ip和端口
  38. try {
  39. mqtt.setHost(CONNECTION_STRING);
  40. } catch (URISyntaxException e1) {
  41. e1.printStackTrace();
  42. }
  43. //连接前清空会话信息
  44. mqtt.setCleanSession(CLEAN_START);
  45. //设置重新连接的次数
  46. mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
  47. //设置重连的间隔时间
  48. mqtt.setReconnectDelay(RECONNECTION_DELAY);
  49. //设置心跳时间
  50. mqtt.setKeepAlive(KEEP_ALIVE);
  51. //设置缓冲的大小
  52. mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
  53. //获取mqtt的连接对象CallbackConnection
  54. final CallbackConnection connection= mqtt.callbackConnection();
  55. try {
  56. //添加连接的监听事件
  57. connection.listener(new Listener() {
  58. public void onDisconnected() {
  59. }
  60. public void onConnected() {
  61. System.out.println(" 连接成功!");
  62. }
  63. public void onPublish(UTF8Buffer topic, Buffer payload, Runnable onComplete) {
  64. }
  65. public void onFailure(Throwable value) {
  66. }
  67. });
  68. //添加连接事件
  69. connection.connect(new Callback<Void>() {
  70. /**
  71. * 连接失败的操作
  72. */
  73. public void onFailure(Throwable value) {
  74. // If we could not connect to the server.
  75. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onFailure  连接失败......"+value.getMessage());
  76. value.printStackTrace();
  77. }
  78. /**
  79. * 连接成功的操作
  80. * @param v
  81. */
  82. public void onSuccess(Void v) {
  83. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.onSuccess 订阅连接成功......");
  84. //订阅相关的主题
  85. connection.subscribe(topics, new Callback<byte[]>() {
  86. public void onSuccess(byte[] qoses) {
  87. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题成功......");
  88. }
  89. public void onFailure(Throwable value) {
  90. // subscribe failed.
  91. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.subscribe.onSuccess 订阅主题失败!"+value.getMessage());
  92. value.printStackTrace();
  93. }
  94. });
  95. }
  96. });
  97. Thread.sleep(100000000000L);
  98. } catch (Exception e) {
  99. // TODO Auto-generated catch block
  100. e.printStackTrace();
  101. }finally{
  102. //            //连接断开
  103. connection.disconnect(new Callback<Void>() {
  104. public void onSuccess(Void v) {
  105. // called once the connection is disconnected.
  106. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onSuccess called once the connection is disconnected.");
  107. }
  108. public void onFailure(Throwable value) {
  109. // Disconnects never fail.
  110. System.out.println("MQTTSubscribeClient.CallbackConnection.connect.disconnect.onFailure  Disconnects never fail."+value.getMessage());
  111. value.printStackTrace();
  112. }
  113. });
  114. }
  115. }
  116. }
时间: 2024-10-01 20:22:44

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现的相关文章

MQTT的学习研究(十二) MQTT moquette 的 Future API 消息发布订阅的实现

MQTT moquette 的Server发布主题 Java代码   package com.etrip.mqtt.future; import java.net.URISyntaxException; import org.fusesource.mqtt.client.FutureConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.mqtt.client.QoS; import org.fuseso

MQTT的学习研究(四)moquette-mqtt 的使用之mqtt Blocking API客户端订阅并接收主题信息

在上面两篇关于mqtt的broker的启动和mqtt的服务端发布主题信息之后,我们客户端需要订阅相关的信息并接收相关的主题信息. Java代码   package com.etrip.mqtt; import java.net.URISyntaxException; import org.fusesource.mqtt.client.BlockingConnection; import org.fusesource.mqtt.client.MQTT; import org.fusesource.

JavaScript学习总结(十四)——JavaScript编写类的扩展方法

在?J?a?v?a?S?c?r?i?p?t?中?可以使?用?类的p?r?o?t?o?t?y?p?e属性来?扩?展?类的属?性?和?方?法,在实际开发当中,当JavaScript内置的那些类所提供的动态方法和动态属性不满足我们实际开发时,我们就可以通过"prototype"属性给自定义类添加方法和属性或者扩展原有的类中的方法和属性. 一.扩展JavaScript内置类,添加动态方法 语法格式: 类名.prototype.方法名 = function([param1],[param2],.

Swift学习笔记十四:构造(Initialization)

类和结构体在实例创建时,必须为所有存储型属性设置合适的初始值.存储型属性的值不能处于一个未知的状态. 你可以在构造器中为存储型属性赋初值,也可以在定义属性时为其设置默认值.以下章节将详细介绍这两种方法. 注意: 当你为存储型属性设置默认值或者在构造器中为其赋值时,它们的值是被直接设置的,不会触发任何属性观测器(property observers). 一.基本语法 class Human{ var name :String init(){ name = "human" } init(n

laravel3学习笔记(十四)

原作者博客:ieqi.net ==================================================================================================== 运行时配置 在 Laravel3 中很多地方我们都可以看到“约定大于配置”的影子,我本人也很喜欢这种工程哲学尤其是在框架领域,当然这并不能代替所有的配置.我们知道 Laravel3 中,主要配置都写在 application/config 文件夹下,在应用逻辑中,往往

C++语言学习(十四)——C++类成员函数调用分析

C++语言学习(十四)--C++类成员函数调用分析 一.C++成员函数 1.C++成员函数的编译 C++中的函数在编译时会根据命名空间.类.参数签名等信息进行重新命名,形成新的函数名.函数重命名的过程通过一个特殊的Name Mangling(名字编码)算法来实现.Name Mangling算法是一种可逆的算法,既可以通过现有函数名计算出新函数名,也可以通过新函数名逆向推导出原有函数名.Name Mangling算法可以确保新函数名的唯一性,只要命名空间.所属的类.参数签名等有一个不同,那么产生的

MQTT的学习研究(十)【转】mosquitto——一个开源的mqtt代理

MQTT(MQ Telemetry Transport),消息队列遥测传输协议,轻量级的发布/订阅协议,适用于一些条件比较苛刻的环境,进行低带宽.不可靠或间歇性的通信.值得一提的是mqtt提供三种不同质量的消息服务: “至多一次”,消息发布完全依赖底层 TCP/IP 网络.会发生消息丢失或重复.这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送. “至少一次”,确保消息到达,但消息重复可能会发生. “只有一次”,确保消息到达一次.这一级别可用于如下情况,在计

MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

参看官方文档: http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm *  Java 为 MQ Telemetry Transport 创建异步发布程序 *在此任务中,您将遵循教程来修改第一个发布程序.通过修改, *使应用程序能够发送发布而不等待传递确认信息.传递确认 *信息由您创建的回调类来接收. * * * *4.使客户机断开连接 *  a

MQTT的学习研究(一)MQTT学习网站

MQTT的官方推荐网站: http://mqtt.org/software 使用IBM 的MQTT协议实现push消息地址: http://tokudu.com/2010/how-to-implement-push-notifications-for-android/ google code 下载MQTT moquette Broker 地址:    http://code.google.com/p/moquette-mqtt/    GIT 下载MQTT moquette client 地址: