使用MQTTnet搭建MQTT Server

前言

MQTTnet 是MQTT协议的.NET 开源类库

MQTTnet is a .NET library for MQTT based communication. It provides a MQTT client and a MQTT server. The implementation is based on the documentation from http://mqtt.org/.

MQTT协议是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分,http://mqtt.org/。

MQTT is a machine-to-machine (M2M)/"Internet of Things" connectivity protocol. It was designed as an extremely lightweight publish/subscribe messaging transport. It is useful for connections with remote locations where a small code footprint is required and/or network bandwidth is at a premium. For example, it has been used in sensors communicating to a broker via satellite link, over occasional dial-up connections with healthcare providers, and in a range of home automation and small device scenarios. It is also ideal for mobile applications because of its small size, low power usage, minimised data packets, and efficient distribution of information to one or many receivers

通过https://github.com/mqtt/mqtt.github.io/wiki/servers 找到官方推荐的服务端软件,比如:Apache Apollo(ActiveMQ的分支),通过https://github.com/mqtt/mqtt.github.io/wiki/libraries可以找到推荐的客户端类库。

MQTTnet

通过Nuget搜索MQTT找到了MQTTnet,它不是下载量最多的,也不在官方推荐列表中,主要是因为同时支持客户端和服务端,所以开始下载试用,结果证明有坑,源码在vs2015中不能打开。

开源地址:https://github.com/chkr1011/MQTTnet

首先把官方的控制台程序改成winform的,界面如下:

 public partial class Form1 : Form
    {

        private MqttServer mqttServer = null;
        private MqttClient mqttClient = null;

        public Form1()
        {
            InitializeComponent();
        }

        private void button_启动服务端_Click(object sender, EventArgs e)
        {

            MqttTrace.TraceMessagePublished += MqttTrace_TraceMessagePublished;

            if (this.mqttServer == null)
            {
                try
                {
                    var options = new MqttServerOptions
                    {
                        ConnectionValidator = p =>
                        {
                            if (p.ClientId == "SpecialClient")
                            {
                                if (p.Username != "USER" || p.Password != "PASS")
                                {
                                    return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                                }
                            }

                            return MqttConnectReturnCode.ConnectionAccepted;
                        }
                    };

                    mqttServer = new MqttServerFactory().CreateMqttServer(options);

                }
                catch (Exception ex)
                {
                    MessageBox.Show(ex.Message);
                    return;

                }
            }
            mqttServer.Start();

            this.txt_服务器.AppendText( $">> 启动成功..." + Environment.NewLine);
        }

        private void MqttTrace_TraceMessagePublished(object sender, MqttTraceMessagePublishedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_服务器.AppendText($">> [{e.ThreadId}] [{e.Source}] [{e.Level}]: {e.Message}" + Environment.NewLine);
                if (e.Exception != null)
                {
                    this.txt_服务器.AppendText( e.Exception + Environment.NewLine);
                }
            }));
        }

        private void button_停止服务端_Click(object sender, EventArgs e)
        {
            if (mqttServer != null)
            {
                mqttServer.Stop();
            }
            this.txt_服务器.AppendText( $">> 停止成功" + Environment.NewLine);
        }

        private async void button_启动客户端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient == null)
            {
                var options = new MqttClientOptions
                {
                    Server = "192.168.2.54",
                    ClientId = "zbl",
                    CleanSession = true
                };
                this.mqttClient = new MqttClientFactory().CreateMqttClient(options);
                this.mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                this.mqttClient.Connected += MqttClient_Connected;
                this.mqttClient.Disconnected += MqttClient_Disconnected;

            }
            try
            {
                await this.mqttClient.ConnectAsync();
            }
            catch(Exception ex)
            {
                this.txt_客户端.AppendText( $"### CONNECTING FAILED ###" + Environment.NewLine);
            }

        }

        private void MqttClient_Connected(object sender, EventArgs e)
        {

            this.Invoke(new Action( async () =>
            {

                this.txt_客户端.AppendText( $"### CONNECTED WITH SERVER ###" + Environment.NewLine);
                await this.mqttClient.SubscribeAsync(new List<TopicFilter>{
                        new TopicFilter("#", MqttQualityOfServiceLevel.AtMostOnce)
                    });
                this.txt_客户端.AppendText($"### SUBSCRIBED  ###" + Environment.NewLine);
            }));
        }

        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客户端.AppendText( $"### DISCONNECTED FROM SERVER ###" + Environment.NewLine);

            }));
        }

        private void MqttClient_ApplicationMessageReceived(object sender, MQTTnet.Core.MqttApplicationMessageReceivedEventArgs e)
        {
            this.Invoke(new Action(() =>
            {
                this.txt_客户端.AppendText( $">> Topic:{e.ApplicationMessage.Topic} Payload:{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} QoS:{e.ApplicationMessage.QualityOfServiceLevel}   Retain:{e.ApplicationMessage.Retain}" + Environment.NewLine);

            }));
        }

        private async void button_停止客户端_Click(object sender, EventArgs e)
        {
            if (this.mqttClient != null)
            {
                await this.mqttClient.DisconnectAsync();
            }
            this.txt_客户端.AppendText( $">> 停止成功" + Environment.NewLine);
        }

        private async void button_发送_Click(object sender, EventArgs e)
        {

            //var options = new MqttClientOptions
            //{
            //    Server = "localhost"
            //};
            //var client = new MqttClientFactory().CreateMqttClient(options);

            //await client.ConnectAsync();

            var applicationMessage = new MqttApplicationMessage(this.txt_topic.Text,
                Encoding.UTF8.GetBytes(this.txt_message.Text), MqttQualityOfServiceLevel.AtMostOnce, false);

            await this.mqttClient.PublishAsync(applicationMessage);

        }

        private void button_清空服务端_Click(object sender, EventArgs e)
        {
            this.txt_服务器.Text = "";
        }
    }

