Netty 实现心跳机制.md

netty 心跳机制示例,使用netty4,IdleStateHandler 实现。

本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用。我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的。Netty中自带了一个IdleStateHandler 可以用来实现心跳检测。

心跳检测的逻辑

本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服务端发送消息。如果客户端在“干活”那么服务端必定会收到数据,如果客户端“闲下来了”那么服务端就接收不到这个服务端的消息,既然客户端闲下来了,不干事,那么何必浪费连接资源呢?所以服务端检测到一定时间内客户端不活跃的时候,将客户端连接关闭。本文要实现的逻辑步骤为:

  1. 启动服务端,启动客户端
  2. 客户端向服务端发送"I am alive",并sleep随机时间,用来模拟空闲。
  3. 服务端接收客户端消息,并返回"copy that",客户端空闲时 计数+1.
  4. 服务端客户端继续通信
  5. 服务端检测客户端空闲太多,关闭连接。客户端发现连接关闭了,就退出了。

有了这个思路,我们先来编写服务端。

心跳检测服务端代码

public class HeartBeatServer {

    int port ;
    public HeartBeatServer(int port){
        this.port = port;
    }

    public void start(){
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        try{
            bootstrap.group(boss,worker)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatInitializer());

            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        HeartBeatServer server = new HeartBeatServer(8090);
        server.start();
    }
}

熟悉netty的同志,对于上面的模板一样的代码一定是在熟悉不过了。啥都不用看,只需要看childHandler(new HeartBeatInitializer()) 这一句。HeartBeatInitializer就是一个ChannelInitializer顾名思义,他就是在初始化channel的时做一些事情。我们所需要开发的业务逻辑Handler就是在这里添加的。其代码如下:

public class HeartBeatInitializer extends ChannelInitializer<Channel> {

    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast("decoder", new StringDecoder());
        pipeline.addLast("encoder", new StringEncoder());
        pipeline.addLast(new IdleStateHandler(2,2,2, TimeUnit.SECONDS));
        pipeline.addLast(new HeartBeatHandler());
    }
}

代码很简单,我们先添加了StringDecoder,和StringEncoder。这两个其实就是编解码用的,下面的IdleStateHandler才是本次心跳的核心组件。我们可以看到IdleStateHandler的构造函数中接收了4个参数,起定义如下:

public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit);

三个空闲时间参数,以及时间参数的格式。我们的例子中设置的是2,2,2,意思就是客户端2秒没有读/写,这个超时时间就会被触发。超时事件触发就需要我们来处理了,这就是上的HeartBeatInitializer中最后一行的HeartBeatHandler所做的事情。代码如下:

public class HeartBeatHandler extends SimpleChannelInboundHandler<String> {

    int readIdleTimes = 0;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
        System.out.println(" ====== > [server] message received : " + s);
       if("I am alive".equals(s)){
            ctx.channel().writeAndFlush("copy that");
        }else {
           System.out.println(" 其他信息处理 ... ");
       }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        IdleStateEvent event = (IdleStateEvent)evt;

        String eventType = null;
        switch (event.state()){
            case READER_IDLE:
                eventType = "读空闲";
                readIdleTimes ++; // 读空闲的计数加1
                break;
            case WRITER_IDLE:
                eventType = "写空闲";
                // 不处理
                break;
            case ALL_IDLE:
                eventType ="读写空闲";
                // 不处理
                break;
        }
        System.out.println(ctx.channel().remoteAddress() + "超时事件:" +eventType);
        if(readIdleTimes > 3){
            System.out.println(" [server]读空闲超过3次,关闭连接");
            ctx.channel().writeAndFlush("you are out");
            ctx.channel().close();
        }
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("=== " + ctx.channel().remoteAddress() + " is active ===");
    }

}

至此,我们的服务端写好了。

心跳检测客户端代码

netty的api设计使得编码的模式非常具有通用性,所以客户端代码和服务端的代码几乎一样:启动client端的代码几乎一样,也需要一个ChannelInitializer,也需要Handler。改动的地方很少,因此本文不对客户端代码进行详细解释。下面给出client端的完整代码:

public class HeartBeatClient  {

    int port;
    Channel channel;
    Random random ;

    public HeartBeatClient(int port){
        this.port = port;
        random = new Random();
    }
    public static void main(String[] args) throws Exception{
        HeartBeatClient client = new HeartBeatClient(8090);
        client.start();
    }

    public void start() {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try{
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new HeartBeatClientInitializer());

            connect(bootstrap,port);
            String  text = "I am alive";
            while (channel.isActive()){
                sendMsg(text);
            }
        }catch(Exception e){
            // do something
        }finally {
            eventLoopGroup.shutdownGracefully();
        }
    }

    public void connect(Bootstrap bootstrap,int port) throws Exception{
        channel = bootstrap.connect("localhost",8090).sync().channel();
    }

    public void sendMsg(String text) throws Exception{
        int num = random.nextInt(10);
        Thread.sleep(num * 1000);
        channel.writeAndFlush(text);
    }

    static class HeartBeatClientInitializer extends ChannelInitializer<Channel> {

        @Override
        protected void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast(new HeartBeatClientHandler());
        }
    }

    static class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            System.out.println(" client received :" +msg);
            if(msg!= null && msg.equals("you are out")) {
                System.out.println(" server closed connection , so client will close too");
                ctx.channel().closeFuture();
            }
        }
    }
}

