一、简单介绍:
服务端通讯主线程是消息路由服务的启动类,其主要作用如下:
1、初始化相关配置;
2、根据配置的ip和port创建tcp服务;
3、接收客户端连接,并给客户端分配令牌;
4、接收客户端的登录请求,并将客户端相关信息(令牌、客户端登录标识、最后访问时间、当前token所使用的通道,保存到缓冲区)
5、接收客户端的报文请求,并添加到通讯队列,等待处理;
6、接收来自各处的指令发送请求,并发送至相关通道;
二、详细介绍:
1、启动方法:首先加载配置信息;然后启动主线程、通讯报文消费线程(处理通讯类报文)、超时、失效通道回收线程(进行超时和失效通道的回收工作)、短消息消费者线程(专门针对短消息队列进行处理的线程)。尤其是OP_WRITE,在OP_WRITE之后,必须将selector注册为OP_READ,否则会一直循环下去,死循环。
public static void main(String arg[]) throws Exception { //初始化配置数据 Config cfg = new Config(arg[0]); final GVServer gvServer = new GVServer(); //启动ServerSocket通道 if (gvServer.initServer(cfg)) { ExecutorService threadPool = Executors.newCachedThreadPool(); //启动通讯服务主线程 threadPool.execute(gvServer); //启动通讯报文消费线程 threadPool.execute(new CQueueConsumer(cfg.getWaitTime())); //启动超时通道、失效通道回收线程 threadPool.execute(new ConnTimeoutCleanThread(cfg.getCleanThreadOutTime(), cfg.getCleanThreadCycle())); threadPool.execute(new MQueueConsumer()); } }
2、初始化配置:打开tcp服务等待连接(略);
3、通讯事件处理:通讯主线程的run方法,主要对接收到的事件分别处理。这个地方尤其要注意的是,我第一篇文章提到的,所有触发的时间都必须被消费,否则会一直循环下去。
public void run() { while (true) { try { //监听事件key selector.select(2000); //迭代一组事件key Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while (keys.hasNext()) { //定义一个socket通道 SocketChannel socketChannel = null; int count = 0; SelectionKey key = keys.next(); // Logs.info("有网络事件被触发,事件类型为:" + key.interestOps()); //删除Iterator中的当前key,避免重复处理 keys.remove(); if (!key.isValid()) { continue; } else if (key.isAcceptable()) { //从客户端送来的key中获取ServerSocket通道 serverSocketChannel = (ServerSocketChannel) key.channel(); //接收此ServerSocket通道中的Socket通道,accept是一个阻塞方法,一直到获取到连接才会继续 socketChannel = serverSocketChannel.accept(); //将此socket通道设置为非阻塞模式 socketChannel.configureBlocking(false); //将此通道注册到selector,并等待接收客户端的读入数据 socketChannel.register(selector, SelectionKey.OP_READ); allocToken(socketChannel); } else if (key.isReadable()) { //获取事件key中的channel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getReadBlock()); //清理缓冲区,便于使用 byteBuffer.clear(); //将channel中的字节流读入缓冲区 count = socketChannel.read(byteBuffer); byteBuffer.flip(); //处理粘包 if (count > 0) { try { handlePacket(socketChannel, byteBuffer); } catch (Exception e) { e.printStackTrace(); // continue;//如果当前包存在非法抛出异常,那么不再进行处理直接跳出循环,处理下一个包;此处存疑,测试阶段暂时注释 } } else if (count == 0) { continue; } else { socketChannel.close(); logger.info("客户端"+socketChannel.toString()+"连接关闭!"); } } else if (key.isWritable()) { ((SocketChannel) key.channel()).register(selector, SelectionKey.OP_READ); } } } catch (IOException e) { e.printStackTrace(); } } }
4、将合法报文放入通讯队列:我在配置初始化的时候,明确规定了几种报文类,并在协议中限定了报文的格式,凡是不符合我报文格式的报文均视为非法报文,直接会给客户端回复一个错误指令。
/** * 处理不合法报文以及将合法报文放入队列等待处理 * * @param socketChannel * @param strPacket */ private void offerPacket(SocketChannel socketChannel, String strPacket) { IPacket packet = AnalyseTools.analysePacket(strPacket); if (packet.getHeader().equals(LoginPacket.HEADER)) { handleLoginPacket(socketChannel, packet); } //如果类为空或者从handle单例map中无法取到类,则证明报文非法 if (packet == null || Config.getPacketInstance(packet.getHeader()) == null) { //不在服务端识别范围内的报文,回复E响应,告知客户端不合法 ErrorOrder errorOrder = (ErrorOrder) Config.getOrderInstance(ErrorOrder.HEADER); errorOrder.initErrorOrder(errorOrder.INVAILD_REQ_CODE, errorOrder.INVAILD_REQ_MSG); logger.info("客户端发送非法报文:" + strPacket); GVServer.write2Client(errorOrder, socketChannel); //将合法报文放入消息队列中等待处理 } else { if (!GVQueue.CQUEUE.offer(packet)) { logger.error("消息队列已满,请增加队列容量!"); } else { if (logger.isDebugEnabled()) { logger.debug("将添加至消息队列,队列内消息数量为:" + GVQueue.CQUEUE.size()); } } } }
5、分配客户端的令牌:客户端连接之后,服务端马上会给客户端回复一个T指令,告诉客户端它的令牌,从此以后,客户端每次报文,都必须携带此令牌;在通讯这一层里,服务端根据token确定客户端;
private void allocToken(SocketChannel socketChannel) { //给连接上来的通道分配token TokenOrder tokenOrder = (TokenOrder) Config.getOrderInstance(TokenOrder.HEADER); String token = UUID.randomUUID().toString(); tokenOrder.initTokenOrder(token); //返回给客户端token write2Client(tokenOrder, socketChannel); logger.info("客户端:<" + token + ">已经连接!"); //将连接后的channel保存到全局map中 //2015.8.13修改,先把userId存为null,等待用户登录后,在将, // GVConnection gvConnection = new GVConnection(token,null, socketChannel, CommonTools.systemTimeUtc()); // GVConnTools.addConn2Cache(gvConnection); }
6、客户端登录处理:仅凭借客户端token,我没法将服务用于业务中,业务中往往会存在一个用户的用户标记,我需要能够根据用户的标记,往通道里面写入消息;所以,有了客户端登录过程,客户端将自己唯一的业务标记提交到服务端。服务端建立一个token、用户标记、用户最后访问时间、通道的缓冲区(统一成了一个类GVConnection),专门用语指令的发送,并且保持几项内容的同步,GVConnTools为操作这些内容的唯一入口;
/** * 专门处理客户端登录报文,保存GVConn到缓冲区 * 【注】对于userId重复的情况,在这里不做处理了,由业务系统自己处理, * 这里对userId重复相当于后登录的用户替换了先登录用户的通道。 * * @param socketChannel * @param packet */ private void handleLoginPacket(SocketChannel socketChannel, IPacket packet) { GVConnection gvConn = new GVConnection(packet.getClientToken(), packet.getPacketBody(), socketChannel, CommonTools.systemTimeUtc()); GVConnTools.addConn2Cache(gvConn); }
7、消息写入通道:其实我完全可以在奔雷的外部提供一个专门的写入方法,但是当时好像脑子进水了,这个等以后迭代的时候在考虑如何处理吧。暂时放到这里。需要注意的是,这个方法是唯一对协议的前四位包头进行封装的方法,在所有其他的类中,都不需要对报文的前四位予以考虑。在客户端读取的时候,也会将前四位截取掉之后,或者将字符串放入队列,或者将一个报文(指令)对象放入队列。(为什么需要这四位,我将在下一个小部分——粘包、断包中讲解)
/** * 向客户端写入信息的方法 * * @param iOrder 报文处理类接口 * @param socketChannel */ public static synchronized void write2Client(IOrder iOrder, SocketChannel socketChannel) { try { socketChannel.register(selector, SelectionKey.OP_WRITE); //创建一个byteBuffer用来存储要写入的buffer ByteBuffer byteBuffer = ByteBuffer.allocate(Config.getWriteBlock()); //得出整个包体的长度 String packetSize = Integer.toString(iOrder.generateOrderStr().getBytes().length); //讲包体长度放入buffer的前四位 byteBuffer.put(packetSize.getBytes()); //移动buffer的postion指针到第四位,包体将从第四位开始写入 byteBuffer.position(PACKET_HEAD_LENGTH); String str = iOrder.generateOrderStr(); //写入包体 if (logger.isDebugEnabled()) { logger.debug("服务端写入通道的包体:" + str); } byteBuffer.put(str.getBytes()); byteBuffer.flip(); socketChannel.write(byteBuffer); } catch (IOException e) { e.printStackTrace(); } }
8、粘包、断包。
晕,服务器保存不了这么多文字了,我另开一章吧,不好意思啊。