netty之心跳机制

  1、心跳机制,在netty3和netty5上面都有。但是写法有些不一样。

  2、心跳机制在服务端和客户端的作用也是不一样的。对于服务端来说:就是定时清除那些因为某种原因在一定时间段内没有做指定操作的客户端连接。对于服务端来说:用来检测是否断开连接,然后尝试重连等问题。游戏上面也可以来监控延时问题。

  3、我这边只写了服务端的心跳用法,客户端基本差不多。

  1)netty3的写法

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.HashedWheelTimer;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Server {

    public static void main(String[] args) {

        //声明服务类
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        //设定线程池
        ExecutorService boss = Executors.newCachedThreadPool();
        ExecutorService work = Executors.newCachedThreadPool();

        //设置工厂
        serverBootstrap.setFactory(new NioServerSocketChannelFactory(boss,work));

        //设置管道流
        serverBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            @Override
            public ChannelPipeline getPipeline() throws Exception {
                ChannelPipeline channelPipeline = Channels.pipeline();
                //添加处理方式
                channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),5,5,10));
                channelPipeline.addLast("decode",new StringDecoder());
                channelPipeline.addLast("encode",new StringEncoder());
                channelPipeline.addLast("server",new ServerHandler());
                return channelPipeline;
            }
        });

        //设置端口
        serverBootstrap.bind(new InetSocketAddress(9000));
    }
}

  备注:这里和之前有变化的就是管道里面多加了一个心跳,实际的处理还是在处理类里面

 channelPipeline.addLast("idle",new IdleStateHandler(new HashedWheelTimer(),5,5,10));
import org.jboss.netty.channel.*;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateEvent;

public class ServerHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
        System.out.println("client:"+e.getMessage());
        ctx.getChannel().write(e.getMessage());
        super.messageReceived(ctx, e);
    }

    @Override
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof IdleStateEvent) {
            if (((IdleStateEvent)e).getState() == IdleState.ALL_IDLE) {
                ChannelFuture channelFuture = ctx.getChannel().write("Time out,You will close");
                channelFuture.addListener(channelFuture1 -> ctx.getChannel().close());
            }
        } else {
            super.handleUpstream(ctx, e);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
        super.exceptionCaught(ctx, e);
    }

    @Override
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelConnected(ctx, e);
    }

    @Override
    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelDisconnected(ctx, e);
    }

    @Override
    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
        super.channelClosed(ctx, e);
    }
}

  说明:这里是用SimpleChannelHandler里面给出的事件处理来实现的。方法为handleUpstream

  2)netty5的写法和netty3差不多

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

public class Server {

    public static void main(String[] args) {
        //服务类
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        //声明两个线程池
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();

        try {
            //设置线程组
            serverBootstrap.group(boss,work);
            //设置服务socket工厂
            serverBootstrap.channel(NioServerSocketChannel.class);
            //设置管道
            serverBootstrap.childHandler(new ChannelInitializer<Channel>() {
                protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new IdleStateHandler(5,5,10));
                    channel.pipeline().addLast(new StringDecoder());
                    channel.pipeline().addLast(new StringEncoder());
                    channel.pipeline().addLast(new ServerHandler());
                }
            });
            //设置服务器连接数
            serverBootstrap.option(ChannelOption.SO_BACKLOG,2048);
            //设置tcp延迟状态
            serverBootstrap.option(ChannelOption.TCP_NODELAY,true);
            //设置激活状态,2小时清除
            serverBootstrap.option(ChannelOption.SO_KEEPALIVE,true);
            //监听端口
            ChannelFuture channelFuture = serverBootstrap.bind(9000);
            //等待服务器关闭
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //关闭线程池
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

}
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class ServerHandler extends SimpleChannelInboundHandler<String> {

    //接收消息并处理
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
        System.out.println(s);
        channelHandlerContext.writeAndFlush("hello client");
    }

    @Override
    public void userEventTriggered(final ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            if (((IdleStateEvent)evt).state() == IdleState.ALL_IDLE) {
                ChannelFuture channelFuture = ctx.writeAndFlush("Time out,You will close");
                channelFuture.addListener(new ChannelFutureListener() {
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        ctx.channel().close();
                    }
                });
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}
时间: 2024-10-28 22:14:14

netty之心跳机制的相关文章

Netty 实现心跳机制.md

netty 心跳机制示例,使用netty4,IdleStateHandler 实现. 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服

浅析 Netty 实现心跳机制与断线重连

基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

Netty学习篇④-心跳机制及断线重连

心跳检测 前言 客户端和服务端的连接属于socket连接,也属于长连接,往往会存在客户端在连接了服务端之后就没有任何操作了,但还是占用了一个连接:当越来越多类似的客户端出现就会浪费很多连接,netty中可以通过心跳检测来找出一定程度(自定义规则判断哪些连接是无效链接)的无效链接并断开连接,保存真正活跃的连接. 什么叫心跳检测 我理解的心跳检测应该是客户端/服务端定时发送一个数据包给服务端/客户端,检测对方是否有响应: 如果是存活的连接,在一定的时间内应该会收到响应回来的数据包: 如果在一定时间内

NETTY 心跳机制

最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址: https://github.com/fengjiachun/Jupiter 今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程: 1)客

使用netty开发心跳检测和重连机制的规划与设计

心跳检测 代码逻辑与设计思路 方案1 1.1心跳机制 通过ping-pong双向心跳机制 可以保证无论通信哪一方出现网络故障,都能被及时检测出来 为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检测都失败才认定链路已经损害,需要关闭链路并重建链路.当读或者写心跳消息发生I/O异常的时候,说明链路已经中断,此时需要立即关闭链路,如果是客户端,需要重新发起连接.如果是服务端,需要重新发起连接.如果是服务端,需要清空缓存的半包信息,等待客户端重连. 1.2重连机制 如果链路中断

Netty心跳机制

概念介绍网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现.但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题.可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制.其实TCP中已经为我们实现了一个叫做心跳的机制.如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议.所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着. 以确保链接的有效性.

Ambari窥探分布式心跳机制

Ambari是在Hadoop大数据生态圈的基础上应运而生,Ambari的架构也借助了分布式的思想,细细品味,与Hadoop分布式架构有很多相似之处. Hadoop中单NN 与多DN的通信是借助netty封装的RPC机制实现,单Ambari server与多Agent通信则是基于restful api + json实现,rpc与rest api的争论不是本文要讨论的重点,我们追求的目标只有一个,完美实现业务需求. 心跳设计一个主要的原因是判断客户端是否在线,每隔一段时间会发送数据交互.Ambari

Dubbo服务合买平台搭建出售发布之服务暴露&amp;心跳机制&amp;服务注册

Dubbo服务发布 Dubbo合买平台搭建出售 dsluntan.com Q:3393756370 VX:17061863513服务发布影响流程的主要包括三个部分,依次是: 服务暴露 心跳 服务注册 服务暴露是对外提供服务及暴露端口,以便消费端可以正常调通服务.心跳机制保证服务器端及客户端正常长连接的保持,服务注册是向注册中心注册服务暴露服务的过程. Dubbo服务暴露 此处只记录主要代码部分以便能快速定位到主要的核心代码: ServiceConfig.java中代码 if (registryU