netty心跳机制和断线重连(四)

心跳是为了保证客户端和服务端的通信可用。因为各种原因客户端和服务端不能及时响应和接收信息。比如网络断开,停电 或者是客户端/服务端 高负载。

所以每隔一段时间 客户端发送心跳包到客户端  服务端做出心跳的响应;

1.如果客户端在指定时间没有向服务端发送心跳包。则表示客户端的通信出现了问题。

2.如果客户端发送心跳包到服务端没有收到响应 则表示服务端的通信出现了问题。

netty提供IdleStateHandle 在监听距离上一次写的时间和距离上一次读的时间 如果超时则调用

源码:

public class IdleStateHandler extends ChannelDuplexHandler
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);
    super.channelActive(ctx);
}
}
 private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),//监听read的task
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),//监听写的task
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),//监听读写的task
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class WriterIdleTimeoutTask extends AbstractIdleTask {

        WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long lastWriteTime = IdleStateHandler.this.lastWriteTime;
            long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
            if (nextDelay <= 0) {
                // Writer is idle - set a new timeout and notify the callback.
                writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstWriterIdleEvent;
                firstWriterIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Write occurred before the timeout - set a new timeout with shorter delay.
                writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

    private final class AllIdleTimeoutTask extends AbstractIdleTask {

        AllIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {

            long nextDelay = allIdleTimeNanos;
            if (!reading) {
                nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
            }
            if (nextDelay <= 0) {
                // Both reader and writer are idle - set a new timeout and
                // notify the callback.
                allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstAllIdleEvent;
                firstAllIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Either read or write occurred before the timeout - set a new
                // timeout with shorter delay.
                allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

三个内部类是IdleSateHandle的内部类 可以看到内部是通过另起一个线程进行监听上一次对应事件的触发 如果超时则调用对应的事件

基于三的代码进行修改

首先是MessageHead消息头增加消息类型

public class MessageHead {
     private int headData=0X76;//协议开始标志
        private int length;//包的长度
        private String token;
        private Date createDate;
        private String type;//消息类型  ping表示心跳包
        public int getHeadData() {
            return headData;
        }
        public void setHeadData(int headData) {
            this.headData = headData;
        }
        public int getLength() {
            return length;
        }
        public void setLength(int length) {
            this.length = length;
        }

        public String getToken() {
            return token;
        }
        public void setToken(String token) {
            this.token = token;
        }
        public Date getCreateDate() {
            return createDate;
        }
        public void setCreateDate(Date createDate) {
            this.createDate = createDate;
        }

        public String getType() {
            return type;
        }
        public void setType(String type) {
            this.type = type;
        }
        @Override
        public String toString() {
            SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            // TODO Auto-generated method stub
            return "headData:"+headData+",length:"+length+",token:"+token+",createDate:"+    simpleDateFormat.format(createDate);
        }
}

MessageDecode

package com.liqiang.SimpeEcode;

import java.text.SimpleDateFormat;
import java.util.List;
import com.liqiang.nettyTest2.nettyMain;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.MessageToMessageDecoder;

public class MessageDecode extends ByteToMessageDecoder{

    private final int BASE_LENGTH=4+4+50+50+50;//协议头 类型 int+length 4个字节+消息类型加令牌和 令牌生成时间50个字节
    private int headData=0X76;//协议开始标志

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
        // 刻度长度必须大于基本长度
        if(buffer.readableBytes()>=BASE_LENGTH) {
            /**
             * 粘包 发送频繁 可能多次发送黏在一起 需要考虑  不过一个客户端发送太频繁也可以推断是否是攻击
             */
            //防止soket流攻击。客户端传过来的数据太大不合理
            if(buffer.readableBytes()>2048) {
                //buffer.skipBytes(buffer.readableBytes());

            }
        }
        int beginIndex;//记录包开始位置
        while(true) {
              // 获取包头开始的index
            beginIndex = buffer.readerIndex();
            //如果读到开始标记位置 结束读取避免拆包和粘包
            if(buffer.readInt()==headData) {
                break;
            }

            //初始化读的index为0
            buffer.resetReaderIndex();
            // 当略过,一个字节之后,
            //如果当前buffer数据小于基础数据 返回等待下一次读取
            if (buffer.readableBytes() < BASE_LENGTH) {
                return;
            }
        }
           // 消息的长度
        int length = buffer.readInt();
        // 判断请求数据包数据是否到齐   -150是消息头的长度。
        if ((buffer.readableBytes()-150) < length) {
            //没有到齐 返回读的指针 等待下一次数据到期再读
            buffer.readerIndex(beginIndex);
            return;
        }
        //读取消息类型
        byte[] typeByte=new byte[50];
        buffer.readBytes(typeByte);
        //读取令牌
        byte[] tokenByte=new byte[50];
        buffer.readBytes(tokenByte);

        //读取令牌生成时间
        byte[]createDateByte=new byte[50];
        buffer.readBytes(createDateByte);
        //读取content
        byte[] data = new byte[length];
        buffer.readBytes(data);
        MessageHead head=new MessageHead();
        head.setHeadData(headData);
        head.setToken(new String(tokenByte).trim());
        SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        head.setCreateDate(  simpleDateFormat.parse(new String(createDateByte).trim()));
        head.setLength(length);
        head.setType(new String(typeByte).trim());
        Message message=new Message(head, data);
        //认证不通过
        if(!message.authorization(message.buidToken())) {
            ctx.close();

            return;
        }
        out.add(message);
        buffer.discardReadBytes();//回收已读字节
    }

}

MessageEncoder

package com.liqiang.SimpeEcode;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Calendar;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MessageEncoder extends MessageToByteEncoder<Message> {

    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
        // TODO Auto-generated method stub
        // 写入开头的标志
        out.writeInt(msg.getHead().getHeadData());
        // 写入包的的长度
        out.writeInt(msg.getContent().length);
        byte[] typeByte = new byte[50];
        /**
         * type定长50个字节
         *  第一个参数 原数组
         *  第二个参数 原数组位置
         *  第三个参数 目标数组
         *  第四个参数 目标数组位置
         *  第五个参数 copy多少个长度
         */
        byte[] indexByte=msg.getHead().getType().getBytes();
        try {
            System.arraycopy(indexByte, 0, typeByte, 0,indexByte.length>typeByte.length?typeByte.length:indexByte.length);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }
        //写入消息类型
        out.writeBytes(typeByte);
        byte[] tokenByte = new byte[50];
        /**
         * token定长50个字节
         *  第一个参数 原数组
         *  第二个参数 原数组位置
         *  第三个参数 目标数组
         *  第四个参数 目标数组位置
         *  第五个参数 copy多少个长度
         */
         indexByte=msg.getHead().getToken().getBytes();
        try {
            System.arraycopy(indexByte, 0, tokenByte, 0,indexByte.length>tokenByte.length?tokenByte.length:indexByte.length);
        } catch (Exception e) {
            // TODO: handle exception
            e.printStackTrace();
        }

        //写入令牌
        out.writeBytes(tokenByte);
        byte[] createTimeByte = new byte[50];
        SimpleDateFormat format0 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String time = format0.format(msg.getHead().getCreateDate());
        indexByte=time.getBytes();
        System.arraycopy(indexByte, 0, createTimeByte, 0,indexByte.length>createTimeByte.length?createTimeByte.length:indexByte.length);
        //写入令牌生成时间
        out.writeBytes(createTimeByte);

        // 写入消息主体
        out.writeBytes(msg.getContent());

    }

}

红色部分为改动部分

ClientChannelInitializer

package com.liqiang.nettyTest2;

import java.util.concurrent.TimeUnit;

import com.liqiang.SimpeEcode.MessageDecode;
import com.liqiang.SimpeEcode.MessageEncoder;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

public class ClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    private Client client;
    public  ClientChannelInitializer(Client client) {
        // TODO Auto-generated constructor stub
        this.client=client;
    }
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        // TODO Auto-generated method stub
        socketChannel.pipeline()
        //表示5秒向服务器发送一次心跳包   10秒没接收到服务器端信息表示服务器端通信异常 则会触发clientHandle userEventTriggered事件
         .addLast("ping",new IdleStateHandler(10, 5, 0, TimeUnit.SECONDS))
        .addLast("decoder",new MessageEncoder())
        .addLast("encoder",new MessageDecode())
        .addLast(new ClientHandle(client));//注册处理器

    }
}

ClientHandle修改

package com.liqiang.nettyTest2;

import java.util.Date;

import com.liqiang.SimpeEcode.Message;
import com.liqiang.SimpeEcode.MessageHead;

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.cors.CorsHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

public class ClientHandle extends ChannelInboundHandlerAdapter {

    Client client;
    public  ClientHandle(Client client) {
        // TODO Auto-generated constructor stub
       this.client=client;
    }
    /**
     * 读写超时事事件     * IdleStateHandle配置的 如果5秒没有触发writer事件 则会触发 userEventTrigerd方法 我们则写一次心跳     * 如果10秒没有触发read事件则表示服务器通信异常  因为我们每次发送一次心跳包 服务器都会做出对应的心跳反应
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent=((IdleStateEvent) evt);
            /**
             * 如果没有收到服务端的写 则表示服务器超时 判断是否断开连接
             */
            if(idleStateEvent.state()==IdleState.READER_IDLE) {
                System.out.println("服务器无响应");
                if(!ctx.channel().isOpen()) {
                    System.out.println("正在重连");
                    client.connection();
                    System.out.println("重连成功");
                }
            }else if(idleStateEvent.state()==IdleState.WRITER_IDLE) {
                //如果没有触发写事件则向服务器发送一次心跳包
                System.out.println("正在向服务端发送心跳包");
                MessageHead head=new MessageHead();
                byte[]content="".getBytes();
                head.setCreateDate(new Date());
                head.setType("ping");
                head.setLength(content.length);
                Message pingMessage=new Message(head,content);
                head.setToken(pingMessage.buidToken());
                 ctx.writeAndFlush(pingMessage);
            }
        }else {
            super.userEventTriggered(ctx, evt);
        }
    }
    //建立连接时回调
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        //System.out.println("与服务器建立连接成功");
        client.setServerChannel(ctx);
        client.setConnection(true);
        //ctx.fireChannelActive();//如果注册多个handle 下一个handel的事件需要触发需要调用这个方法

    }
    //读取服务器发送信息时回调
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message=(Message) msg;
        if(message.getHead().getType().equals("ping")) {
            //表示是心跳包 不做任何业务处理
        }else {
            // TODO Auto-generated method stub
            System.out.println(msg.toString());
        }

    }

    //发生异常时回调
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("发生异常 与服务器断开连接");
        ctx.close();//关闭连接
    }
}

