一起学Netty(十四)之 Netty生产级的心跳和重连机制

sigh,写这篇博客的时候老脸还是红了一下,心里还是有些唏嘘的,应该算是剽窃吧,每个人的代码功力的确是有差距的,好在文章的标题是“一起学”,而不是开涛大神的“跟我学”系列的文章,我们还是多花点时间学习吧,感叹无用~

最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址:

https://github.com/fengjiachun/Jupiter

今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程:

1)客户端连接服务端

2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s

3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)

4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道

5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路

6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连

以上描述的就是整个心跳和重连的整个过程,虽然很简单,上一篇blog也写了一个Demo,简单地做了一下上述功能

要写工业级的Netty心跳重连的代码,需要解决一下几个问题:

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理

2)重连对象的管理,也就是bootstrap对象的管理

3)重连机制编写

下面我们就看大神是如何解决这些问题的,首先先定义一个接口ChannelHandlerHolder,用来保管ChannelPipeline中的Handlers的

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;

/**
 *
 * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
 * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
 * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
 * 地获取所有的handlers
 */
public interface ChannelHandlerHolder {

    ChannelHandler[] handlers();
}

我们再来编写我们熟悉的服务端的ServerBootstrap的编写:

HeartBeatServer.java

package com.lyncc.netty.idle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class HeartBeatServer {

    private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();

    private int port;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
                    .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                            ch.pipeline().addLast(idleStateTrigger);
                            ch.pipeline().addLast("decoder", new StringDecoder());
                            ch.pipeline().addLast("encoder", new StringEncoder());
                            ch.pipeline().addLast(new HeartBeatServerHandler());
                        };

                    }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            // 绑定端口,开始接收进来的连接
            ChannelFuture future = sbs.bind(port).sync();

            System.out.println("Server start listen at " + port);
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new HeartBeatServer(port).start();
    }

}

单独写一个AcceptorIdleStateTrigger,其实也是继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,因为客户端是write,那么服务端自然是read,设置的状态就是IdleState.READER_IDLE,源码如下:

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                throw new Exception("idle exception");
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

HeartBeatServerHandler就是一个很简单的自定义的Handler,不是重点:

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("server channelRead..");
        System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

