NETTY 心跳机制

最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址:

https://github.com/fengjiachun/Jupiter

今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程:

1)客户端连接服务端

2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s

3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)

4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道

5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路

6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连

以上描述的就是整个心跳和重连的整个过程,虽然很简单,上一篇blog也写了一个Demo,简单地做了一下上述功能

要写工业级的Netty心跳重连的代码,需要解决一下几个问题:

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理

2)重连对象的管理,也就是bootstrap对象的管理

3)重连机制编写

完整的代码:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle

下面我们就看大神是如何解决这些问题的,首先先定义一个接口ChannelHandlerHolder,用来保管ChannelPipeline中的Handlers的

package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;

/**
*
* 客户端的ChannelHandler集合,由子类实现,这样做的好处:
* 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
* 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
* 地获取所有的handlers
*/
public interface ChannelHandlerHolder {

ChannelHandler[] handlers();
}
我们再来编写我们熟悉的服务端的ServerBootstrap的编写:
HeartBeatServer.java

package com.lyncc.netty.idle;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public class HeartBeatServer {

private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();

private int port;

public HeartBeatServer(int port) {
this.port = port;
}

public void start() {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
.localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
ch.pipeline().addLast(idleStateTrigger);
ch.pipeline().addLast("decoder", new StringDecoder());
ch.pipeline().addLast("encoder", new StringEncoder());
ch.pipeline().addLast(new HeartBeatServerHandler());
};

}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定端口,开始接收进来的连接
ChannelFuture future = sbs.bind(port).sync();

System.out.println("Server start listen at " + port);
future.channel().closeFuture().sync();
} catch (Exception e) {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new HeartBeatServer(port).start();
}

}
单独写一个AcceptorIdleStateTrigger,其实也是继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,因为客户端是write,那么服务端自然是read,设置的状态就是IdleState.READER_IDLE,源码如下:
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.READER_IDLE) {
throw new Exception("idle exception");
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
HeartBeatServerHandler就是一个很简单的自定义的Handler,不是重点:
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead..");
System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}
接下来就是重点,我们需要写一个类,这个类可以去观察链路是否断了,如果断了,进行循环的断线重连操作,ConnectionWatchdog,顾名思义,链路检测狗,我们先看完整代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

/**
*
* 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
*/
@Sharable
public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{

private final Bootstrap bootstrap;
private final Timer timer;
private final int port;

private final String host;

private volatile boolean reconnect = true;
private int attempts;

public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
this.bootstrap = bootstrap;
this.timer = timer;
this.port = port;
this.host = host;
this.reconnect = reconnect;
}