ServerChannelInitializer

package com.liqiang.nettyTest2;

import java.util.concurrent.TimeUnit;

import com.liqiang.SimpeEcode.MessageDecode;
import com.liqiang.SimpeEcode.MessageEncoder;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.AsciiHeadersEncoder.NewlineType;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

public class ServerChannelInitializer extends ChannelInitializer<SocketChannel> {

    private Server server;
    public ServerChannelInitializer(Server server) {
        this.server=server;
    }
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // TODO Auto-generated method stub
        channel.pipeline()
        //7秒没收到客户端信息 则表示客户端因为网络等原因异常关闭
        .addLast("ping",new IdleStateHandler(7, 0, 0,TimeUnit.SECONDS))
        .addLast("decoder",new MessageDecode())
        .addLast("encoder",new MessageEncoder())
        .addLast(new ServerHandle(server));
    }

}

ServerHandle

package com.liqiang.nettyTest2;

import java.util.Date;

import com.liqiang.SimpeEcode.Message;
import com.liqiang.SimpeEcode.MessageHead;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;

public class ServerHandle extends ChannelInboundHandlerAdapter {

    private Server server;

    public ServerHandle(Server server) {
        // TODO Auto-generated constructor stub
        this.server = server;
    }
    /**
     * 读写超时事事件
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent event=(IdleStateEvent)evt;
            //如果读超时
            if(event.state()==IdleState.READER_IDLE) {
                    System.out.println("有客户端超时了");
                    ctx.channel().close();//关闭连接
            }
        }else {
            super.userEventTriggered(ctx, evt);
        }

    }

    // 建立连接时回调
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // TODO Auto-generated method stub
        System.out.println("有客户端建立连接了");
        server.addClient(ctx);
        // ctx.fireChannelActive();//pipeline可以注册多个handle 这里可以理解为是否通知下一个Handle继续处理
    }

    // 接收到客户端发送消息时回调
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Message message=(Message)msg;
        if(message.getHead().getType().equals("ping")) {
            //表示心跳包 服务端响应心跳包  而不做相关业务处理
            MessageHead head=new MessageHead();
            byte[] content="".getBytes();
            head.setCreateDate(new Date());
            head.setType("ping");
            head.setLength(content.length);
            Message pingMessage=new Message(head,content);
            head.setToken(pingMessage.buidToken());
             ctx.writeAndFlush(pingMessage);
        }else {
            System.out.println("server接收到客户端发送信息:" + msg.toString());
        }
        // TODO Auto-generated method stub

        // ctx.fireChannelRead(msg);pipeline可以注册多个handle 这里可以理解为是否通知下一个Handle继续处理
    }

    // 通信过程中发生异常回调
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // TODO Auto-generated method stub
        // super.exceptionCaught(ctx, cause);
        ctx.close();// 发生异常关闭通信通道
        System.out.println("发生异常与客户端失去连接");

        cause.printStackTrace();
        // ctx.fireExceptionCaught(cause);pipeline可以注册多个handle 这里可以理解为是否通知下一个Handle继续处理
    }
}

client

package com.liqiang.nettyTest2;

import com.liqiang.SimpeEcode.Message;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.EventExecutorGroup;

public class Client implements Runnable{
    private String ip;// ip
    private int port;// 端口
    private boolean isConnection = false;
    private ChannelHandlerContext serverChannel;

    public Client(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }

    // 与服务器建立连接
    public void connection() {
        new Thread(this).start();

    }
    @Override
    public void run() {
        // TODO Auto-generated method stub
        EventLoopGroup group = new NioEventLoopGroup();// 服务器监听服务器发送信息
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                .handler(new ClientChannelInitializer(this));// 基于NIO编程模型通信
        try {
            ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();

            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            System.out.println("连接服务器失败");
        }finally {
            //尝试重连
            System.out.println("正在重连");
            run();
        }
    }

    public void close() {
        serverChannel.close();
    }
    public boolean isConnection() {
        return isConnection;
    }

    public void setConnection(boolean isConnection) {
        this.isConnection = isConnection;
    }

    public void sendMsg(Message msg) {
        while(isConnection) {
            serverChannel.writeAndFlush(msg);
        }

    }

    public ChannelHandlerContext getServerChannel() {
        return serverChannel;
    }

    public void setServerChannel(ChannelHandlerContext serverChannel) {
        this.serverChannel = serverChannel;
    }

}

Server

package com.liqiang.nettyTest2;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Vector;

import com.liqiang.SimpeEcode.Message;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class Server implements Runnable {
    private int port;// 监听端口
    private Vector<ChannelHandlerContext> clients;// 保存在线客户端信息

    public Server(int port) {
        clients = new Vector<ChannelHandlerContext>();
        this.port = port;
    }

    // 广播
    public void sendAll(Message msg) {
        clients.forEach(c -> {
            c.writeAndFlush(msg);
        });
    }

    public void addClient(ChannelHandlerContext client) {
        clients.add(client);
    }

    @Override
    public void run() {
        /**
         * NioEventLoopGroup 内部维护一个线程池 如果构造函数没有指定线程池数量 则默认为系统core*2
         */
        EventLoopGroup acceptor = new NioEventLoopGroup();// acceptor负责监客户端连接请求
        EventLoopGroup worker = new NioEventLoopGroup();// worker负责io读写(监听注册channel的 read/writer事件)

        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(acceptor, worker).channel(NioServerSocketChannel.class)
                .localAddress(new InetSocketAddress(port)).childHandler(new ServerChannelInitializer(this))
                .option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
        try {
            ChannelFuture channelFuture = bootstrap.bind(port).sync();

            System.out.println("服务器已启动");
            // 将阻塞 直到服务器端关闭或者手动调用
             channelFuture.channel().closeFuture().sync();
            // 释放资源

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }finally {
                        acceptor.shutdownGracefully();
                        worker.shutdownGracefully();
        }

    }

    public void startServer() {
        new Thread(this).start();
    }

}

