MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例

使用IBM MQTTv3实现相关的发布订阅功能

MQTTv3的发布消息的实现:

Java代码  

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
  4. import com.ibm.micro.client.mqttv3.MqttMessage;
  5. import com.ibm.micro.client.mqttv3.MqttTopic;
  6. /**
  7. * MQTTV3的发布消息类
  8. *
  9. * @author longgangbai
  10. */
  11. public class MQTTPub {
  12. public static void doTest(){
  13. try {
  14. MqttClient client = new MqttClient("tcp://192.168.208.46:1883","mqttserver-pub");
  15. MqttTopic topic = client.getTopic("tokudu/china");
  16. MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes());
  17. message.setQos(1);
  18. client.connect();
  19. while(true){
  20. MqttDeliveryToken token = topic.publish(message);
  21. while (!token.isComplete()){
  22. token.waitForCompletion(1000);
  23. }
  24. }
  25. } catch (Exception e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. }

MQTTV3的订阅消息类

Java代码  

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttClient;
  3. import com.ibm.micro.client.mqttv3.MqttConnectOptions;
  4. /**
  5. * MQTTV3的订阅消息类
  6. *
  7. * @author longgangbai
  8. */
  9. public class MQTTSubsribe {
  10. public static String doTest() {
  11. try {
  12. //创建MqttClient
  13. MqttClient client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
  14. //回调处理类
  15. CallBack callback = new CallBack();
  16. client.setCallback(callback);
  17. //创建连接可选项信息
  18. MqttConnectOptions conOptions = new MqttConnectOptions();
  19. //
  20. conOptions.setCleanSession(false);
  21. //连接broker
  22. client.connect(conOptions);
  23. //发布相关的订阅
  24. client.subscribe("tokudu/china", 1);
  25. //client.disconnect();
  26. } catch (Exception e) {
  27. e.printStackTrace();
  28. return "failed";
  29. }
  30. return "success";
  31. }
  32. }

回调处理类处理订阅的消息类

Java代码  

  1. package com.etrip.mqttv3;
  2. import com.ibm.micro.client.mqttv3.MqttCallback;
  3. import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
  4. import com.ibm.micro.client.mqttv3.MqttMessage;
  5. import com.ibm.micro.client.mqttv3.MqttTopic;
  6. /**
  7. * 回调处理类
  8. * 处理订阅的消息类
  9. *
  10. * @author longgangbai
  11. */
  12. public class CallBack implements MqttCallback {
  13. public CallBack() {
  14. }
  15. /**
  16. * 接收到信息的处理
  17. */
  18. public void messageArrived(MqttTopic topic, MqttMessage message) {
  19. try {
  20. System.out.println(" MQTTSubsribe  message.toString()"+message.toString());
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. }
  25. public void connectionLost(Throwable cause) {
  26. }
  27. public void deliveryComplete(MqttDeliveryToken token) {
  28. }
  29. }

测试类:

Java代码  

  1. package com.etrip.mqttv3;
  2. /**
  3. * MQTTV3的测试类
  4. *
  5. * @author longgangbai
  6. */
  7. public class MQTTMain {
  8. public static void main(String[] args) {
  9. //订阅消息的方法
  10. MQTTSubsribe.doTest();
  11. //发布消息的类
  12. MQTTPub.doTest();
  13. }
  14. }
时间: 2024-08-10 13:39:56

MQTT的学习研究(十三) IBM MQTTV3 简单发布订阅实例的相关文章

MQTT的学习研究(十一) IBM MQTT 简单发布订阅实例

package com.etrip.push; import com.ibm.mqtt.MqttAdvancedCallback; import com.ibm.mqtt.MqttClient; import com.ibm.mqtt.MqttException; import com.ibm.mqtt.MqttSimpleCallback; /** * Android推送方案分析(MQTT/XMPP/GCM) 方案1. 使用GCM服务(Google Cloud Messaging) 简介:Go

MQTT的学习研究(十)【转】mosquitto——一个开源的mqtt代理

MQTT(MQ Telemetry Transport),消息队列遥测传输协议,轻量级的发布/订阅协议,适用于一些条件比较苛刻的环境,进行低带宽.不可靠或间歇性的通信.值得一提的是mqtt提供三种不同质量的消息服务: “至多一次”,消息发布完全依赖底层 TCP/IP 网络.会发生消息丢失或重复.这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送. “至少一次”,确保消息到达,但消息重复可能会发生. “只有一次”,确保消息到达一次.这一级别可用于如下情况,在计

MQTT的学习研究(三)moquette-mqtt 的使用之mqtt服务发布主题信息

接着上一篇的moquette-mqtt 的使用之broker启动之后,我们需要启动moquette-mqtt 的服务端发布消息. 在moquette-mqtt 的mqtt-client中三种方式实现发布消息的方式: 1.采用阻塞式的连接的(BlockingConnection) 2.采用回调式的连接 (CallbackConnection) 3.采用Future样式的连接(FutureConnection) 本文采用阻塞式作为实验对象. MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅

MQTT的学习研究(十四) MQTT moquette 的 Callback API 消息发布订阅的实现

在moquette-mqtt中提供了回调callback模式的发布和订阅但是在订阅之后没有发现有消息接收的方法,参看moquette-mqtt中Block,Future式的发布订阅基础是callback式订阅发布,但是本人在研究源代码测试,发现 callback方式接收没有成功.所以本文中只是callback式的发布和订阅没有消息接收的过程,尚未查到原因. 采用Callback式 发布主题 Java代码   package com.etrip.mqtt.callback; import java

MQTT的学习研究(二)moquette-mqtt 的使用之mqtt broker的启动

在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式.具体参看官网,Moquette是基于Apache Mina 的模型的一个Java MQTT broker.使用过Mina的同学发现其实broker的启动过程就是一个Mina应用的启动. 在MQTT moquette 中采用MINA作为底层消息的传递方式  本类的目的启动MQTT moquette Broker 的方式,本文的源代码来自  moquette-broker-0.1-jar-with-de

activeMQ学习(2)---------点对点、发布订阅的消息代码实现

以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习 点对点的实现 2 @Test public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名

MQTT的学习研究(六) MQTT moquette 的 Blocking API 订阅消息客户端使用

* 使用 Java 为 MQ Telemetry Transport 创建订户 * 在此任务中,您将遵循教程来创建订户应用程序.订户将针对主题创建预订并接收该预订的发布. * 提供了一个示例订户应用程序 Subscribe.Subscribe 将创建预订主题 MQTT Examples,并等待获 * 得该预订的发布,等待时间为 30 秒.订户可以创建预订并等待获得发布.它还可以接收发送至先前 * 为同一客户机标识创建的预订的发布. * MqttConnectionOptions.cleanSes

MQTT的学习研究(五) MQTT moquette 的 Blocking API 发布消息服务端使用

参看官方文档: http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/index.jsp?topic=/com.ibm.mq.amqtat.doc/tt00000_.htm *  Java 为 MQ Telemetry Transport 创建异步发布程序 *在此任务中,您将遵循教程来修改第一个发布程序.通过修改, *使应用程序能够发送发布而不等待传递确认信息.传递确认 *信息由您创建的回调类来接收. * * * *4.使客户机断开连接 *  a

MQTT的学习研究(八)基于HTTP DELETE MQTT 订阅消息服务端使用

HTTP DELETE 订阅主题请求协议和响应协议http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21240_.htm 请求响应头各个字段的含义的讲解http://publib.boulder.ibm.com/infocenter/wmqv7/v7r0/topic/com.ibm.mq.csqzau.doc/ts21250_.htm 响应错误处理http://publib.boul