基于Netty的IdleStateHandler实现Mqtt心跳

基于Netty的IdleStateHandler实现Mqtt心跳

IdleStateHandler解析

最近研究jetlinks编写的基于Nettymqtt-client(https://github.com/jetlinks/netty-mqtt-client),总结若干知识点.
Netty中,实现心跳机制较为简单,主要依赖于IdleStateHandler判断channel的读写超时.

    /**
     * Creates a new instance firing {@link IdleStateEvent}s.
     *
     * @param readerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#READER_IDLE}
     *        will be triggered when no read was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param writerIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#WRITER_IDLE}
     *        will be triggered when no write was performed for the specified
     *        period of time.  Specify {@code 0} to disable.
     * @param allIdleTimeSeconds
     *        an {@link IdleStateEvent} whose state is {@link IdleState#ALL_IDLE}
     *        will be triggered when neither read nor write was performed for
     *        the specified period of time.  Specify {@code 0} to disable.
     */
    public IdleStateHandler(
            int readerIdleTimeSeconds,
            int writerIdleTimeSeconds,
            int allIdleTimeSeconds) {

        this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
             TimeUnit.SECONDS);
    }

以上是IdleStateHandler的构造函数,主要依赖于三个参数readerIdleTimeSeconds,writerIdleTimeSeconds以及allIdleTimeSeconds.

如果难于理解英文注释,可参考<>https://segmentfault.com/a/1190000006931568一文中的解释:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
  • allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

IdleStateHandler中,分别通过如下函数实现对channel读写操作事件的跟踪:

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Allow writing with void promise if handler is only configured for read timeout events.
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            ctx.write(msg, promise.unvoid()).addListener(writeListener);
        } else {
            ctx.write(msg, promise);
        }
    }

    // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
    private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };

其中:

  • channelRead: 判断channel是否有数据可读取;
  • channelReadComplete: 判断channel是否有数据可读取;
  • write: 判断channel是否有数据写(通过writeListener判断当前写操作是否执行成功).

IdleStateHandlerchannel激活或注册时,会执行initialize函数,根据读写超时时间创建对应的定时任务.

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Initialize early if channel is active already.
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }

        private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            // 创建读超时判断定时任务
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            // 创建写超时判断定时任务
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            // 创建读写超时判断定时任务
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

