基于NIO的消息路由的实现(四) 服务端通讯主线程(1)

一、简单介绍:

服务端通讯主线程是消息路由服务的启动类,其主要作用如下:

1、初始化相关配置;

2、根据配置的ip和port创建tcp服务;

3、接收客户端连接,并给客户端分配令牌;

4、接收客户端的登录请求,并将客户端相关信息(令牌、客户端登录标识、最后访问时间、当前token所使用的通道,保存到缓冲区)

5、接收客户端的报文请求,并添加到通讯队列,等待处理;

6、接收来自各处的指令发送请求,并发送至相关通道;

二、详细介绍:

1、启动方法:首先加载配置信息;然后启动主线程、通讯报文消费线程(处理通讯类报文)、超时、失效通道回收线程(进行超时和失效通道的回收工作)、短消息消费者线程(专门针对短消息队列进行处理的线程)。尤其是OP_WRITE,在OP_WRITE之后,必须将selector注册为OP_READ,否则会一直循环下去,死循环。

public static void main(String arg[]) throws Exception {
    //初始化配置数据
    Config cfg = new Config(arg[0]);
    final GVServer gvServer = new GVServer();
    //启动ServerSocket通道
    if (gvServer.initServer(cfg)) {

        ExecutorService threadPool = Executors.newCachedThreadPool();
        //启动通讯服务主线程
        threadPool.execute(gvServer);
        //启动通讯报文消费线程
        threadPool.execute(new CQueueConsumer(cfg.getWaitTime()));
        //启动超时通道、失效通道回收线程
        threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle()));
        threadPool.execute(new MQueueConsumer());
    }
}

2、初始化配置:打开tcp服务等待连接(略);

3、通讯事件处理:通讯主线程的run方法,主要对接收到的事件分别处理。这个地方尤其要注意的是,我第一篇文章提到的,所有触发的时间都必须被消费,否则会一直循环下去。

    public void run() {

        while (true) {
            try {

                //监听事件key
                selector.select(2000);
                //迭代一组事件key
                Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    //定义一个socket通道
                    SocketChannel socketChannel = null;

                    int count = 0;

                    SelectionKey key = keys.next();
                    //  Logs.info("有网络事件被触发,事件类型为:" + key.interestOps());
                    //删除Iterator中的当前key,避免重复处理
                    keys.remove();
                    if (!key.isValid()) {
                        continue;
                    } else if (key.isAcceptable()) {
                        //从客户端送来的key中获取ServerSocket通道
                        serverSocketChannel = (ServerSocketChannel) key.channel();
                        //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续
                        socketChannel = serverSocketChannel.accept();
                        //将此socket通道设置为非阻塞模式
                        socketChannel.configureBlocking(false);
                        //将此通道注册到selector,并等待接收客户端的读入数据
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        allocToken(socketChannel);

                    } else if (key.isReadable()) {

                        //获取事件key中的channel
                        socketChannel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock());
                        //清理缓冲区,便于使用
                        byteBuffer.clear();
                        //将channel中的字节流读入缓冲区
                        count = socketChannel.read(byteBuffer);
                        byteBuffer.flip();
                        //处理粘包
                        if (count > 0) {
                            try {
                                handlePacket(socketChannel, byteBuffer);
                            } catch (Exception e) {
                                e.printStackTrace();
//                                continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释
                            }
                        } else if (count == 0) {
                            continue;
                        } else {
                            socketChannel.close();
                            logger.info("客户端"+socketChannel.toString()+"连接关闭!");

                        }

                    } else if (key.isWritable()) {
                        ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ);
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }

4、将合法报文放入通讯队列:我在配置初始化的时候,明确规定了几种报文类,并在协议中限定了报文的格式,凡是不符合我报文格式的报文均视为非法报文,直接会给客户端回复一个错误指令。

/**
 * 处理不合法报文以及将合法报文放入队列等待处理
 *
 * @param socketChannel
 * @param strPacket
 */
private void offerPacket(SocketChannel socketChannel, String strPacket) {
    IPacket packet = AnalyseTools.analysePacket(strPacket);
    if (packet.getHeader().equals(LoginPacket.HEADER)) {
        handleLoginPacket(socketChannel, packet);
    }
    //如果类为空或者从handle单例map中无法取到类,则证明报文非法
    if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) {
        //不在服务端识别范围内的报文,回复E响应,告知客户端不合法
        ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER);
        errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG);

        logger.info("客户端发送非法报文:" + strPacket);
        GVServer.write2Client(errorOrder, socketChannel);
        //将合法报文放入消息队列中等待处理
    } else {
        if (!GVQueue.CQUEUE.offer(packet)) {
            logger.error("消息队列已满,请增加队列容量!");

        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("将添加至消息队列,队列内消息数量为:" + GVQueue.CQUEUE.size());
            }
        }
    }
}

