Netty实现服务端客户端长连接通讯及心跳检测

通过netty实现服务端与客户端的长连接通讯,及心跳检测。

       基本思路:netty服务端通过一个Map保存所有连接上来的客户端SocketChannel,客户端的Id作为Map的key。每次服务器端如果要向某个客户端发送消息,只需根据ClientId取出对应的SocketChannel,往里面写入message即可。心跳检测通过IdleEvent 事件,定时向服务端放送Ping消息,检测SocketChannel是否终断。

        环境JDK1.8 和netty5

        以下是具体的代码实现和介绍:

1公共的Share部分(主要包含消息协议类型的定义)

     设计消息类型:

public enum  MsgType {
    PING,ASK,REPLY,LOGIN
}

Message基类:

//必须实现序列,serialVersionUID 一定要有,否者在netty消息序列化反序列化会有问题,接收不到消息!!!
public abstract class BaseMsg  implements Serializable {
    private static final long serialVersionUID = 1L;
    private MsgType type;
    //必须唯一,否者会出现channel调用混乱
    private String clientId;

    //初始化客户端id
    public BaseMsg() {
        this.clientId = Constants.getClientId();
    }

    public String getClientId() {
        return clientId;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    public MsgType getType() {
        return type;
    }

    public void setType(MsgType type) {
        this.type = type;
    }
}

常量设置:

public class Constants {
    private static String clientId;
    public static String getClientId() {
        return clientId;
    }
    public static void setClientId(String clientId) {
        Constants.clientId = clientId;
    }
}

登录类型消息:

public class LoginMsg extends BaseMsg {
    private String userName;
    private String password;
    public LoginMsg() {
        super();
        setType(MsgType.LOGIN);
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

心跳检测Ping类型消息:

public class PingMsg extends BaseMsg {
    public PingMsg() {
        super();
        setType(MsgType.PING);
    }
}

请求类型消息:

public class AskMsg extends BaseMsg {
    public AskMsg() {
        super();
        setType(MsgType.ASK);
    }
    private AskParams params;

    public AskParams getParams() {
        return params;
    }

    public void setParams(AskParams params) {
        this.params = params;
    }
}
//请求类型参数
//必须实现序列化接口
public class AskParams implements Serializable {
    private static final long serialVersionUID = 1L;
    private String auth;

    public String getAuth() {
        return auth;
    }

    public void setAuth(String auth) {
        this.auth = auth;
    }
}

响应类型消息:

public class ReplyMsg extends BaseMsg {
    public ReplyMsg() {
        super();
        setType(MsgType.REPLY);
    }
    private ReplyBody body;

    public ReplyBody getBody() {
        return body;
    }

    public void setBody(ReplyBody body) {
        this.body = body;
    }
}
//相应类型body对像
public class ReplyBody implements Serializable {
    private static final long serialVersionUID = 1L;
}
public class ReplyClientBody extends ReplyBody {
    private String clientInfo;

    public ReplyClientBody(String clientInfo) {
        this.clientInfo = clientInfo;
    }

    public String getClientInfo() {
        return clientInfo;
    }

    public void setClientInfo(String clientInfo) {
        this.clientInfo = clientInfo;
    }
}
public class ReplyServerBody extends ReplyBody {
    private String serverInfo;
    public ReplyServerBody(String serverInfo) {
        this.serverInfo = serverInfo;
    }
    public String getServerInfo() {
        return serverInfo;
    }
    public void setServerInfo(String serverInfo) {
        this.serverInfo = serverInfo;
    }
}

2 Server端:主要包含对SocketChannel引用的Map,ChannelHandler的实现和Bootstrap.

Map:

public class NettyChannelMap {
    private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();
    public static void add(String clientId,SocketChannel socketChannel){
        map.put(clientId,socketChannel);
    }
    public static Channel get(String clientId){
       return map.get(clientId);
    }
    public static void remove(SocketChannel socketChannel){
        for (Map.Entry entry:map.entrySet()){
            if (entry.getValue()==socketChannel){
                map.remove(entry.getKey());
            }
        }
    }

}

Handler

public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //channel失效,从Map中移除
        NettyChannelMap.remove((SocketChannel)ctx.channel());
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {

        if(MsgType.LOGIN.equals(baseMsg.getType())){
            LoginMsg loginMsg=(LoginMsg)baseMsg;
            if("robin".equals(loginMsg.getUserName())&&"yao".equals(loginMsg.getPassword())){
                //登录成功,把channel存到服务端的map中
                NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());
                System.out.println("client"+loginMsg.getClientId()+" 登录成功");
            }
        }else{
            if(NettyChannelMap.get(baseMsg.getClientId())==null){
                    //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录
                    LoginMsg loginMsg=new LoginMsg();
                    channelHandlerContext.channel().writeAndFlush(loginMsg);
            }
        }
        switch (baseMsg.getType()){
            case PING:{
                PingMsg pingMsg=(PingMsg)baseMsg;
                PingMsg replyPing=new PingMsg();
                NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);
            }break;
            case ASK:{
                //收到客户端的请求
                AskMsg askMsg=(AskMsg)baseMsg;
                if("authToken".equals(askMsg.getParams().getAuth())){
                    ReplyServerBody replyBody=new ReplyServerBody("server info $$$$ !!!");
                    ReplyMsg replyMsg=new ReplyMsg();
                    replyMsg.setBody(replyBody);
                    NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);
                }
            }break;
            case REPLY:{
                //收到客户端回复
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();
                System.out.println("receive client msg: "+clientBody.getClientInfo());
            }break;
            default:break;
        }
        ReferenceCountUtil.release(baseMsg);
    }
}