测试

package com.liqiang.nettyTest2;

import java.util.Date;

import com.liqiang.SimpeEcode.Message;
import com.liqiang.SimpeEcode.MessageHead;

public class nettyClientMain {
    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                Client client1 = new Client("127.0.0.1", 8081);
                client1.connection();
                String content = "哈哈哈哈!";
                byte[] bts = content.getBytes();
                MessageHead head = new MessageHead();
                // 令牌生成时间
                head.setCreateDate(new Date());
                head.setType("message");
                head.setLength(bts.length);
                Message message = new Message(head, bts);
                message.getHead().setToken(message.buidToken());
                client1.sendMsg(message);

            }
        }).start();

    }
}
package com.liqiang.nettyTest2;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

import javax.management.StringValueExp;
import javax.swing.text.StringContent;

import com.liqiang.SimpeEcode.Message;
import com.liqiang.SimpeEcode.MessageHead;

public class nettyMain {
    public static void main(String[] args) {
        new Thread(new Runnable() {

            @Override
            public void run() {
                // TODO Auto-generated method stub
                Server server = new Server(8081);
                server.startServer();

            }
        }).start();

    }

}

1.先开启服务端

2.再开启客户端

3.关闭服务端

然后我们再重新启动服务端 打印

正在重连
正在重连
正在重连
正在重连
正在重连
正在重连
正在重连
正在重连
正在向服务端发送心跳包
正在向服务端发送心跳包
正在向服务端发送心跳包
正在向服务端发送心跳包

