mqtt 协议之 PINGREQ, PINGRESP

  mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"

PINGREQ - PING request

fixed header format.

bit 7 6 5 4 3 2 1 0
byte 1 Message Type (12) DUP flag QoS level RETAIN
  1 1 0 0 x x x x
byte 2 Remaining Length (0)
  0 0 0 0 0 0 0 0

no variable header

no payload

response:   The response to a PINGREQ message is a PINGRESP message.

PINGRESP - PING response

fixed header

bit 7 6 5 4 3 2 1 0
byte 1 Message Type (13) DUP flag QoS level RETAIN
  1 1 0 1 x x x x
byte 2 Remaining Length (0)
  0 0 0 0 0 0 0 0

no variable header

no payload

------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

周期定义在 心跳频率在CONNECT(连接包)可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

ok ,上代码 :

固定头部 FinedHeader

    /// <summary>
    /// Fixed header
    /// </summary>
    internal class FixedHeader
    {
        /// <summary>
        /// Message type
        /// </summary>
        public MessageType MessageType { get; set; }

        /// <summary>
        /// DUP flag
        /// </summary>
        public bool Dup { get; set; }

        /// <summary>
        /// QoS flags
        /// </summary>
        public Qos Qos { get; set; }

        /// <summary>
        /// RETAIN 保持
        /// </summary>
        public bool Retain { get; set; }

        /// <summary>
        /// Remaining Length 剩余长度
        /// 单个字节最大值:01111111,16进制:0x7F,10进制为127。
        /// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。
        /// MQTT协议最多允许4个字节表示剩余长度。
        /// 最大长度为:0xFF,0xFF,0xFF,0x7F,
        /// 二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455
        /// </summary>
        public int RemaingLength { get; set; }

        public FixedHeader() { }

        public FixedHeader(Stream stream)
        {
            if (stream.Length < 2)
                throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long.");

            var byte1 = stream.ReadByte();
            MessageType = (MessageType)((byte1 & 0xf0) >> 4);
            Dup = ((byte1 & 0x08) >> 3) > 0;
            Qos = (Qos)((byte1 & 0x06) >> 1);
            Retain = (byte1 & 0x01) > 0;

            //Remaining Length
            //var byte2 = stream.ReadByte();
            var lengthBytes = ReadLengthBytes(stream);
            RemaingLength = CalculateLength(lengthBytes);
        }

        public void WriteTo(Stream stream)
        {
            var flags = (byte)MessageType << 4;
            flags |= (Dup ? 1 : 0) << 3;
            flags |= (byte)Qos << 1;
            flags |= Retain ? 1 : 0;

            stream.WriteByte((byte)flags);     //byte 1
            if (RemaingLength == 0)         //byte 2
                stream.WriteByte(0);
            else
            {
                do
                {
                    int digit = RemaingLength & 0x7f;
                    RemaingLength = RemaingLength >> 7;
                    if (RemaingLength > 0)
                        digit = digit | 0x80;
                    stream.WriteByte((byte)digit);
                } while (RemaingLength > 0);
            }
        }

        internal static byte[] ReadLengthBytes(Stream stream)
        {
            var lengthBytes = new List<byte>();

            // read until we‘ve got the entire size, or the 4 byte limit is reached
            byte sizeByte;
            int byteCount = 0;
            do
            {
                sizeByte = (byte)stream.ReadByte();
                lengthBytes.Add(sizeByte);
            } while (++byteCount <= 4 && (sizeByte & 0x80) == 0x80);

            return lengthBytes.ToArray();
        }

        internal static int CalculateLength(byte[] lengthBytes)
        {
            var remainingLength = 0;
            var multiplier = 1;

            foreach (var currentByte in lengthBytes)
            {
                remainingLength += (currentByte & 0x7f) * multiplier;
                multiplier *= 0x80;
            }

            return remainingLength;
        }
    }