此处,我们将剖析AllIdleTimeoutTask任务.
此任务,会判断在超时时间段内,是否有读写操作:

  • 有读或者写操作,则重新创建定时任务,等待下次执行;
  • 没有读或者写操作,则创建IdleStateEvent对象,通过ChannelHandlerContext通知注册了用户事件触发器的handler(即handler重载了userEventTriggered函数).
  private final class AllIdleTimeoutTask extends AbstractIdleTask {

        AllIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstAllIdleEvent;
                firstAllIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

了解了IdleStateHandler,我们接下来学习如何编写Mqtt的心跳handler.

Mqtt心跳handler

以下是jetlinks编写的Mqtt心跳handler代码,我们截取部分代码学习.

final class MqttPingHandler extends ChannelInboundHandlerAdapter {

    private final int keepaliveSeconds;

    private ScheduledFuture<?> pingRespTimeout;

    MqttPingHandler(int keepaliveSeconds) {
        this.keepaliveSeconds = keepaliveSeconds;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof MqttMessage)) {
            ctx.fireChannelRead(msg);
            return;
        }
        MqttMessage message = (MqttMessage) msg;
        if (message.fixedHeader().messageType() == MqttMessageType.PINGREQ) {
            this.handlePingReq(ctx.channel());
        } else if (message.fixedHeader().messageType() == MqttMessageType.PINGRESP) {
            this.handlePingResp();
        } else {
            ctx.fireChannelRead(ReferenceCountUtil.retain(msg));
        }
    }

    /**
     * IdleStateHandler,在连接处于idle状态超过设定时间后,会发送IdleStateEvent
     * 接收到IdleStateEvent,当前类会发送心跳包至server,保持连接
     *
     * @param ctx 上下文
     * @param evt 事件
     * @throws Exception 异常
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);

        // 确认监听事件为IdleStateEvent,即发送心跳包至server
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.WRITER_IDLE) {
                this.sendPingReq(ctx.channel());
            }
        }
    }

    /**
     * 发送心跳包至server端,并建立心跳超时断开连接任务
     * 此处,先行创建心跳超时任务,后续再发送心跳包(避免收到心跳响应时,心跳超时任务未建立完成)
     *
     * @param channel 连接
     */
    private void sendPingReq(Channel channel) {

        // 创建心跳超时,断开连接任务
        if (this.pingRespTimeout == null) {
            this.pingRespTimeout = channel.eventLoop().schedule(() -> {
                MqttFixedHeader disconnectHeader =
                        new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
                channel.writeAndFlush(new MqttMessage(disconnectHeader)).addListener(ChannelFutureListener.CLOSE);
                //TODO: what do when the connection is closed ?
            }, this.keepaliveSeconds, TimeUnit.SECONDS);
        }

        // 创建心跳包,并发送至Mqtts Server
        MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PINGREQ, false, MqttQoS.AT_MOST_ONCE, false, 0);
        channel.writeAndFlush(new MqttMessage(pingHeader));
    }

    /**
     * 处理ping resp,取消ping超时任务(断开连接)
     */
    private void handlePingResp() {
        if (this.pingRespTimeout != null && !this.pingRespTimeout.isCancelled() && !this.pingRespTimeout.isDone()) {
            this.pingRespTimeout.cancel(true);
            this.pingRespTimeout = null;
        }
    }
}

函数解析:

(1) 接收超时事件,发送心跳请求

MqttPingHandler中重载了userEventTriggered函数,用以接收ChannelHandlerContext传递的事件,代码中会判断事件是否为IdleStateEvent.
如果当前接收事件为IdleStateEvent,则说明当前channel在超时时间内未发生读写事件,则客户端发送Mqtt心跳请求.

(2) 发送心跳请求,建立请求响应超时关闭连接任务

sendPingReq函数中(以下两步操作,顺序可任意安排):

  • 建立心跳请求响应超时判断任务,如果在一定时长内未接收到心跳响应,则会关闭连接;
  • 构建Mqtt心跳包,发送至远端服务器.

(3) 取消心跳响应超时关闭连接任务

channelRead读取数据,判断是否是Mqtt的心跳响应包.
如果是,则执行handlePingResp函数,取消心跳响应超时关闭连接任务.

handler添加

    ch.pipeline().addLast("idleStateHandler",
        new IdleStateHandler(keepAliveTimeSeconds, keepAliveTimeSeconds, 0));
    ch.pipeline().addLast("mqttPingHandler",
        new MqttPingHandler(MqttClientImpl.this.clientConfig.getKeepAliveTimeSeconds()));

只需要以上两句代码,就可以完成Mqtt心跳维持功能.

原文地址:https://www.cnblogs.com/jason1990/p/11600070.html

时间: 2024-08-10 13:54:24

基于Netty的IdleStateHandler实现Mqtt心跳的相关文章

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

基于netty轻量的高性能分布式RPC服务框架forest&lt;下篇&gt;

基于netty轻量的高性能分布式RPC服务框架forest<上篇> 文章已经简单介绍了forest的快速入门,本文旨在介绍forest用户指南. 基本介绍 Forest是一套基于java开发的RPC框架,除了常规的点对点调用外,Motan还提供服务治理功能,包括服务节点的自动发现.摘除.高可用和负载均衡等. 架构概述 Forest中分为服务提供方(RPC Server),服务调用方(RPC Client)和服务注册中心(Registry)三个角色. Server提供服务,向Registry注册