ServerBootstrap:

public class NettyServerBootstrap {
    private int port;
    private SocketChannel socketChannel;
    public NettyServerBootstrap(int port) throws InterruptedException {
        this.port = port;
        bind();
    }

    private void bind() throws InterruptedException {
        EventLoopGroup boss=new NioEventLoopGroup();
        EventLoopGroup worker=new NioEventLoopGroup();
        ServerBootstrap bootstrap=new ServerBootstrap();
        bootstrap.group(boss,worker);
        bootstrap.channel(NioServerSocketChannel.class);
        bootstrap.option(ChannelOption.SO_BACKLOG, 128);
        //通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
        bootstrap.option(ChannelOption.TCP_NODELAY, true);
        //保持长连接状态
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ChannelPipeline p = socketChannel.pipeline();
                p.addLast(new ObjectEncoder());
                p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                p.addLast(new NettyServerHandler());
            }
        });
        ChannelFuture f= bootstrap.bind(port).sync();
        if(f.isSuccess()){
            System.out.println("server start---------------");
        }
    }
    public static void main(String []args) throws InterruptedException {
        NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);
        while (true){
            SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");
            if(channel!=null){
                AskMsg askMsg=new AskMsg();
                channel.writeAndFlush(askMsg);
            }
            TimeUnit.SECONDS.sleep(5);
        }
    }
}

3 Client端:包含发起登录,发送心跳,及对应消息处理

handler

