物联网架构成长之路(33)-EMQ数据存储到influxDB

一、前言
  时隔一年半,技术变化特别快,学习也要跟上才行。以前写过EMQ数据转存问题,当时用了比较笨的方法,通过写插件的方式,把MQTT里面的数据发送到数据库进行存储。当时也是为了学习erlang和emq。现在随着对物联网的深入,也结合实际需求,不停的学习。
下面将介绍我实验测试可行的物联网数据分析解决方案。采用的还是开源方案。通过订阅MQTT的根Topic,把所有物联网数据转存到InfluxDB时序数据库,然后通过Grafana进行图表显示。这应该是目前比较流行的方案。
二、安装InfluxDB
  InfluxDB是时序数据库,特别适合做数据监控和物联网数据存储。【也可以说适合我现在参与架构的物联网平台的技术选型】
  针对InfluxDB也没有什么可以多说的,详细可以查阅官方文档,或者网上的博客文章。我写的都是平时实践过程的操作记录,写博客,主要是为了以后忘记的时候,回看查阅用的。另一方面是加强跟同行读者交流的渠道。有一点要注意,一开始为了新,我用InfluxDB 2.0 版本,发现不行,那个太新的,很多对应的开发库没有完善好。所以还是采用InfluxDB 1.x版本。这样在spring boot 里面也有自带的starter库可以使用,操作起来特别方便。
  InfluxDB官方文档: https://docs.influxdata.com/influxdb/v1.7/   安装:

1 wget -qO- https://repos.influxdata.com/influxdb.key | sudo apt-key add -
2 echo "deb https://repos.influxdata.com/debian stretch stable" | sudo tee /etc/apt/sources.list.d/influxdb.list
3 apt-get update
4 apt-get install influxdb

三、InfluxDB基础命令使用
  修改配置文件 /etc/influxdb/influxdb.conf

1 [http]
2     enabled = true
3     bind-address = ":8086"
4     auth-enabled = false
5     log-enabled = true
6     write-tracing = false
7     pprof-enabled = true

  这里先设置不授权,等一下创建用户后,再修改为 auth-enabled=true,这个一般也是属于内部应用,不用ssl加密了。即使要也是通过Nginx进行反向代理。

  用户管理

1 --显示所有用户:
2 show users
3 --新增用户:
4 --普通用户 (注意:用户名用双引号,密码用单引号)
5 create user "user" with password ‘user‘
6 --管理员用户
7 create user "admin" with password ‘admin‘ with all privileges
8 --删除用户
9 drop user "user"

  创建好后,注意修改influxdb.conf 中的 auth-enable=true, 然后重启服务 service influxdb restart

1 --创建数据库
2 create database wunaozai
3 --创建好后,就可以不用管了。一些简单的操作,可以参考其他博客资料。
4 --删除数据库
5 drop database wunaozai
6 --切换使用数据库
7 use wunaozai
1 --显示所有表
2 show measurements
3 --新建表(往表里面插入数据,就是新建表了)
4 --插入数据的语法有点特殊,采用的是InfluxDB特有的语法:
5 <measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
6 insert wnztable,tag=mqtt value=33
7 --删除表
8 drop measurements wunaozai

  其他的高级语法,不如查询还有策略就不展开,暂时不是重点,等以后深入研究后,在写博客介绍。

四、EMQ转存InfluxDB
  EMQ如何把消息转存到InfluxDB呢,就是本章节的重点,利用上一篇博客中提到的,SpringBoot客户端监听EMQ的根Topic,然后把需要进行转存的Topic及其对应的Payload,构造成InfluxDB表数据,然后插入到InfluxDB中。
  下面介绍一下用到的InfluxDB工具类
  先在pom.xml中引入InfluxDB相关jar包

1 <!-- https://mvnrepository.com/artifact/org.influxdb/influxdb-java -->
2 <dependency>
3 <groupId>org.influxdb</groupId>
4 <artifactId>influxdb-java</artifactId>
5 <version>2.15</version>
6 </dependency>

  相关工具类代码

 1 import org.influxdb.InfluxDB;
 2 import org.influxdb.InfluxDBFactory;
 3 import org.influxdb.dto.Point;
 4
 5 /**
 6  * 数据缓存至InfluxDB
 7  * @author wunaozai
 8  *
 9  */