5、分配客户端的令牌:客户端连接之后,服务端马上会给客户端回复一个T指令,告诉客户端它的令牌,从此以后,客户端每次报文,都必须携带此令牌;在通讯这一层里,服务端根据token确定客户端;

    private void allocToken(SocketChannel socketChannel) {

        //给连接上来的通道分配token
        TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER);
        String token = UUID.randomUUID().toString();
        tokenOrder.initTokenOrder(token);
        //返回给客户端token
        write2Client(tokenOrder, socketChannel);
        logger.info("客户端:<" + token + ">已经连接!");
        //将连接后的channel保存到全局map中
        //2015.8.13修改,先把userId存为null,等待用户登录后,在将,
//        GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc());
//        GVConnTools.addConn2Cache(gvConnection);

    }

6、客户端登录处理:仅凭借客户端token,我没法将服务用于业务中,业务中往往会存在一个用户的用户标记,我需要能够根据用户的标记,往通道里面写入消息;所以,有了客户端登录过程,客户端将自己唯一的业务标记提交到服务端。服务端建立一个token、用户标记、用户最后访问时间、通道的缓冲区(统一成了一个类GVConnection),专门用语指令的发送,并且保持几项内容的同步,GVConnTools为操作这些内容的唯一入口;

/**
 * 专门处理客户端登录报文,保存GVConn到缓冲区
 * 【注】对于userId重复的情况,在这里不做处理了,由业务系统自己处理,
 * 这里对userId重复相当于后登录的用户替换了先登录用户的通道。
 *
 * @param socketChannel
 * @param packet
 */
private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) {

    GVConnection gvConn = new GVConnection(packet.getClientToken(),
            packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc());
    GVConnTools.addConn2Cache(gvConn);

}

7、消息写入通道:其实我完全可以在奔雷的外部提供一个专门的写入方法,但是当时好像脑子进水了,这个等以后迭代的时候在考虑如何处理吧。暂时放到这里。需要注意的是,这个方法是唯一对协议的前四位包头进行封装的方法,在所有其他的类中,都不需要对报文的前四位予以考虑。在客户端读取的时候,也会将前四位截取掉之后,或者将字符串放入队列,或者将一个报文(指令)对象放入队列。(为什么需要这四位,我将在下一个小部分——粘包、断包中讲解)

/**
 * 向客户端写入信息的方法
 *
 * @param iOrder        报文处理类接口
 * @param socketChannel
 */
public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) {
    try {
        socketChannel.register(selector, SelectionKey.OP_WRITE);
        //创建一个byteBuffer用来存储要写入的buffer
        ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock());
        //得出整个包体的长度
        String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length);
        //讲包体长度放入buffer的前四位
        byteBuffer.put(packetSize.getBytes());
        //移动buffer的postion指针到第四位,包体将从第四位开始写入
        byteBuffer.position(PACKET_HEAD_LENGTH);
        String str = iOrder.generateOrderStr();
        //写入包体
        if (logger.isDebugEnabled()) {
            logger.debug("服务端写入通道的包体:" + str);
        }
        byteBuffer.put(str.getBytes());
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    } catch (IOException e) {
        e.printStackTrace();
    }
}

8、粘包、断包。

晕,服务器保存不了这么多文字了,我另开一章吧,不好意思啊。

时间: 2024-10-13 21:57:19

基于NIO的消息路由的实现(四) 服务端通讯主线程(1)的相关文章

基于NIO的消息路由的实现(三)服务端与客户端结构

一.服务器端结构: 如图所示: 指令类和报文类:对下行的指令和上行的报文进行了类的封装,分别实现IOrder和IPacket接口,继承Order,Packet基类: 服务主线程:接受客户端连接,将客户端发送的报文投递到通讯队列中,发送指令给客户端:保存连接对象(GVConnection) 通讯队列CQ:存储客户端发送过来的报文,此报文由通讯主线程放入: 通讯队列消费者:对通讯队列中的报文进行处理,该做什么做什么,如果是短消息,则放入消息队列MQ中单独处理: 消息队列消费者:对MQ中的短消息进行处