Form1.cs

需要注意的是按照作者说明是

while (true)
{
    Console.ReadLine();

    var applicationMessage = new MqttApplicationMessage(
        "A/B/C",
        Encoding.UTF8.GetBytes("Hello World"),
        MqttQualityOfServiceLevel.AtLeastOnce,
        false
    );

    await client.PublishAsync(applicationMessage);
}

 客户端死活都收不到消息,改成 MqttQualityOfServiceLevel.AtMostOnce  就可以了,找问题时尝试下载源码调试因为vs2015打不开项目也折腾了一会。这个问题提交了issues,期待作者回复。

最后还是建议使用 Apache Apollo,几篇博文比较有用 MQTT协议的简单介绍和服务器的安装编写和MQTT服务器通信的Android客户端程序,参考  【MQTT】在Windows下搭建MQTT服务器

搭建了MQTT服务端之后需要在Esp8266模块和手机App中分别实现客户端功能,稍后待续。。。。

时间: 2024-11-08 13:49:15

使用MQTTnet搭建MQTT Server的相关文章

用VLC搭建流媒体server

VLC开元项目相当强大,我们既能够将其作为播放核心用于二次开发,又能够将其作为高性能的流媒体server.今篇博客主要讲用VLC搭建流媒体server. VLC搭建流媒体server步骤非常easy:选择输入数据.选择输出格式.选择编码器.选择流通量.输入数据来源非常多,能够是本地文件,能够是网络流,能够是音频.甚至图片.编码器选择主要是选择音频编码器.视频编码器用以确定声音输出质量和图像质量.输出格式较多,能够是本地文件,能够是网络流,也能够是组播:流通量眼下保留.实际没有多少用处. 以下我用

从0开始搭建SQL Server AlwaysOn 第一篇(配置域控)

从0开始搭建SQL Server AlwaysOn 第一篇(配置域控) 网上的 AlwaysOn可以说是非常的多,也可以说是非常的千篇一律,而且很多都是搭建非常顺利的,没有坑的,难道搭建 AlwaysOn真的可以这麽顺利吗?????? 由于公司使用的是最新的Windows Server 2012 R2,网上用的都是Windows Server 2008 R2 ,2012 R2和2008 R2在故障转移集群界面菜单和AD 服务管理工具 已经有较大变化,有一些步骤跟Windows Server 20

在Windows Server 2012 R2中搭建SQL Server 2012故障转移集群