运行代码

在上面的代码写好之后,我们先启动服务端,然后在启动客户端。运行日志如下:

server端:

=== /127.0.0.1:57700 is active ===
 ====== > [server] message received : I am alive
 ====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:写空闲
 ====== > [server] message received : I am alive
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
/127.0.0.1:57700超时事件:写空闲
/127.0.0.1:57700超时事件:读写空闲
/127.0.0.1:57700超时事件:读空闲
 [server]读空闲超过3次,关闭连接

client端:

 client sent msg and sleep 2
 client received :copy that
 client received :copy that
 client sent msg and sleep 6
 client sent msg and sleep 6
 client received :copy that
 client received :you are out
 server closed connection , so client will close too

Process finished with exit code 0

通过上面的运行日志,我们可以看到:

1.客户端在与服务器成功建立之后,发送了3次‘I am alive‘,服务端也回应了3次:‘copy that‘

2.由于客户端消极怠工,超时了多次,服务端关闭了链接。

3.客户端知道服务端抛弃自己之后,也关闭了连接,程序退出。

以上简单了演示了一下,netty的心跳机制,其实主要就是使用了IdleStateHandler。源码下载:https://gitee.com/dimixu/netty_learn.git

原文地址:https://www.cnblogs.com/demingblog/p/9957143.html

时间: 2024-11-09 11:47:02

Netty 实现心跳机制.md的相关文章

浅析 Netty 实现心跳机制与断线重连

基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制

netty之心跳机制

1.心跳机制,在netty3和netty5上面都有.但是写法有些不一样. 2.心跳机制在服务端和客户端的作用也是不一样的.对于服务端来说:就是定时清除那些因为某种原因在一定时间段内没有做指定操作的客户端连接.对于服务端来说:用来检测是否断开连接,然后尝试重连等问题.游戏上面也可以来监控延时问题. 3.我这边只写了服务端的心跳用法,客户端基本差不多. 1)netty3的写法 import org.jboss.netty.bootstrap.ServerBootstrap; import org.j

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

Netty学习篇④-心跳机制及断线重连

心跳检测 前言 客户端和服务端的连接属于socket连接,也属于长连接,往往会存在客户端在连接了服务端之后就没有任何操作了,但还是占用了一个连接:当越来越多类似的客户端出现就会浪费很多连接,netty中可以通过心跳检测来找出一定程度(自定义规则判断哪些连接是无效链接)的无效链接并断开连接,保存真正活跃的连接. 什么叫心跳检测 我理解的心跳检测应该是客户端/服务端定时发送一个数据包给服务端/客户端,检测对方是否有响应: 如果是存活的连接,在一定的时间内应该会收到响应回来的数据包: 如果在一定时间内

NETTY 心跳机制

最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址: https://github.com/fengjiachun/Jupiter 今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程: 1)客

使用netty开发心跳检测和重连机制的规划与设计

心跳检测 代码逻辑与设计思路 方案1 1.1心跳机制 通过ping-pong双向心跳机制 可以保证无论通信哪一方出现网络故障,都能被及时检测出来 为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检测都失败才认定链路已经损害,需要关闭链路并重建链路.当读或者写心跳消息发生I/O异常的时候,说明链路已经中断,此时需要立即关闭链路,如果是客户端,需要重新发起连接.如果是服务端,需要重新发起连接.如果是服务端,需要清空缓存的半包信息,等待客户端重连. 1.2重连机制 如果链路中断

Netty心跳机制

概念介绍网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现.但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题.可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制.其实TCP中已经为我们实现了一个叫做心跳的机制.如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议.所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着. 以确保链接的有效性.

Ambari窥探分布式心跳机制

Ambari是在Hadoop大数据生态圈的基础上应运而生,Ambari的架构也借助了分布式的思想,细细品味,与Hadoop分布式架构有很多相似之处. Hadoop中单NN 与多DN的通信是借助netty封装的RPC机制实现,单Ambari server与多Agent通信则是基于restful api + json实现,rpc与rest api的争论不是本文要讨论的重点,我们追求的目标只有一个,完美实现业务需求. 心跳设计一个主要的原因是判断客户端是否在线,每隔一段时间会发送数据交互.Ambari

Dubbo服务合买平台搭建出售发布之服务暴露&amp;心跳机制&amp;服务注册

Dubbo服务发布 Dubbo合买平台搭建出售 dsluntan.com Q:3393756370 VX:17061863513服务发布影响流程的主要包括三个部分,依次是: 服务暴露 心跳 服务注册 服务暴露是对外提供服务及暴露端口,以便消费端可以正常调通服务.心跳机制保证服务器端及客户端正常长连接的保持,服务注册是向注册中心注册服务暴露服务的过程. Dubbo服务暴露 此处只记录主要代码部分以便能快速定位到主要的核心代码: ServiceConfig.java中代码 if (registryU