public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {
    //利用写空闲发送心跳检测消息
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent e = (IdleStateEvent) evt;
            switch (e.state()) {
                case WRITER_IDLE:
                    PingMsg pingMsg=new PingMsg();
                    ctx.writeAndFlush(pingMsg);
                    System.out.println("send ping to server----------");
                    break;
                default:
                    break;
            }
        }
    }
    @Override
    protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {
        MsgType msgType=baseMsg.getType();
        switch (msgType){
            case LOGIN:{
                //向服务器发起登录
                LoginMsg loginMsg=new LoginMsg();
                loginMsg.setPassword("yao");
                loginMsg.setUserName("robin");
                channelHandlerContext.writeAndFlush(loginMsg);
            }break;
            case PING:{
                System.out.println("receive ping from server----------");
            }break;
            case ASK:{
                ReplyClientBody replyClientBody=new ReplyClientBody("client info **** !!!");
                ReplyMsg replyMsg=new ReplyMsg();
                replyMsg.setBody(replyClientBody);
                channelHandlerContext.writeAndFlush(replyMsg);
            }break;
            case REPLY:{
                ReplyMsg replyMsg=(ReplyMsg)baseMsg;
                ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();
                System.out.println("receive client msg: "+replyServerBody.getServerInfo());
            }
            default:break;
        }
        ReferenceCountUtil.release(msgType);
    }
}

bootstrap

public class NettyClientBootstrap {
    private int port;
    private String host;
    private SocketChannel socketChannel;
    private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);
    public NettyClientBootstrap(int port, String host) throws InterruptedException {
        this.port = port;
        this.host = host;
        start();
    }
    private void start() throws InterruptedException {
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup();
        Bootstrap bootstrap=new Bootstrap();
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);
        bootstrap.group(eventLoopGroup);
        bootstrap.remoteAddress(host,port);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));
                socketChannel.pipeline().addLast(new ObjectEncoder());
                socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                socketChannel.pipeline().addLast(new NettyClientHandler());
            }
        });
        ChannelFuture future =bootstrap.connect(host,port).sync();
        if (future.isSuccess()) {
            socketChannel = (SocketChannel)future.channel();
            System.out.println("connect server  成功---------");
        }
    }
    public static void main(String[]args) throws InterruptedException {
        Constants.setClientId("001");
        NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");

        LoginMsg loginMsg=new LoginMsg();
        loginMsg.setPassword("yao");
        loginMsg.setUserName("robin");
        bootstrap.socketChannel.writeAndFlush(loginMsg);
        while (true){
            TimeUnit.SECONDS.sleep(3);
            AskMsg askMsg=new AskMsg();
            AskParams askParams=new AskParams();
            askParams.setAuth("authToken");
            askMsg.setParams(askParams);
            bootstrap.socketChannel.writeAndFlush(askMsg);
        }
    }
}

具体的例子和相应pom.xml 见 https://github.com/WangErXiao/ServerClient

 

来源: http://my.oschina.net/robinyao/blog/399060

时间: 2024-08-06 20:28:51

Netty实现服务端客户端长连接通讯及心跳检测的相关文章

在HTTP通讯过程中,是客户端还是服务端主动断开连接?

比如说:IE访问IIS,获取文件,肯定是要建立一个连接,这个连接在完成通讯后,是客户端Close了连接,还是服务端Close了连接.我用程序测模拟IE和IIS,都没有收到断开连接的消息,也就是都没有触发OnClose事件.我是用Socket建立的连接.如果两方面都没有主动断开连接,那么我猜测可能是传输的数据中有结束的标志,请问这个标志是怎样的?谢谢各位. 解决方案 ? 不知道iis是怎么弄得http的回应包中有个字段通常是close收到指定长度之后就应该断开的. HTTP 你的意思是B/S模式的

Netty实例-简单的服务端-客户端实现,注释详细

       书籍推荐:                                       实例代码 :http://download.csdn.net/detail/jiangtao_st/7677503 Netty Server端实现 /** * * <p> * Netty Server Simple * </p> * * @author 卓轩 * @创建时间:2014年7月7日 * @version: V1.0 */ public class NettyServer

手写一个模块化的 TCP 服务端客户端