消息父类: Message

    internal class Message
    {
        public FixedHeader FixedHeader { get; protected set; }

        public Message()
        {
        }

        public Message(MessageType messageType)
        {
            FixedHeader = new FixedHeader
            {
                MessageType = messageType
            };
        }

        public virtual void WriteTo(Stream stream)
        {
        }

        public static Message CreateFrom(byte[] buffer)
        {
            using (var stream = new MemoryStream(buffer))
            {
                return CreateFrom(stream);
            }
        }

        public static Message CreateFrom(Stream stream)
        {
            var header = new FixedHeader(stream);
            return CreateMessage(header, stream);
        }

        public static Message CreateMessage(FixedHeader header, Stream stream)
        {
            switch (header.MessageType)
            {
                case MessageType.CONNACK:
                    return new ConnAckMessage(header, stream);
                case MessageType.DISCONNECT:
                    return null;
                case MessageType.PINGREQ:
                    return new PingReqMessage();
                case MessageType.PUBACK:
                    return new PublishAckMessage(header, stream);
                case MessageType.PUBCOMP:
                    //return new MqttPubcompMessage(str, header);
                case MessageType.PUBLISH:
                    //return new MqttPublishMessage(str, header);
                case MessageType.PUBREC:
                    //return new MqttPubrecMessage(str, header);
                case MessageType.PUBREL:
                    //return new MqttPubrelMessage(str, header);
                case MessageType.SUBACK:
                    //return new MqttSubackMessage(str, header);
                case MessageType.UNSUBACK:
                    //return new MqttUnsubackMessage(str, header);
                case MessageType.PINGRESP:
                    return new PingRespMessage(header, stream);
                case MessageType.UNSUBSCRIBE:
                case MessageType.CONNECT:
                case MessageType.SUBSCRIBE:
                default:
                    throw new Exception("Unsupported Message Type");
            }
        }
    }

两个枚举:

MessageType  (消息类型)

Qos (服务质量等级)

    [Flags]
    public enum MessageType : byte
    {
        CONNECT     = 1,
        CONNACK     = 2,
        PUBLISH     = 3,
        PUBACK      = 4,
        PUBREC      = 5,
        PUBREL      = 6,
        PUBCOMP     = 7,
        SUBSCRIBE   = 8,
        SUBACK      = 9,
        UNSUBSCRIBE = 10,
        UNSUBACK    = 11,
        PINGREQ     = 12,
        PINGRESP    = 13,
        DISCONNECT  = 14
    }

    /// <summary>
    /// 服务质量等级
    /// </summary>
    [Flags]
    public enum Qos : byte
    {
        /// <summary>
        ///     QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful.
        /// </summary>
        AtMostOnce = 0,

        /// <summary>
        ///     QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered
        ///     more than once if network errors occur.
        /// </summary>
        AtLeastOnce = 1,

        /// <summary>
        ///     QOS Level 2 - Message will be delivered once, and only once. Message will be retried until
        ///     it is successfully sent..
        /// </summary>
        ExactlyOnce = 2,
    }

ping 请求包:  PingReqMessage

响应包:         PingRespMessage

    internal sealed class PingReqMessage : Message
    {
        public PingReqMessage()
            : base(MessageType.PINGREQ)
        {
        }

        public override void WriteTo(Stream stream)
        {
            FixedHeader.WriteTo(stream);
        }
    }

    internal class PingRespMessage : Message
    {
        public PingRespMessage()
            : base(MessageType.PINGRESP)
        {
        }

        public PingRespMessage(FixedHeader header, Stream stream)
        {
            FixedHeader = header;
        }
    }

OK.

时间: 2024-10-02 09:42:30

mqtt 协议之 PINGREQ, PINGRESP的相关文章

MQTT 协议学习:Keep Alive 和连接保活

(2020-02-05 10:30) 我们提到过 Broker 需要知道 Client 是否非正常地断开了和它的连接,以发送遗愿消息.实际上 Client 也需要能够很快地检测到它失去了和 Broker 的连接,以便重新连接. MQTT 协议是基于 TCP 的一个应用层协议,理论上 TCP 协议在丢失连接时会通知上层应用,但是 TCP 有一个半打开连接的问题(half-open connection).这里我不打算深入分析 TCP 协议,需要记住的是,在这种状态下,一端的 TCP 连接已经失效,

物联网MQTT协议分析和开源Mosquitto部署验证