/**
* channel链路每次active的时候,将其连接的次数重新? 0
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

System.out.println("当前链路已经激活了,重连尝试次数重新置为0");

attempts = 0;
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("链接关闭");
if(reconnect){
System.out.println("链接关闭,将进行重连");
if (attempts < 12) {
attempts++;
//重连的间隔时间会越来越长
int timeout = 2 << attempts;
timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
}
}
ctx.fireChannelInactive();
}

public void run(Timeout timeout) throws Exception {

ChannelFuture future;
//bootstrap已经初始化好了,只需要将handler填入就可以了
synchronized (bootstrap) {
bootstrap.handler(new ChannelInitializer<Channel>() {

@Override
protected void initChannel(Channel ch) throws Exception {

ch.pipeline().addLast(handlers());
}
});
future = bootstrap.connect(host,port);
}
//future对象
future.addListener(new ChannelFutureListener() {

public void operationComplete(ChannelFuture f) throws Exception {
boolean succeed = f.isSuccess();

//如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
if (!succeed) {
System.out.println("重连失败");
f.channel().pipeline().fireChannelInactive();
}else{
System.out.println("重连成功");
}
}
});

}

}

稍微分析一下:
1)继承了ChannelInboundHandlerAdapter,说明它也是Handler,也对,作为一个检测对象,肯定会放在链路中,否则怎么检测

2)实现了2个接口,TimeTask,ChannelHandlerHolder

①TimeTask,我们就要写run方法,这应该是一个定时任务,这个定时任务做的事情应该是重连的工作

②ChannelHandlerHolder的接口,这个接口我们刚才说过是维护的所有的Handlers,因为在重连的时候需要获取Handlers

3)bootstrap对象,重连的时候依旧需要这个对象

4)当链路断开的时候会触发channelInactive这个方法,也就说触发重连的导火索是从这边开始的

好了,我们这边再写次核心的HeartBeatsClient的代码:

package com.lyncc.netty.idle;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.HashedWheelTimer;

import java.util.concurrent.TimeUnit;

public class HeartBeatsClient {

protected final HashedWheelTimer timer = new HashedWheelTimer();

private Bootstrap boot;

private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();

public void connect(int port, String host) throws Exception {

EventLoopGroup group = new NioEventLoopGroup();

boot = new Bootstrap();
boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));

final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {

public ChannelHandler[] handlers() {
return new ChannelHandler[] {
this,
new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
idleStateTrigger,
new StringDecoder(),
new StringEncoder(),
new HeartBeatClientHandler()
};
}
};

ChannelFuture future;
//进行连接
try {
synchronized (boot) {
boot.handler(new ChannelInitializer<Channel>() {

//初始化channel
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(watchdog.handlers());
}
});

future = boot.connect(host,port);
}

// 以下代码在synchronized同步块外面是安全的
future.sync();
} catch (Throwable t) {
throw new Exception("connects to fails", t);
}
}

/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
new HeartBeatsClient().connect(port, "127.0.0.1");
}

}
也稍微说明一下:
1)创建了ConnectionWatchdog对象,自然要实现handlers方法

2)初始化好bootstrap对象

3)4秒内没有写操作,进行心跳触发,也就是IdleStateHandler这个方法

最后ConnectorIdleStateTrigger这个类

package com.lyncc.netty.idle;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;

@Sharable
public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {

private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
CharsetUtil.UTF_8));

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleState state = ((IdleStateEvent) evt).state();
if (state == IdleState.WRITER_IDLE) {
// write heartbeat to server
ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
HeartBeatClientHandler.java(不是重点)
package com.lyncc.netty.idle;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

@Sharable
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("激活时间是:"+new Date());
System.out.println("HeartBeatClientHandler channelActive");
ctx.fireChannelActive();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("停止时间是:"+new Date());
System.out.println("HeartBeatClientHandler channelInactive");
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String message = (String) msg;
System.out.println(message);
if (message.equals("Heartbeat")) {
ctx.write("has read message from server");
ctx.flush();
}
ReferenceCountUtil.release(msg);
}
}

好了,到此为止,所有的代码都贴完了,我们做一个简单的测试,按照常理,如果不出任何状况的话,客户端4秒发送心跳,服务端5秒才验证是不会断连的,所以我们在启动之后,关闭服务端,然后再次重启服务端

首先启动服务端,控制台如下:

启动客户端,控制台如下:

客户端启动之后,服务端的控制台:

关闭服务端后,客户端控制台:

重启启动服务端:

---------------------
作者:BazingaLyncc
来源:CSDN
原文:https://blog.csdn.net/linuu/article/details/51509847
版权声明:本文为博主原创文章,转载请附上博文链接!

原文地址:https://www.cnblogs.com/cfas/p/10113312.html

时间: 2024-11-09 14:10:34

NETTY 心跳机制的相关文章

Netty心跳机制

概念介绍网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现.但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题.可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制.其实TCP中已经为我们实现了一个叫做心跳的机制.如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议.所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着. 以确保链接的有效性.

netty心跳机制和断线重连(四)

心跳是为了保证客户端和服务端的通信可用.因为各种原因客户端和服务端不能及时响应和接收信息.比如网络断开,停电 或者是客户端/服务端 高负载. 所以每隔一段时间 客户端发送心跳包到客户端  服务端做出心跳的响应: 1.如果客户端在指定时间没有向服务端发送心跳包.则表示客户端的通信出现了问题. 2.如果客户端发送心跳包到服务端没有收到响应 则表示服务端的通信出现了问题. netty提供IdleStateHandle 在监听距离上一次写的时间和距离上一次读的时间 如果超时则调用 源码: public

Netty 实现心跳机制.md

netty 心跳机制示例,使用netty4,IdleStateHandler 实现. 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服

Netty 超时机制及心跳程序实现

Netty 超时机制的介绍 Netty 的超时类型 IdleState 主要分为: ALL_IDLE : 一段时间内没有数据接收或者发送 READER_IDLE : 一段时间内没有数据接收 WRITER_IDLE : 一段时间内没有数据发送 在 Netty 的 timeout 包下,主要类有: IdleStateEvent : 超时的事件 IdleStateHandler : 超时状态处理 ReadTimeoutHandler : 读超时状态处理 WriteTimeoutHandler : 写超

netty之心跳机制

1.心跳机制,在netty3和netty5上面都有.但是写法有些不一样. 2.心跳机制在服务端和客户端的作用也是不一样的.对于服务端来说:就是定时清除那些因为某种原因在一定时间段内没有做指定操作的客户端连接.对于服务端来说:用来检测是否断开连接,然后尝试重连等问题.游戏上面也可以来监控延时问题. 3.我这边只写了服务端的心跳用法,客户端基本差不多. 1)netty3的写法 import org.jboss.netty.bootstrap.ServerBootstrap; import org.j

浅析 Netty 实现心跳机制与断线重连

基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

Netty学习篇④-心跳机制及断线重连

心跳检测 前言 客户端和服务端的连接属于socket连接,也属于长连接,往往会存在客户端在连接了服务端之后就没有任何操作了,但还是占用了一个连接:当越来越多类似的客户端出现就会浪费很多连接,netty中可以通过心跳检测来找出一定程度(自定义规则判断哪些连接是无效链接)的无效链接并断开连接,保存真正活跃的连接. 什么叫心跳检测 我理解的心跳检测应该是客户端/服务端定时发送一个数据包给服务端/客户端,检测对方是否有响应: 如果是存活的连接,在一定的时间内应该会收到响应回来的数据包: 如果在一定时间内

[Netty] Netty 超时机制及心跳程序实现

本文介绍了 Netty 超时机制的原理,以及如何在连接闲置时发送一个心跳来维持连接. Netty 超时机制的介绍 Netty 的超时类型 IdleState 主要分为: ALL_IDLE : 一段时间内没有数据接收或者发送 READER_IDLE :    一段时间内没有数据接收 WRITER_IDLE :     一段时间内没有数据发送 在 Netty 的 timeout 包下,主要类有: IdleStateEvent :     超时的事件 IdleStateHandler : 超时状态处理