原文地址:https://www.cnblogs.com/LQBlog/p/9163424.html

时间: 2024-11-10 14:28:55

netty心跳机制和断线重连(四)的相关文章

浅析 Netty 实现心跳机制与断线重连

基础 何为心跳 顾名思义, 所谓 心跳, 即在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. 为什么需要心跳 因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等, 会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的. 为了解决这个问题, 我们就需要引入 心跳 机制. 心跳机制

Netty学习篇④-心跳机制及断线重连

心跳检测 前言 客户端和服务端的连接属于socket连接,也属于长连接,往往会存在客户端在连接了服务端之后就没有任何操作了,但还是占用了一个连接:当越来越多类似的客户端出现就会浪费很多连接,netty中可以通过心跳检测来找出一定程度(自定义规则判断哪些连接是无效链接)的无效链接并断开连接,保存真正活跃的连接. 什么叫心跳检测 我理解的心跳检测应该是客户端/服务端定时发送一个数据包给服务端/客户端,检测对方是否有响应: 如果是存活的连接,在一定的时间内应该会收到响应回来的数据包: 如果在一定时间内

NETTY 心跳机制

最近工作比较忙,但闲暇之余还是看了阿里的冯家春(fengjiachun)的github上的开源代码Jupiter,写的RPC框架让我感叹人外有人,废话不多说,下面的代码全部截取自Jupiter,写了一个比较完整的例子,供大家一起学习分享,再次对@Luca抱拳,Jupiter的Github地址: https://github.com/fengjiachun/Jupiter 今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程: 1)客