接下来就是重点,我们需要写一个类,这个类可以去观察链路是否断了,如果断了,进行循环的断线重连操作,ConnectionWatchdog,顾名思义,链路检测狗,我们先看完整代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
 *
 * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
 */
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{

    private final Bootstrap bootstrap;
    private final Timer timer;
    private final int port;

    private final String host;

    private volatile boolean reconnect = true;
    private int attempts;

    public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.port = port;
        this.host = host;
        this.reconnect = reconnect;
    }

    /**
     * channel链路每次active的时候,将其连接的次数重新? 0
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        System.out.println("当前链路已经激活了,重连尝试次数重新置为0");

        attempts = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("链接关闭");
        if(reconnect){
            System.out.println("链接关闭,将进行重连");
            if (attempts < 12) {
                attempts++;
            }
        <pre name="code" class="java">             //重连的间隔时间会越来越长
             int timeout = 2 << attempts;
             timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);

} ctx.fireChannelInactive(); } public void run(Timeout timeout) throws Exception { ChannelFuture future; //bootstrap已经初始化好了,只需要将handler填入就可以了 synchronized (bootstrap) { bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected
void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(handlers()); } }); future = bootstrap.connect(host,port); } //future对象 future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture f) throws Exception {
boolean succeed = f.isSuccess(); //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连 if (!succeed) { System.out.println("重连失败"); f.channel().pipeline().fireChannelInactive(); }else{ System.out.println("重连成功"); } } }); }}


稍微分析一下:

1)继承了ChannelInboundHandlerAdapter,说明它也是Handler,也对,作为一个检测对象,肯定会放在链路中,否则怎么检测

2)实现了2个接口,TimeTask,ChannelHandlerHolder

①TimeTask,我们就要写run方法,这应该是一个定时任务,这个定时任务做的事情应该是重连的工作

②ChannelHandlerHolder的接口,这个接口我们刚才说过是维护的所有的Handlers,因为在重连的时候需要获取Handlers

3)bootstrap对象,重连的时候依旧需要这个对象

4)当链路断开的时候会触发channelInactive这个方法,也就说触发重连的导火索是从这边开始的

好了,我们这边再写次核心的HeartBeatsClient的代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

public class HeartBeatsClient {

    protected final HashedWheelTimer timer = new HashedWheelTimer();

    private Bootstrap boot;

    private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

    public void connect(int port, String host) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();  

        boot = new Bootstrap();
        boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));

        final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {

                public ChannelHandler[] handlers() {
                    return new ChannelHandler[] {
                            this,
                            new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
                            idleStateTrigger,
                            new StringDecoder(),
                            new StringEncoder(),
                            new HeartBeatClientHandler()
                    };
                }
            };

            ChannelFuture future;
            //进行连接
            try {
                synchronized (boot) {
                    boot.handler(new ChannelInitializer<Channel>() {

                        //初始化channel
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline().addLast(watchdog.handlers());
                        }
                    });

                    future = boot.connect(host,port);
                }

                // 以下代码在synchronized同步块外面是安全的
                future.sync();
            } catch (Throwable t) {
                throw new Exception("connects to  fails", t);
            }
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                // 采用默认值
            }
        }
        new HeartBeatsClient().connect(port, "127.0.0.1");
    }

}

也稍微说明一下:

1)创建了ConnectionWatchdog对象,自然要实现handlers方法

2)初始化好bootstrap对象

3)4秒内没有写操作,进行心跳触发,也就是IdleStateHandler这个方法

最后ConnectorIdleStateTrigger这个类

package com.lyncc.netty.idle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

@Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                // write heartbeat to server
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

HeartBeatClientHandler.java(不是重点)

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

@Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("激活时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelActive");
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("停止时间是:"+new Date());
        System.out.println("HeartBeatClientHandler channelInactive");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String message = (String) msg;
        System.out.println(message);
        if (message.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }
}

好了,到此为止,所有的代码都贴完了,我们做一个简单的测试,按照常理,如果不出任何状况的话,客户端4秒发送心跳,服务端5秒才验证是不会断连的,所以我们在启动之后,关闭服务端,然后再次重启服务端

首先启动服务端,控制台如下:

启动客户端,控制台如下:

客户端启动之后,服务端的控制台:

关闭服务端后,客户端控制台:

重启启动服务端:

重连成功~

时间: 2024-08-25 15:10:35

一起学Netty(十四)之 Netty生产级的心跳和重连机制的相关文章

HDU 6467 简单数学题 【递推公式 &amp;&amp; O(1)优化乘法】(广东工业大学第十四届程序设计竞赛)

传送门:http://acm.hdu.edu.cn/showproblem.php?pid=6467 简单数学题 Time Limit: 4000/2000 MS (Java/Others)    Memory Limit: 65536/65536 K (Java/Others)Total Submission(s): 308    Accepted Submission(s): 150 Problem Description 已知 F(n)=∑i=1n(i×∑j=inCij) 求 F(n) m

边记边学PHP-(十四)MySql数据库基础操作1

提到数据库,相信都不陌生.比较常见的是SQL Server . Oracle.Access.SQLite等等.当然还有PHP的黄金搭档,MySql数据库.当然我所写的都是MySql的一些基础,我还没有深入的学习这个数据库.深入学习还需要一段时间.但是现在学的基础基本满足我现在的学习需要. 一.MySQL简介 1.MySQL是什么 MySQL是一款安全.跨平台.高效的,并与PHP.Java等主流编程语言紧密结合的数据库系统.该数据库系统是由瑞典的MySQL AB公司开发.发布并支持,由MySQL初

(未完成)[HDUOJ]“字节跳动-文远知行杯”广东工业大学第十四届程序设计竞赛

solved 5 A(签到) 题意:两个人随机得到0或1其中之一数字,每个人都可以猜对方的数字是什么,有一个人猜对就算成功,问最优策略下00,01,10,11四种情况两人的成功概率分别是多少. 题意不明的签到题,题面说两人不能沟通,以为就是两个人随便猜,成功率都是1-0.5*0.5=0.75.结果是两个人可以提前商量好策略,那显然可以其中一个人猜和自己相同的数,另一个人猜和自己不同的数,那么所有的成功率都是100%. #include<bits/stdc++.h> using namespac

三分钟教你学Git(十四) 之 线下传输仓库

有时候还有一个人不能从远程直接clone仓库或者说由于非常大,clone非常慢或其他原因.我们能够使用bundle命令将Git仓库打包,然后通过U盘或者是其他介质拷贝给他,这样他拿到打包好的仓库后能够unbundle成仓库,达到了共享的目的,这样有时候是非常方便的. 我们看看详细怎么做: 首先我们进入仓库.git status一下看看当前的仓库状态. 然后開始打包: git bundle create zhc.bundle HEAD master Counting objects: 6, don

广东工业大学第十四届程序设计竞赛 鸽子数

Problem Description 通常来说,题面短的题目一般都比较难,所以我要把题面写得很长很长.通常来说,题面短的题目一般都比较难,所以我要把题面写得很长很长.通常来说,题面短的题目一般都比较难,所以我要把题面写得很长很长.鸽子数字由以下过程定义:从任何正整数开始,将数字替换为其各个数位的平方和,并重复该过程,直到该数字等于1.如果不能,则这个数字不是鸽子数.例如7是鸽子数,因为7->49->97->130->10->1.(7*7=49,4*4+9*9=97,9*9+

从头学pytorch(十四):lenet

卷积神经网络 在之前的文章里,对28 X 28的图像,我们是通过把它展开为长度为784的一维向量,然后送进全连接层,训练出一个分类模型.这样做主要有两个问题 图像在同一列邻近的像素在这个向量中可能相距较远.它们构成的模式可能难以被模型识别. 对于大尺寸的输入图像,使用全连接层容易造成模型过大.假设输入是高和宽均为1000像素的彩色照片(含3个通道).即使全连接层输出个数仍是256,该层权重参数的形状是\(3,000,000\times 256\),按照参数为float,占用4字节计算,它占用了大

python pytest测试框架介绍四----pytest-html插件html带错误截图及失败重测机制

一.html报告错误截图 这次介绍pytest第三方插件pytest-html 这里不介绍怎么使用,因为怎么使用网上已经很多了,这里给个地址给大家参考,pytest-html生成html报告 今天在这里介绍pytest生成的报告怎么带有截图,这在web自动化测试非常有用. 需求是测试用例错误就截图,方法如下: 我们要新建一个关于截图的插件文件conftest.py,注意,文件名不能变,因为pytest-html会自动找这个自己写的插件,内容如下: from selenium import web

Mina、Netty、Twisted一起学(十):线程模型

要想开发一个高性能的TCPserver,熟悉所使用框架的线程模型非常重要.MINA.Netty.Twisted本身都是高性能的网络框架,假设再搭配上高效率的代码.才干实现一个高大上的server. 可是假设不了解它们的线程模型.就非常难写出高性能的代码.框架本身效率再高.程序写的太差,那么server总体的性能也不会太高.就像一个电脑,CPU再好.内存小硬盘慢散热差,总体的性能也不会太高. 玩过Android开发的同学会知道,在Android应用中有一个非常重要线程:UI线程(即主线程). UI

从零开始学android&lt;android事件的处理方式.二十四.&gt;

在android中一共有 多种事件,每种事件都有自己相对应的处理机制 如以下几种 1 单击事件 View.OnClickListener public abstract void onClick (View v) 单击组件时触发 2 单击事件 View.OnLongClickListener public abstract boolean onLongClick (View v) 长按组件时触发 3 键盘事件 View.OnKeyListener public abstract boolean