需要说明的是我们搭建的SQL Server故障转移集群(SQL Server Failover Cluster)是可用性集群,而不是负载均衡集群,其目的是为了保证服务的连续性和可用性,而不是为了提高服务的性能. SQL Server始终在负载均衡集群方面都缺少自己的产品,多由第三方厂家提供,但SQL Server故障转移集群却由来已久,在SQL Server 2012还提供了一个可用性组(AlwaysOn High Availability Groups)的新特性,我们知道微软的故障转移集群(W

第三篇——第二部分——第二文 计划搭建SQL Server镜像

原文:第三篇--第二部分--第二文 计划搭建SQL Server镜像 本文紧跟上一章:SQL Server镜像简介 本文出处:http://blog.csdn.net/dba_huangzj/article/details/27203053 俗话说:工欲善其事必先利其器.计划好如何部署和使用镜像,可以减少很多不必要的风险.本文将按照三步骤的形式展示,但是要注意这不是唯一的标准,具体情况具体分析. 第一步:了解环境 在搭建SQL Server镜像时,必须先了解你所要部署的环境,才能决定镜像的配置项

搭建Git Server

windows上如何搭建Git Server Git在版本控制方面,相比与SVN有更多的灵活性,对于开源的项目,我们可以托管到Github上面,非常方便,但是闭源的项目就会收取昂贵的费用.那么私有项目,如何用Git进行代码版本控制呢?我们可以自己构建Git服务器.一般来说,在Linux上搭建Git的教程比较多,但是如何在Windows Server平台下搭建Git服务器呢? 对于很多.NET用户来说,代码编写的工具是Visual Studio,该工具是不支持SSH协议的,通过搜索和对比,发现Gi

Linux 搭建SVN server

一. SVN 简单介绍 Subversion(SVN) 是一个开源的版本号控制系統, 也就是说 Subversion 管理着随时间改变的数据. 这些数据放置在一个中央资料档案库 (repository) 中. 这个档案库非常像一个普通的文件server, 只是它会记住每一次文件的变动. 这样你就能够把档案恢复到旧的版本号, 或是浏览文件的变动历史. SVN中的一些概念 : (1). repository(源码库) 源码统一存放的地方 (2). Checkout (提取) 当你手上没有源码的时候,

从0开始搭建SQL Server AlwaysOn 第二篇(配置故障转移集群)

从0开始搭建SQL Server AlwaysOn 第二篇(配置故障转移集群) 第一篇http://www.cnblogs.com/lyhabc/p/4678330.html 第二篇http://www.cnblogs.com/lyhabc/p/4682028.html 这一篇是从0开始搭建SQL Server AlwaysOn 的第二篇,主要讲述如何搭建故障转移集群,因为AlwaysOn是基于Windows的故障转移集群的 在讲解步骤之前需要了解一下故障转移集群仲裁配置 下面图片来自<Wind

(转) 从0开始搭建SQL Server AlwaysOn 第三篇(配置AlwaysOn)

原文地址: http://www.cnblogs.com/lyhabc/p/4682986.html 这一篇是从0开始搭建SQL Server AlwaysOn 的第三篇,这一篇才真正开始搭建AlwaysOn,前两篇是为搭建AlwaysOn 做准备的 步骤 这一篇依然使用step by step的方式介绍怎麽搭建AlwaysOn 请先使用本地用户Administrator登录这两个集群节点并执行下面的操作,先不要用域用户DCADMIN登录 1.两个集群节点都需先安装.NET Framework

(转)从0开始搭建SQL Server AlwaysOn 第二篇(配置故障转移集群)

原文地址:  http://www.cnblogs.com/lyhabc/p/4682028.html 这一篇是从0开始搭建SQL Server AlwaysOn 的第二篇,主要讲述如何搭建故障转移集群,因为AlwaysOn是基于Windows的故障转移集群的 在讲解步骤之前需要了解一下故障转移集群仲裁配置 下面图片来自<Windows Server2012系统配置指南> 四种集群的仲裁配置: 1.多数节点:这种配置不会用到仲裁磁盘,而所谓多数节点就是在正常节点数量占多数的情况下,集群才会提供