物联网架构成长之路(32)-SpringBoot集成MQTT客户端

一、前言
  这里虽然是说MQTT客户端。其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情。比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT。那么就可以通过HTTP请求业务服务器。然后由业务服务器利用这个MQTT客户端进行发送数据。
  还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊。这里给出的结论是不需要。保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置Web消息转发【EMQ 3.x 版本】,还有就是这种通过业务服务器订阅根Topic来保存物联网原始数据。
  这篇博客这讨论如何把MQTT客户端集成到业务服务器上(基于SpringBoot 2.0)。下一篇博客会讲到数据保存到InfluxDB,然后如何通过Grafana进行可视化Dashboard看板模式展示。

二、配置pom.xml,引入第三方库

 1         <!-- MQTT -->
 2         <dependency>
 3             <groupId>org.springframework.boot</groupId>
 4             <artifactId>spring-boot-starter-integration</artifactId>
 5         </dependency>
 6         <dependency>
 7             <groupId>org.springframework.integration</groupId>
 8             <artifactId>spring-integration-stream</artifactId>
 9         </dependency>
10         <dependency>
11             <groupId>org.springframework.integration</groupId>
12             <artifactId>spring-integration-mqtt</artifactId>
13         </dependency>

三、MQTT客户端代码(Java)

  MqttDemoApplication.java

 1 package com.wunaozai.mqtt;
 2
 3 import org.springframework.boot.SpringApplication;
 4 import org.springframework.boot.autoconfigure.SpringBootApplication;
 5
 6 import com.wunaozai.mqtt.tools.MqttPushClient;
 7
 8 @SpringBootApplication
 9 public class MqttDemoApplication {
10
11     public static void main(String[] args) {
12         SpringApplication.run(MqttDemoApplication.class, args);
13
14         test();
15     }
16
17
18     private static void test(){
19         MqttPushClient.MQTT_HOST = "tcp://mqtt.com:1883";
20         MqttPushClient.MQTT_CLIENTID = "client";
21         MqttPushClient.MQTT_USERNAME = "username";
22         MqttPushClient.MQTT_PASSWORD = "password";
23         MqttPushClient client = MqttPushClient.getInstance();
24         client.subscribe("/#");
25     }
26 }

  MqttPushCallback.java

 1 package com.wunaozai.mqtt.tools;
 2
 3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 4 import org.eclipse.paho.client.mqttv3.MqttCallback;
 5 import org.eclipse.paho.client.mqttv3.MqttMessage;
 6 import org.slf4j.Logger;
 7 import org.slf4j.LoggerFactory;
 8
 9 /**
10  * MQTT 推送回调
11  * @author wunaozai
12  * @date 2018-08-22
13  */
14 public class MqttPushCallback implements MqttCallback {
15
16     private static final Logger log = LoggerFactory.getLogger(MqttPushCallback.class);
17
18     @Override
19     public void connectionLost(Throwable cause) {
20         log.info("断开连接,建议重连" + this);
21         //断开连接,建议重连
22     }
23
24     @Override
25     public void deliveryComplete(IMqttDeliveryToken token) {
26         //log.info(token.isComplete() + "");
27     }
28
29     @Override
30     public void messageArrived(String topic, MqttMessage message) throws Exception {
31         log.info("Topic: " + topic);
32         log.info("Message: " + new String(message.getPayload()));
33     }
34
35 }

  MqttPushClient.java

  1 package com.wunaozai.mqtt.tools;
  2
  3 import org.eclipse.paho.client.mqttv3.MqttClient;
  4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
  5 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
  6 import org.eclipse.paho.client.mqttv3.MqttMessage;
  7 import org.eclipse.paho.client.mqttv3.MqttTopic;
  8 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
  9 import org.slf4j.Logger;
 10 import org.slf4j.LoggerFactory;
 11
 12 /**
 13  * 创建一个MQTT客户端
 14  * @author wunaozai
 15  * @date 2018-08-22
 16  */
 17 public class MqttPushClient {
 18
 19     private static final Logger log = LoggerFactory.getLogger(MqttPushClient.class);
 20     public static String MQTT_HOST = "";
 21     public static String MQTT_CLIENTID = "";
 22     public static String MQTT_USERNAME = "";
 23     public static String MQTT_PASSWORD = "";
 24     public static int MQTT_TIMEOUT = 10;
 25     public static int MQTT_KEEPALIVE = 10;
 26
 27     private MqttClient client;
 28     private static volatile MqttPushClient mqttClient = null;
 29     public static MqttPushClient getInstance() {
 30         if(mqttClient == null) {
 31             synchronized (MqttPushClient.class) {
 32                 if(mqttClient == null) {
 33                     mqttClient = new MqttPushClient();
 34                 }
 35             }
 36         }
 37         return mqttClient;
 38     }
 39
 40     private MqttPushClient() {
 41         log.info("Connect MQTT: " + this);
 42         connect();
 43     }
 44
 45     private void connect() {
 46         try {
 47             client = new MqttClient(MQTT_HOST, MQTT_CLIENTID, new MemoryPersistence());
 48             MqttConnectOptions option = new MqttConnectOptions();
 49             option.setCleanSession(true);
 50             option.setUserName(MQTT_USERNAME);
 51             option.setPassword(MQTT_PASSWORD.toCharArray());
 52             option.setConnectionTimeout(MQTT_TIMEOUT);
 53             option.setKeepAliveInterval(MQTT_KEEPALIVE);
 54             option.setAutomaticReconnect(true);
 55             try {
 56                 client.setCallback(new MqttPushCallback());
 57                 client.connect(option);
 58             } catch (Exception e) {
 59                 e.printStackTrace();
 60             }
 61         } catch (Exception e) {
 62             e.printStackTrace();
 63         }
 64     }
 65     /**
 66      * 发布主题,用于通知<br>
 67      * 默认qos为1 非持久化
 68      * @param topic
 69      * @param data
 70      */
 71     public void publish(String topic, String data) {
 72         publish(topic, data, 1, false);
 73     }
 74     /**
 75      * 发布
 76      * @param topic
 77      * @param data
 78      * @param qos
 79      * @param retained
 80      */
 81     public void publish(String topic, String data, int qos, boolean retained) {
 82         MqttMessage message = new MqttMessage();
 83         message.setQos(qos);
 84         message.setRetained(retained);
 85         message.setPayload(data.getBytes());
 86         MqttTopic mqttTopic = client.getTopic(topic);
 87         if(null == mqttTopic) {
 88             log.error("Topic Not Exist");
 89         }
 90         MqttDeliveryToken token;
 91         try {
 92             token = mqttTopic.publish(message);
 93             token.waitForCompletion();
 94         } catch (Exception e) {
 95             e.printStackTrace();
 96         }
 97     }
 98     /**
 99      * 订阅某个主题 qos默认为1
100      * @param topic
101      */
102     public void subscribe(String topic) {
103         subscribe(topic, 1);
104     }
105     /**
106      * 订阅某个主题
107      * @param topic
108      * @param qos
109      */
110     public void subscribe(String topic, int qos) {
111         try {
112             client.subscribe(topic, qos);
113         } catch (Exception e) {
114             e.printStackTrace();
115         }
116     }
117 }