10 public class InfluxDBService {
11
12     private static String INFLUXDB_URL = "http://127.0.0.1:8086";
13     private static String INFLUXDB_USERNAME = "admin";
14     private static String INFLUXDB_PASSWORD = "admin";
15     private static String INFLUXDB_DATABASE = "wunaozai"; //注意这里对应数据库,一般要先在命令行中创建数据库
16     private static InfluxDB influxDB = null;
17
18     private InfluxDBService(){
19
20     }
21
22     public static InfluxDB getInstance(){
23         if(influxDB == null){
24             influxDB = InfluxDBFactory.connect(INFLUXDB_URL, INFLUXDB_USERNAME, INFLUXDB_PASSWORD);
25             influxDB.setDatabase(INFLUXDB_DATABASE);
26             influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);
27         }
28         return influxDB;
29     }
30     public static int writePoint(Point point){
31         getInstance().write(point);
32         return 0;
33     }
34 }

  在上一篇博客中的MqttPushCallback.java中的

public void messageArrived(String topic, MqttMessage message);

  这个函数来转存。

 1     @Override
 2     public void messageArrived(String topic, MqttMessage message) throws Exception {
 3         try{
 4             System.out.println(topic);
 5             String json = new String(message.getPayload());
 6             MQTTProtocolVoModel protocol = BaseModel.parseJSON(json, MQTTProtocolVoModel.class);
 7
 8             String cmd = protocol.getCmd();
 9             String customer_id = protocol.getProfile().getCustomer_id(); //厂商ID
10             String product_id = protocol.getProfile().getProduct_id(); //产品ID
11             String device_sn = protocol.getProfile().getDevice_sn(); //设备ID
12             Map<String, String> para = protocol.getDatapoint().getPara();
13             Map<String, Object> fields = new HashMap<>(); //这里是客户端传过来的数据点,就是需要被显示和监控的数据
14             for (Map.Entry<String, String> entry : para.entrySet()) {
15                 fields.put(entry.getKey(), entry.getValue());
16             }
17             Map<String, String> tag = new HashMap<>();
18             tag.put("customer_id", customer_id);
19             tag.put("product_id", product_id);
20             tag.put("device_sn", device_sn);
21             //这里可以添加很多Tag,为了简单演示,这里隐藏部分Tag
22             //构造数据点
23             Point point = Point.measurement("datapoint")
24                     .tag(tag).fields(fields).build();
25             InfluxDBService.writePoint(point);
26         }catch (Exception e) {
27             e.printStackTrace();
28         }
29     }

  这里可以通过EMQ Dashboard自带的Websocket进行发送,也可以通过前面小节用到的PC工具,网上Web端MQTT客户也很多,可以通过任意MQTT工具进行测试。
  下面这个是查询InfluxDB得到的表数据。

参考资料:
  https://www.cnblogs.com/jason1990/p/11076310.html
  https://blog.csdn.net/caodanwang/article/details/51967393
  https://docs.influxdata.com/influxdb/v1.7/
  https://www.cnblogs.com/shhnwangjian/p/6897216.html

本文地址: https://www.cnblogs.com/wunaozai/p/11160730.html

原文地址:https://www.cnblogs.com/wunaozai/p/11160730.html

时间: 2024-11-05 16:41:05

物联网架构成长之路(33)-EMQ数据存储到influxDB的相关文章

物联网架构成长之路(4)-EMQ插件创建

1. 说明 以下用到的知识,是建立在我目前所知道的知识领域,以后如果随着知识的拓展,不一定会更新内容.由于不是EMQ公司的人,EMQ的文档又很少,很多知识点都是靠猜的.2. 一些资料 架构设计 http://emqtt.com/docs/v2/design.html 扩展插件 http://emqtt.com/docs/v2/plugins.html  一些自带的插件3. 写插件 经过上一篇博客讲解的,编译_relx后,在deps目录下就会包含了所有依赖包以及插件包,在这个目录下有个emq_pl

物联网架构成长之路(7)-EMQ权限验证小结

1. 前言 经过前面几小节,讲了一下插件开发,这一小节主要对一些代码和目录结构进行讲解,这些都是测试过程中一些个人经验,不一定是官方做法.而且也有可能会因为版本不一致导致差异. 2. 目录结构 这个目录结构整体就是从 emq-plugin-template 复制一份过来的,然后修改. .erlang.mk 这个是编译过程的临时文件,不用管 .git 这个是源代码版本管理,不用管 data 一些系统数据文件,不用管 deps 这个是在make编译的时候,自动下载一些依赖包,不用管 ebin 这个是