前面的博客 基于 socket 手写一个 TCP 服务端及客户端 写过一个简单的 TCP 服务端客户端,没有对代码结构进行任何设计,仅仅是实现了相关功能,用于加深对 socket 编程的认识. 这次我们对整个代码结构进行一下优化,使其模块化,易扩展,成为一个简单意义上的“框架”. 对于 Socket 编程这类所需知识偏底层的情况(OS 协议栈的运作机制,TCP 协议的理解,多线程的理解,BIO/NIO 的理解,阻塞函数的运作原理甚至是更底层处理器的中断.网卡等外设与内核的交互.核心态与内核态的切

PHP-Socket服务端客户端发送接收通信实例详解

Socket介绍 什么是socket 所谓socket通常也称作"套接字",用于描述IP地址和端口,是一个通信链的句柄.应用程序通常通过"套接字"向网络发出请求或者应答网络请求. 在Internet上的主机一般运行了多个服务软件,同时提供几种服务.每种服务都打开一个Socket,并绑定到一个端口上,不同的端口对应于不同的服务. Socket连接过程 根据连接启动的方式以及本地套接字要连接的目标,套接字之间的连接过程可以分为三个步骤:服务器监听,客户端请求,连接确认.

原理剖析-Netty之服务端启动工作原理分析(下)

一.大致介绍 1.由于篇幅过长难以发布,所以本章节接着上一节来的,上一章节为[原理剖析(第 010 篇)Netty之服务端启动工作原理分析(上)]: 2.那么本章节就继续分析Netty的服务端启动,分析Netty的源码版本为:netty-netty-4.1.22.Final: 二.三.四章节请看上一章节 四.源码分析Netty服务端启动 上一章节,我们主要分析了一下线程管理组对象是如何被实例化的,并且还了解到了每个线程管理组都有一个子线程数组来处理任务: 那么接下来我们就直接从4.6开始分析了:

TCP/IP网络编程之基于TCP的服务端/客户端(二)

回声客户端问题 上一章TCP/IP网络编程之基于TCP的服务端/客户端(一)中,我们解释了回声客户端所存在的问题,那么单单是客户端的问题,服务端没有任何问题?是的,服务端没有问题,现在先让我们回顾下服务端的I/O代码 echo_server.c --while ((str_len = read(clnt_sock, messag, 1024)) != 0) write(clnt_sock, messag, str_len);-- 接着,我们回顾客户端的代码 echo_client.c -- wr

Netty的服务端的建立

用Netty建立服务端, 首先我们应该新建立一个新的类, 作为服务端, 在其中写一个run方法, 作为启动: 1 public void run(){ 2 // 处理 Nio的Accept 3 EventLoopGroup boss = new NioEventLoopGroup(); 4 // 处理 Nio的Read和Write事件 5 EventLoopGroup worker = new NioEventLoopGroup(); 6 try { 7 // Netty中服务端启动类 8 Se

互联网推送服务原理:长连接+心跳机制(MQTT协议)

互联网推送消息的方式很常见,特别是移动互联网上,手机每天都能收到好多推送消息,经过研究发现,这些推送服务的原理都是维护一个长连接(要不不可能达到实时效果),但普通的socket连接对服务器的消耗太大了,所以才会出现像MQTT这种轻量级低消耗的协议来维护长连接,那么要如何维护长连接呢: 在写之前,我们首先了解一下为什么Android维护长连接需要心跳机制,首先我们知道,维护任何一个长连接都需要心跳机制,客户端发送一个心跳给 服务器,服务器给客户端一个心跳应答,这样就形成客户端服务器的一次完整的握手

回显服务端/客户端

回显服务端/客户端 在这一章,我们将会实现一个小的客户端/服务端应用,这可能会是你写过的最简单的客户端/服务端应用.回显应用就是一个把客户端发过来的任何内容回显给其本身,然后关闭连接的的服务端.这个服务端可以处理任何数量的客户端.每个客户端连接之后发送一个消息,服务端接收到完成消息后把它发送回去.在那之后,服务端关闭连接. 因此,每个回显客户端连接到服务端,发送一个消息,然后读取服务端返回的结果,确保这是它发送给服务端的消息就结束和服务端的会话. 我们首先实现一个同步应用,然后实现一个异步应用,