四、MQTT客户端代码(C#)
  为了下下篇博客Grafana有数据可以展示,我需要开发一个PC小工具【设备仿真】,用来模拟设备一直发送数据。这里就不对C#开发进行过多的说明了。通过nuget,引入第三方mqtt库。这个工具是我现在开发平台工具链的一个小工具。至于里面的Payload协议,可以不用管。读者可以根据自己的业务制定自己的通信协议。

  部分C#代码(连接服务器与发送数据)

  1 using MQTTClient.Model;
  2 using MQTTnet;
  3 using MQTTnet.Core;
  4 using MQTTnet.Core.Client;
  5 using Newtonsoft.Json;
  6 using System;
  7 using System.Collections.Generic;
  8 using System.Text;
  9 using System.Threading.Tasks;
 10 using System.Windows.Forms;
 11
 12 namespace MQTTClient
 13 {
 14     public partial class MainPage : Form
 15     {
 16         public MainPage()
 17         {
 18             InitializeComponent();
 19             init();
 20         }
 21         private void init()
 22         {
 23             txtusername.Text = "";
 24             txtpassword.Text = "";
 25             txtclientid.Text = "";
 26             txttopic.Text = "iot/UUID/device/devicepub/update";
 27         }
 28
 29         IMqttClient client = null;
 30         private async Task ConnectMqttServerAsync()
 31         {
 32             if(client == null)
 33             {
 34                 client = new MqttClientFactory().CreateMqttClient() as MqttClient;
 35                 client.ApplicationMessageReceived += mqttClientApplicationMessageReceived;
 36                 client.Connected += mqttClientConnected;
 37                 client.Disconnected += mqttClientDisconnected;
 38             }
 39             try
 40             {
 41                 await client.DisconnectAsync();
 42                 var option = getMQTTOption();
 43                 await client.ConnectAsync(option);
 44             }catch(Exception e)
 45             {
 46                 Invoke((new Action(() =>
 47                 {
 48                     lblStatus.Text = "连接服务器失败: " + e.Message;
 49                 })));
 50             }
 51         }
 52         private void mqttClientDisconnected(object sender, EventArgs e)
 53         {
 54             Invoke((new Action(() =>
 55             {
 56                 lblStatus.Text = "连接服务器失败: ERROR";
 57             })));
 58         }
 59         private void mqttClientConnected(object sender, EventArgs e)
 60         {
 61             Invoke((new Action(() =>
 62             {
 63                 lblStatus.Text = "连接服务器成功";
 64             })));
 65         }
 66         private void mqttClientApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
 67         {
 68             //本工具部收数据
 69             throw new NotImplementedException();
 70         }
 71
 72         private void btnconnect_Click(object sender, EventArgs e)
 73         {
 74             Task.Run(async () => { await ConnectMqttServerAsync(); });
 75         }
 76         private void btndisconnect_Click(object sender, EventArgs e)
 77         {
 78             client.DisconnectAsync();
 79         }
 80         private void btnsendone_Click(object sender, EventArgs e)
 81         {
 82             sendPayload();
 83         }
 84         private void btnsendts_Click(object sender, EventArgs e)
 85         {
 86             timer1.Interval = Convert.ToInt32(txttime.Text);
 87             timer1.Enabled = true;
 88         }
 89         private void btnstopts_Click(object sender, EventArgs e)
 90         {
 91             timer1.Enabled = false;
 92         }
 93         private void timer1_Tick(object sender, EventArgs e)
 94         {
 95             sendPayload();
 96         }
 97         private int sendPayload()
 98         {
 99             if (client.IsConnected == false)
100             {
101                 return -1;
102             }
103             PayloadModel payload = getPayload();
104             string json = JsonConvert.SerializeObject(payload, Formatting.Indented);
105             txtview.Text = json;
106             string topic = txttopic.Text;
107             var msg = new MqttApplicationMessage(topic, Encoding.Default.GetBytes(json),
108                 MQTTnet.Core.Protocol.MqttQualityOfServiceLevel.AtMostOnce, false);
109             client.PublishAsync(msg);
110             lblSendStatus.Text = "发送: " + DateTime.Now.ToLongTimeString();
111             return 0;
112         }
113
114         private MqttClientTcpOptions getMQTTOption()
115         {
116             MqttClientTcpOptions option = new MqttClientTcpOptions();
117             string hostname = txthostname.Text;
118             string[] host_port = hostname.Split(‘:‘);
119             int port = 1883;
120             if(host_port.Length >= 2)
121             {
122                 hostname = host_port[0];
123                 port = Convert.ToInt32(host_port[1]);
124             }
125             option.Server = hostname;
126             option.ClientId = txtclientid.Text;
127             option.UserName = txtusername.Text;
128             option.Password = txtpassword.Text;
129             option.Port = port;
130             option.CleanSession = true;
131             return option;
132         }
133
134         private PayloadModel getPayload()
135         {
136             PayloadModel payload = new PayloadModel();
137             //略
138             return payload;
139         }
140
141         Random rand1 = new Random(System.DateTime.Now.Millisecond);
142         private int getRandomNum()
143         {
144             int data = rand1.Next(0, 100);
145             return data;
146         }
147
148         int linenum = 0;
149         Random rand2 = new Random(System.DateTime.Now.Millisecond);
150         private int getLineNum()
151         {
152             int f = rand2.Next(0, 100);
153             int data = rand2.Next(0, 5);
154             if(f % 2 == 1)
155             {
156                 linenum += data;
157             }
158             else
159             {
160                 linenum -= data;
161             }
162             return linenum;
163         }
164
165     }
166 }

本文地址: https://www.cnblogs.com/wunaozai/p/11147841.html

原文地址:https://www.cnblogs.com/wunaozai/p/11147841.html

时间: 2024-10-29 10:43:02

物联网架构成长之路(32)-SpringBoot集成MQTT客户端的相关文章

物联网架构成长之路(13)-SpringBoot入门

1. 前言 下载最新版的JavaEE eclipse-jee-oxygen-2-win32-x86_64.zip 安装STS插件 Window->Eclipse Marketplace -> popular 下那个 Spring Tools(aka Spring IDE and Spring Tool Suite) 然后通过STS工具创建一个新的Spring boot工程,这里就不细说了.网上资料很多,也比较简单就可以搭建起来.后面对SpringBoot也只是简单的提一下,还有说一下注意点.没

物联网架构成长之路(56)-SpringCloudGateway+JWT实现网关鉴权

0. 前言 结合前面两篇博客,前面博客实现了Gateway网关的路由功能.此时,如果每个微服务都需要一套帐号认证体系就没有必要了.可以在网关处进行权限认证.然后转发请求到后端服务.这样后面的微服务就可以直接调用,而不需要每个都单独一套鉴权体系.参考了Oauth2和JWT,发现基于微服务,使用JWT会更方便一些,所以准备集成JWT作为微服务架构的认证方式. [https://www.cnblogs.com/wunaozai/p/12512753.html] 物联网架构成长之路(54)-基于Naco

物联网架构成长之路(0)-目录

一.基础 [http://www.cnblogs.com/wunaozai/p/8067621.html] 物联网架构成长之路(1)-前言 [http://www.cnblogs.com/wunaozai/p/8075640.html] 物联网架构成长之路(2)-脚手架工具准备 [http://www.cnblogs.com/wunaozai/p/8082332.html] 物联网架构成长之路(3)-EMQ消息服务器了解

物联网架构成长之路(33)-EMQ数据存储到influxDB

一.前言 时隔一年半,技术变化特别快,学习也要跟上才行.以前写过EMQ数据转存问题,当时用了比较笨的方法,通过写插件的方式,把MQTT里面的数据发送到数据库进行存储.当时也是为了学习erlang和emq.现在随着对物联网的深入,也结合实际需求,不停的学习.下面将介绍我实验测试可行的物联网数据分析解决方案.采用的还是开源方案.通过订阅MQTT的根Topic,把所有物联网数据转存到InfluxDB时序数据库,然后通过Grafana进行图表显示.这应该是目前比较流行的方案.二.安装InfluxDB I

物联网架构成长之路(12)-物联网架构小结1

1. 说明 这一小节,也不具体讲些什么了.最近一个半月都在摸鱼,没什么事做,慢慢学习着SpringBoot和SpringCloud.下面两张图是进行的一次小结.以后随着深入,整个架构肯定是会变的.现在记录一下,每个项目成长都是有一个过程的. 原文地址:https://www.cnblogs.com/wunaozai/p/8312891.html

物联网架构成长之路(24)-Docker练习之Compose容器编排

0.前言 一开始学的之后,是想一步到位直接上Kubernetes(K8s)的,后面没想到,好像有点复杂,有些概念不是很懂.因此学习东西还是要循序渐进,慢慢来.先了解单机编排技术Docker Compose,了解一些技术细节及原理后,在入手K8s.还是不能一口吃成胖子,要多吃几口才可以.而且目前公司都是一些小项目,能用得上DockerCompose已经很不错了,还想要上K8s,估计是不现实的. 1. 安装 可以通过运行下面命令进行安装, 1 curl -L https://github.com/d

物联网架构成长之路(35)-利用Netty解析物联网自定义协议

一.前言 前面博客大部分介绍了基于EMQ中间件,通信协议使用的是MQTT,而传输的数据为纯文本数据,采用JSON格式.这种方式,大部分一看就知道是熟悉Web开发.软件开发的人喜欢用的方式.由于我也是做web软件开发的,也是比较喜欢这种方式.阿里的物联网平台,也是推荐这种方式.但是,但是做惯硬件开发,嵌入式开发就比较喜欢用裸TCP-Socket连接.采用的是二进制协议.基于此大部分应用场合为了兼容旧设备,就需要单独开发一个TCP服务器的网关.这里使用以前学过的,也是比较流行的Netty框架. 话不

物联网架构成长之路(36)-Vue前端入门

1. 前言 物联网平台,需要有一个类似大屏看板的功能. 找了一圈,发现阿里已经有对应的DataV产品,但是那个价格有点贵啊.所以找了这个[http://datav.jiaminghi.com/demo/],这看起来也是挺不错的.就是需要了解一些前端Vue.说到前端,我之前好久就想入门了.断断续续看视频,写Demo,写小程序.但都处于入门阶段.而且前端变化太快了.半年没看,就各种更新了.不过还是迟早要学的. 2. 安装环境 安装IDE,这里推荐VSCode,然后安装Vetur 插件 Google

物联网架构成长之路(11)-Redis缓存主从复制

1. 说明 在我的物联网平台框架框架中,会用到Redis这个中间件.作为EMQ权限认证的缓存.https://www.cnblogs.com/think-in-java/p/5123884.html 2. 编译&运行 1 wget http://download.redis.io/releases/redis-4.0.6.tar.gz 2 make && make test && make PREFIX=/home/user/workspace/redis inst