基于Netty与RabbitMQ的消息服务

Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架. 前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下: 将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点: 1.设备TCP消息解析: NettyMQServer和采集设备Device之间采用TCP通

基于Netty的聊天系统(三)协议定制----消息篇

今天我们继续来讨论协议,今天基本就把一对一聊天的协议定制完毕了,上一篇我们讲述了登录的过程,那么登录完毕就是聊天了,首先我们还是以A和B为例子,A发送消息给B,那么这条消息的的协议如下 发送消息协议: {"id":"xxxx","#":"msg","text":"内容","to":"接收用户ID","type":0,"

基于Netty的私有协议栈的开发

基于Netty的私有协议栈的开发 书是人类进步的阶梯,每读一本书都使自己得以提升,以前看书都是看了就看了,当时感觉受益匪浅,时间一长就又还回到书本了!所以说,好记性不如烂笔头,以后每次看完一本书都写一些读后感,对于技术书则把对让自己醍醐灌顶的篇章记录下来,以便以后翻阅查看,也是记录自己学习的过程- _ -. OK!言归正传,最近由于公司需要做一个网关项目,需要用到基于TCP/IP私有协议接收数据,看完了<Netty权威指南>这本书,感觉作者写的很好,有些地方让我获益良多,虽然书上有些例子跑不通

基于Netty的Redis客户端-Nedis

最近温习了一遍Redis命令,忧伤的是很多东西已交还给老师,正好赶上antirez大神在愚人节发布了Redis 3.0,Redis终于有了支持集群的正式版本,于是心血来潮决定自己实现一个Redis客户端来抚慰我这颗忧伤的心灵. Jedis已经足够强大,它的网络连接是基于阻塞式IO,实现非常简单易懂,但是OIO和NIO相比性能上有劣势,于是决定通过NIO来实现和Redis服务器的网络连接,现在业界最优秀的NIO框架非Netty莫属了,正好以前也学过Netty框架,所以决定基于Netty来实现这个R

基于Netty构建高性能的部标808协议的GPS服务器

使用Java语言开发一个高质量和高性能的jt808 协议的GPS通信服务器,并不是一件简单容易的事情,开发出来一段程序和能够承受数十万台车载接入是两码事,除去开发部标808协议的固有复杂性和几个月长周期的协议Bug调试,作为大批量794车载终端接入的服务端,需要能够处理网络的闪断.客户端的重连.安全认证和消息的编解码.半包处理等.如果没有足够的网络编程经验积累和深入了解部标808协议文档,自研的GPS服务器往往需要半年甚至数年的时间才能最终稳定下来,这种成本即便对一个大公司而言也是个严重的挑战.

基于Netty的聊天系统(一)通讯原理篇

今天周六,正好顺便把聊天系统的通讯原理写一下,本来是用XMPP+Openfire做了一个聊天,但是在做群聊那块需要去写插件来主动向表里变去写数据,因为openfire外国人写的,最初设计的群聊是会议室那种形式,和我们现在这种QQ群聊还是有差别的,改造起来比较麻烦,需要去通都源码等等,openfire是基于mina来写的,mina和netty又出自同一作者之手,那么我们就基于netty来写一个吧,首先我们来谈一谈通讯的原理 1:通讯原理 首先比方说A和B两个人要进行聊天(这里我们先讨论一对一这种聊

基于Netty打造RPC服务器设计经验谈

自从在园子里,发表了两篇如何基于Netty构建RPC服务器的文章:谈谈如何使用Netty开发实现高性能的RPC服务器.Netty实现高性能RPC服务器优化篇之消息序列化 之后,收到了很多同行.园友们热情的反馈和若干个优化建议,于是利用闲暇时间,打算对原来NettyRPC中不合理的模块进行重构,并且增强了一些特性,主要的优化点如下: 在原来编码解码器:JDK原生的对象序列化方式.kryo.hessian,新增了:protostuff. 优化了NettyRPC服务端的线程池模型,支持LinkedBl