Netty心跳机制

概念介绍网络中的接收和发送数据都是使用操作系统中的SOCKET进行实现.但是如果此套接字已经断开,那发送数据和接收数据的时候就一定会有问题.可是如何判断这个套接字是否还可以使用呢?这个就需要在系统中创建心跳机制.其实TCP中已经为我们实现了一个叫做心跳的机制.如果你设置了心跳,那TCP就会在一定的时间(比如你设置的是3秒钟)内发送你设置的次数的心跳(比如说2次),并且此信息不会影响你自己定义的协议.所谓“心跳”就是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着. 以确保链接的有效性.

webSocket使用心跳包实现断线重连

首先new一个webscoket的连接 this.noticeSocketLink = new WebSocket(‘webSocket的地址’) 这里是连接成功之后的操作 linkNoticeWebsocket(){ this.noticeSocketLink.onopen = ()=>{ 在连接成功打开的时候调用断线重连的函数 this.webCloseLink(this.noticeSocketLink) } this.noticeSocketLink.onmessage = res =>

Netty 实现心跳机制.md

netty 心跳机制示例,使用netty4,IdleStateHandler 实现. 本文假设你已经了解了Netty的使用,或者至少写过netty的helloworld,知道了netty的基本使用.我们知道使用netty的时候,大多数的东西都与Handler有关,我们的业务逻辑基本都是在Handler中实现的.Netty中自带了一个IdleStateHandler 可以用来实现心跳检测. 心跳检测的逻辑 本文中我们将要实现的心跳检测逻辑是这样的:服务端启动后,等待客户端连接,客户端连接之后,向服

基于netty实现的长连接,心跳机制及重连机制

技术:maven3.0.5 + netty4.1.33 + jdk1.8 概述 Netty是由JBOSS提供的一个java开源框架.Netty提供异步的.事件驱动的网络应用程序框架和工具,用以快速开发高性能.高可靠性的网络服务器和客户端程序. 也就是说,Netty 是一个基于NIO的客户.服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户.服务端应用.Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务

netty之心跳机制

1.心跳机制,在netty3和netty5上面都有.但是写法有些不一样. 2.心跳机制在服务端和客户端的作用也是不一样的.对于服务端来说:就是定时清除那些因为某种原因在一定时间段内没有做指定操作的客户端连接.对于服务端来说:用来检测是否断开连接,然后尝试重连等问题.游戏上面也可以来监控延时问题. 3.我这边只写了服务端的心跳用法,客户端基本差不多. 1)netty3的写法 import org.jboss.netty.bootstrap.ServerBootstrap; import org.j

Netty 超时机制及心跳程序实现

Netty 超时机制的介绍 Netty 的超时类型 IdleState 主要分为: ALL_IDLE : 一段时间内没有数据接收或者发送 READER_IDLE : 一段时间内没有数据接收 WRITER_IDLE : 一段时间内没有数据发送 在 Netty 的 timeout 包下,主要类有: IdleStateEvent : 超时的事件 IdleStateHandler : 超时状态处理 ReadTimeoutHandler : 读超时状态处理 WriteTimeoutHandler : 写超