使用IBM MQTTv3实现相关的发布订阅功能
MQTTv3的发布消息的实现:
Java代码
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- /**
-  * Demo publisher: sends a fixed text message to the {@code tokudu/china} topic
-  * over and over using the IBM MQTTv3 client.
-  *
-  * @author longgangbai
-  */
- public class MQTTPub {
-     /**
-      * Connects to the broker at {@code tcp://192.168.208.46:1883} and republishes the same
-      * message (QoS set to 1) in an endless loop, waiting for each delivery token to report
-      * completion before publishing again.
-      *
-      * <p>The loop has no exit condition, so this method only returns after an exception is
-      * raised. The exception is printed and the client is then disconnected so the
-      * connection is not left open.
-      */
-     public static void doTest() {
-         MqttClient client = null;
-         try {
-             client = new MqttClient("tcp://192.168.208.46:1883", "mqttserver-pub");
-             MqttTopic topic = client.getTopic("tokudu/china");
-             // Name the charset explicitly so the payload bytes do not depend on the
-             // platform default encoding.
-             MqttMessage message = new MqttMessage("Hello World. Hello IBM".getBytes("UTF-8"));
-             message.setQos(1);
-             client.connect();
-             while (true) {
-                 MqttDeliveryToken token = topic.publish(message);
-                 // Re-check completion after every wait of 1000 (presumably milliseconds).
-                 while (!token.isComplete()) {
-                     token.waitForCompletion(1000);
-                 }
-             }
-         } catch (Exception e) {
-             e.printStackTrace();
-         } finally {
-             disconnectQuietly(client);
-         }
-     }
-     /**
-      * Disconnects the given client, ignoring any failure.
-      *
-      * @param client the client to disconnect; may be {@code null} if construction failed
-      */
-     private static void disconnectQuietly(MqttClient client) {
-         if (client == null) {
-             return;
-         }
-         try {
-             client.disconnect();
-         } catch (Exception ignored) {
-             // Best-effort cleanup: the client may never have connected, or the
-             // connection may already be gone. The original failure was already printed.
-         }
-     }
- }
MQTTv3的订阅消息的实现:
Java代码
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttClient;
- import com.ibm.micro.client.mqttv3.MqttConnectOptions;
- /**
-  * Demo subscriber: connects to the broker and subscribes to the
-  * {@code tokudu/china} topic, handing incoming messages to {@link CallBack}.
-  *
-  * @author longgangbai
-  */
- public class MQTTSubsribe {
-     /**
-      * Creates a client, registers a {@link CallBack}, connects with
-      * {@code cleanSession} set to {@code false} and subscribes to
-      * {@code tokudu/china} with QoS 1.
-      *
-      * <p>If any step throws, the exception is printed, the client is disconnected on a
-      * best-effort basis and {@code "failed"} is returned.
-      *
-      * @return {@code "success"} when every step completed without an exception,
-      *         {@code "failed"} otherwise
-      */
-     public static String doTest() {
-         MqttClient client = null;
-         try {
-             // Create the MQTT client.
-             client = new MqttClient("tcp://192.168.208.46:1883", "java_client0000000000");
-             // Register the handler that receives incoming messages.
-             client.setCallback(new CallBack());
-             // Build the connection options.
-             MqttConnectOptions conOptions = new MqttConnectOptions();
-             // NOTE(review): presumably asks the broker to keep session state for this
-             // client id between connections -- confirm against the client documentation.
-             conOptions.setCleanSession(false);
-             // Connect to the broker.
-             client.connect(conOptions);
-             // Subscribe to the topic with QoS 1.
-             client.subscribe("tokudu/china", 1);
-             // The client is not disconnected on success, presumably so that the
-             // callback keeps receiving messages after this method returns.
-         } catch (Exception e) {
-             e.printStackTrace();
-             disconnectQuietly(client);
-             return "failed";
-         }
-         return "success";
-     }
-     /**
-      * Disconnects the given client, ignoring any failure.
-      *
-      * @param client the client to disconnect; may be {@code null} if construction failed
-      */
-     private static void disconnectQuietly(MqttClient client) {
-         if (client == null) {
-             return;
-         }
-         try {
-             client.disconnect();
-         } catch (Exception ignored) {
-             // Best-effort cleanup: the client may never have connected. The original
-             // failure was already printed by the caller.
-         }
-     }
- }
回调处理类(处理订阅到的消息):
Java代码
- package com.etrip.mqttv3;
- import com.ibm.micro.client.mqttv3.MqttCallback;
- import com.ibm.micro.client.mqttv3.MqttDeliveryToken;
- import com.ibm.micro.client.mqttv3.MqttMessage;
- import com.ibm.micro.client.mqttv3.MqttTopic;
- /**
-  * Callback handler that processes messages arriving on subscribed topics.
-  *
-  * @author longgangbai
-  */
- public class CallBack implements MqttCallback {
-     /** Creates a callback with no state. */
-     public CallBack() {
-     }
-     /**
-      * Handles a received message by printing its {@code toString()} value, with a fixed
-      * prefix, to standard output. Any exception raised while doing so is printed rather
-      * than propagated.
-      *
-      * @param topic   the topic the message arrived on (not used here)
-      * @param message the received message
-      */
-     public void messageArrived(MqttTopic topic, MqttMessage message) {
-         try {
-             System.out.println(" MQTTSubsribe message.toString()"+message.toString());
-         } catch (Exception e) {
-             e.printStackTrace();
-         }
-     }
-     /**
-      * Reports a lost connection on standard error so the failure is visible instead of
-      * silently ending message delivery. No reconnect is attempted.
-      *
-      * @param cause the reason reported for the loss; may be {@code null}
-      */
-     public void connectionLost(Throwable cause) {
-         System.err.println("MQTT connection lost: " + cause);
-         if (cause != null) {
-             cause.printStackTrace();
-         }
-     }
-     /**
-      * Intentionally does nothing; this handler does not track outgoing deliveries.
-      *
-      * @param token the delivery token (ignored)
-      */
-     public void deliveryComplete(MqttDeliveryToken token) {
-     }
- }
测试类:
Java代码
- package com.etrip.mqttv3;
- /**
- * MQTTV3的测试类
- *
- * @author longgangbai
- */
- public class MQTTMain {
- public static void main(String[] args) {
- //订阅消息的方法
- MQTTSubsribe.doTest();
- //发布消息的类
- MQTTPub.doTest();
- }
- }
时间: 2024-10-12 08:44:28