基于NIO的消息路由的实现(四) 服务端通讯主线程(2)断包和粘包的处理

本来我打算单独开一章,专门说明粘包和断包,但是觉得这个事儿我在做的时候挺头疼的,但是对于别人或许不那么重要,于是就在这里写吧. 那么何谓粘包.何谓断包呢? 粘包:我们知道客户端在写入报文给服务端的时候,首先要将需要写入的内容写入Buffer,以ByteBuffer为例,如果你Buffer定义的足够大,并且你发送的报文足够快,此时就会产生粘包现象,举例来说 你发送一个 报文" M|A",然后你有发送了一个"M|B",如果产生粘包,服务端从缓冲区里面读出的就是"

基于NIO的消息路由的实现(七)客户端的一些实现,维持链路,断线重连

一.客户端代码存在的必要性以及我认为需要解决的问题 就NIO通讯本身而言完全没必要分开,其实客户端代码和服务端代码可以放到一起.但是在业务上是分开的.我在做nio的时候思考了许多我自己认为应该解决的问题:主要的如下: 1.链路维护(心跳): 定期的向服务端发送维持链路报文,获得服务端的响应,以证明其仍然在存活状态:同时服务端会记录客户端每次维持链路的时间,用于服务端对通道的超时 判断: 2.断线重连: 一种情况是正常断线,目前我利用对channel的read返回来进行判断: 另一种是非正常短线,

基于NIO的消息路由的实现(六)报文队列的处理

一.报文队列的处理: 如果将多路复用器获取到的所有事件,阻塞式的同步处理,那恐怕会严重影响selector的性能,所以我把从客户端接收到的大部分消息,都放入了队列中,然后另外启动队列的消费线程对消息进行异步的处理:具体如下: 1.通讯报文队列消费者:在selector对read事件的处理过程中,我在最后都把客户端发送的报文放入了一个叫CQUEUE的队列中,具体定义如下,CQUEUE是所有客户端发送报文的队列,在CQUEUE队列中的消费者线程中,我又对M类报文进行了对垒处理,放入了另一个队列MQU

Handler消息传递机制(四)子线程接收主线程发送的消息

package com.example.looper; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.view.View; import android.view.View.OnClickListener; import android.widg

Handler详解系列(四)——利用Handler在主线程与子线程之间互发消息

MainActivity如下: package cc.c; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.widget.TextView; /** * Demo描述: * * 示例步骤如下: * 1 子线程给子线程本身发送消息 * 2 收到1的消

Handler具体解释系列(四)——利用Handler在主线程与子线程之间互发消息

MainActivity例如以下: package cc.c; import android.app.Activity; import android.os.Bundle; import android.os.Handler; import android.os.Looper; import android.os.Message; import android.widget.TextView; /** * Demo描写叙述: * * 演示样例过程例如以下: * 1 子线程给子线程本身发送消息 *

IOS 推送消息 php做推送服务端

IOS推送消息是许多IOS应用都具备的功能,最近也在研究这个功能,参考了很多资料终于搞定了,下面就把步骤拿出来分享下: iOS消息推送的工作机制可以简单的用下图来概括: Provider是指某个iPhone软件的Push服务器,APNS是Apple Push Notification Service的缩写,是苹果的服务器. 上图可以分为三个阶段: 第一阶段:应用程序把要发送的消息.目的iPhone的标识打包,发给APNS. 第二阶段:APNS在自身的已注册Push服务的iPhone列表中,查找有

Java 消息推送------GoEasy实现服务端推送和web端推送

项目中需要消息推送,又想较低开发成本,具体需求:角色用户在后台管理页面发布一个消息,所有用这个系统的用户无论在哪个页面都能及时收到他发布的消息,后来我在网上查询到了一个第三方的免费推送服务-GoEasy push, 它可以满足我的需求,下面是如何用GoEasy进行信息推送及接收: 第一种:Java服务器端推送,web端接收推送信息 步骤: 从GoEasy官网下载jar包,并放到项目中. https://cdn.goeasy.io/sdk/goeasy-0.1.jar Java代码来了,你没有看错