在<物联网核心协议—消息推送技术演进>一文中已向读者介绍了多种消息推送技术的情况,包括HTTP单向通信.Ajax轮询.Websocket.MQTT.CoAP等,其中MQTT协议为IBM制定并力推,其具有开放.简单.轻量级以及易于实现的特点使得其即便在资源受限的环境中也能得到很好的使用,比如运行在资源紧缺型的嵌入式系统中或网络带宽非常昂贵的环境中,除此之外,它也被广泛用于遥感勘测.智能家居.能源监测和医疗应用程序等各个领域,是物联网的重要组成部分,将来可能会成为物联网的事实标准. 本篇文章将帮助

MQTT协议简析

1 概述 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是一个C/S架构的发布/订阅模式消息传输协议,最早在1999年由IBM的Andy Stanford-Clark博士和Arcom公司的ArlenNipper博士提出,本文的MQTT协议主要基于MQTT3.1.1. 2 报文结构 MQTT报文结构主要分为固定报头.可变报头.有效载荷3个部分,具体结构如下表所示: 说明 字节数 7 6 5 4 3 2 1 0 固定报头 1 Message Ty

MQTT协议笔记之连接和心跳

前言 本篇会把连接(CONNECT).心跳(PINGREQ/PINGRESP).确认(CONNACK).断开连接(DISCONNECT)和在一起. CONNECT 像前面所说,MQTT有关字符串部分采用的修改版的UTF-8编码,CONNECT可变头部中协议名称.消息体都是采用修改版的UTF-8编码.前面基本上可变头部内容不多,下面是一个较为完整的CONNECT消息结构:   Description 7 6 5 4 3 2 1 0 Fixed header/固定头部     Message Typ

MQTT协议实现Eclipse Paho学习总结

转载自:http://xiaoxinzhou.blog.163.com/blog/static/20704538620145411306821/ 一.概述 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放.简单.轻量.易于实现.这些特点使它适用于受限环境.例如,但不仅限于此: 网络代价昂贵,带宽低.不可靠. 在嵌入设备中运行,处理器和内存资源有限. 该协议的特点有: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合. 对负载内容屏蔽的消息传输. 使

Netty实现高性能IOT服务器(Groza)之手撕MQTT协议篇上

前言 诞生及优势 MQTT由Andy Stanford-Clark(IBM)和Arlen Nipper(Eurotech,现为Cirrus Link)于1999年开发,用于监测穿越沙漠的石油管道.目标是拥有一个带宽有效且使用很少电池电量的协议,因为这些设备是通过卫星链路连接的,当时这种设备非常昂贵. 与HTTP及其请求/响应范例相比,该协议使用发布/订阅体系结构.发布/订阅是事件驱动的,可以将消息推送到客户端.中央通信点是MQTT代理,它负责调度发送者和合法接收者之间的所有消息.向代理发布消息的

MQTT协议的简单了解

MQTT协议-MQTT协议解析(MQTT数据包结构) 协议就是通信双方的一个约定, 即,表示第1位传输的什么.第2位传输的什么--. 在MQTT协议中,一个MQTT数据包由: 固定头(Fixed header). 可变头(Variable header). 消息体(payload)三部分构成 MQTT 数据包结构 * 固定头(Fixed header),存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识 * 可变头(Variable header),存在于部分MQTT数据包中,数据包

自己动手实现MQTT协议

写在前面 前段时间弄IoT相关的东西,系统学习了一下 MQTT 协议,在此分享出来. 本文先是对 MQTT 协议做了简单的介绍:接着是对 MQTT协议的内容做了较为全面的解读:最后使用 Python 语言去实现一个简单的 MQTT 客户端和服务器. 简介 MQTT 全称是 Message Queue Telemetry Transport,翻译成中文意思是"遥测传输协议".它最先是由IBM提出,是一种基于 TCP 协议,具有简单.轻量等优点,特别适合于受限环境(带宽低.网络延迟高.网络

MQTT协议(一)

一.MQTT简介 MQTT协议(Message Queuing Telemetry Transport)(消息队列遥测传输)是一种基于发布/订阅模式的“轻量级”消息协议,是IBM公司于1999年提出的,由Andy Stanford-Clark(IBM)和Arlen Nipper(Eurotech,现为Cirrus Link)于1999年开发.MQTT是一个基于TCP的发布订阅协议,设计的初始目的是为了适用于极有限的内存设备和低带宽的不可靠网络通信,非常适合物联网通信.MQTT 协议目前在 IoT