物联网架构成长之路(5)-EMQ插件配置

1. 前言 上一小结说了插件的创建,这一节主要怎么编写代码,以及具体流程之类的.2. 增加一句Hello World 修改 ./deps/emq_plugin_wunaozai/src/emq_plugin_wunaozai.erl 增加一行Hello World 增加后,保存 1 make clean 2 make 3 cp -r ebin ../../_rel/emqttd/lib/emq_plugin_wunaozai-2.3.1 这样就把最新版本复制到_rel 目录下了. 回到eqm-r

物联网架构成长之路(50)-EMQ配置SSL证书,实现MQTTs协议

0. 前言 EMQ是带有SSL功能的,需要进行简单的配置,才能使用.下面就简单说一下如何实现自签证书. 1. 利用OpenSSL签发证书 1 ? cat createCA.sh 2 #/bin/sh 3 # 生成自签名的CA key和证书 4 openssl genrsa -out ca.key 2048 5 openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -subj "/CN=www.wunaozai.com"

物联网架构成长之路(0)-目录

一.基础 [http://www.cnblogs.com/wunaozai/p/8067621.html] 物联网架构成长之路(1)-前言 [http://www.cnblogs.com/wunaozai/p/8075640.html] 物联网架构成长之路(2)-脚手架工具准备 [http://www.cnblogs.com/wunaozai/p/8082332.html] 物联网架构成长之路(3)-EMQ消息服务器了解

物联网架构成长之路(56)-SpringCloudGateway+JWT实现网关鉴权

0. 前言 结合前面两篇博客,前面博客实现了Gateway网关的路由功能.此时,如果每个微服务都需要一套帐号认证体系就没有必要了.可以在网关处进行权限认证.然后转发请求到后端服务.这样后面的微服务就可以直接调用,而不需要每个都单独一套鉴权体系.参考了Oauth2和JWT,发现基于微服务,使用JWT会更方便一些,所以准备集成JWT作为微服务架构的认证方式. [https://www.cnblogs.com/wunaozai/p/12512753.html] 物联网架构成长之路(54)-基于Naco

苏宁人工智能研发中心智能创意平台架构成长之路(二)--大数据架构篇

苏宁人工智能研发中心智能创意平台架构成长之路(一)--长篇开篇 https://www.cnblogs.com/laoqing/p/11326132.html   我们接着第一篇继续. (这是第二篇大数据架构篇,成长之路序列会包含多篇,笔者作为这个平台的架构兼技术经理,充分讲述其中的迭代心酸之路以及中间遇到的问题和解决方案) 声明:文章不涉及公司内部技术资料的外泄,涉及的图片都是重画的简易架构图,主要通过架构的演进,讲述分享技术的迭代之路和过程. 在第二轮迭代完成后,第三轮迭代中,我们就开始做平

物联网架构成长之路(32)-SpringBoot集成MQTT客户端

一.前言 这里虽然是说MQTT客户端.其实对于服务器来说,这里的一个具有超级权限的MQTT客户端,就可以做很多事情.比如手机APP或者网页或者第三方服务需要发送数据到设备,但是这些又不是设备,又不能让他们连到MQTT.那么就可以通过HTTP请求业务服务器.然后由业务服务器利用这个MQTT客户端进行发送数据. 还有,之前好多人问我,怎么保存这些物联网数据,真的要像前面的博客那样,要自己写插件吗?特别麻烦的啊.这里给出的结论是不需要.保存数据,除了写EMQ插件,还可以在EMQ的规则引擎上进行配置We

物联网架构成长之路(35)-利用Netty解析物联网自定义协议

一.前言 前面博客大部分介绍了基于EMQ中间件,通信协议使用的是MQTT,而传输的数据为纯文本数据,采用JSON格式.这种方式,大部分一看就知道是熟悉Web开发.软件开发的人喜欢用的方式.由于我也是做web软件开发的,也是比较喜欢这种方式.阿里的物联网平台,也是推荐这种方式.但是,但是做惯硬件开发,嵌入式开发就比较喜欢用裸TCP-Socket连接.采用的是二进制协议.基于此大部分应用场合为了兼容旧设备,就需要单独开发一个TCP服务器的网关.这里使用以前学过的,也是比较流行的Netty框架. 话不