因为最近要从公司离职,害怕用nio写的网络程序没有人能看懂(或许是因为写的不好吧),就调整成了mina(这样大家接触起来非常方便,即使没有socket基础,用起来也不难),所以之前基于nio写的网络程序就开放出来好了!
写的比较挫,大家见谅!
首先是PollServer类,主要处理select,做网络事件的监听和基于FutureTask的数据发送,代码如下:
package gs.gate; import gs.gate.handle.ClientHandle; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.Vector; import java.util.concurrent.FutureTask; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import org.apache.log4j.Logger; import door.HeartTimer; public class PollServer implements Runnable { private Logger log = Logger.getLogger(getClass()); private Selector select = null; private ServerSocketChannel serverSocketChannel = null; private HeartTimer writerExpire = null; private volatile boolean run = true; private List<FutureTask<Integer>> writeTasks = new Vector<FutureTask<Integer>>(); public PollServer(String host,int port) throws IOException { select = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(host, port)); serverSocketChannel.register(select, SelectionKey.OP_ACCEPT); } public Selector getSelector() { return this.select; } public void stop() { this.run = false; } public void run() { if(writerExpire == null) { writerExpire = new HeartTimer(50); } while(this.run) { try { this.listen(); } catch (Exception e) { log.info("PollServer listen() err! " + e.toString()); } try { List<FutureTask<Integer>> writeTasks_ = null; synchronized (writeTasks) { writeTasks_ = new ArrayList<FutureTask<Integer>>(this.writeTasks); this.writeTasks.clear(); } for(FutureTask<Integer> task : writeTasks_) { task.run(); } } catch (Exception e) { log.error("PollServer processOutput() err" ,e); } } } public void listen() throws IOException { select.select(10); Set<SelectionKey> readyKeys = select.selectedKeys(); Iterator<SelectionKey> itr = readyKeys.iterator(); //处理接受 while(itr.hasNext()) { SelectionKey key = itr.next(); itr.remove(); if(key.isAcceptable()) { SocketChannel newConnection = serverSocketChannel.accept(); this.addClient(newConnection); } else if(key.isReadable()) { ClientHandle handle = (ClientHandle)key.attachment(); try { if(handle.handleRead() <= 0) { log.info("if handleRead < 0"); this.removeClient(handle); } } catch (Exception e) { log.error("exception",e); this.removeClient(handle); } } else if(key.isWritable()) { ClientHandle handle = (ClientHandle)key.attachment(); try { handle.handleWrite(); if(handle.hasRemaining() == false) { key.cancel(); } } catch (Exception e) { this.removeClient(handle); log.error("if handleWrite error",e); } } } } public void addWriteTask(FutureTask<Integer> future) { this.writeTasks.add(future); } public void addClient(SocketChannel socket) { ClientHandle handle = new ClientHandle(socket,this); try { socket.socket().setTcpNoDelay(run); socket.configureBlocking(false); socket.register(select, SelectionKey.OP_READ,handle); } catch (Exception e) { try { log.error("create client err",e); socket.close(); } catch (Exception err) {} } } public void removeClient(ClientHandle handle) { if(handle == null) { return ; } log.info(" remove Client "); handle.handleDisConnected(); } }
主要函数: listen();作用:基于网络事件处理接受新链接和消息的接收!
主要函数: processOutput(); 作用: 做统一的发送处理,在这篇 浅谈游戏服务器的发送数据处理 中有讲解!每个连接在发送的时候,将数据和连接封装成FutureTask,然后投递到Pollserver中的安全队列中,在这里统一将安全队列中的任务执行完毕! 如果有数据没有发送完毕,就监听写时间,直到这个链接成为可写事件(即:写缓冲区中有空闲)。
下面是ClientHandle类的代码,做每个连接的处理,比如拆包分包,代码如下
package gs.gate.handle; import gs.gate.PollServer; import java.io.IOException; import java.net.InetAddress; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.FutureTask; import org.apache.log4j.Logger; import dc.control.DCThread; import dc.util.DcTask; import door.IPlayer; import senv.server.ServerKit; import slib.net.ISession; import slib.util.ByteBuffer; public class ClientHandle implements ISession { private Logger log = Logger.getLogger(getClass()); public final static int RW_BUFFER_SIZE = 1024; private SocketChannel socket = null; private java.nio.ByteBuffer reader = java.nio.ByteBuffer.allocate(8*RW_BUFFER_SIZE); private java.nio.ByteBuffer writer = java.nio.ByteBuffer.allocate(10*RW_BUFFER_SIZE); private volatile IPlayer player = null; private PollServer poll = null; private boolean active = false; private boolean tgwProcessed = false; private String tgw = "tgw_l7_forward\r\nHost:" + ServerKit.ip+ ":" + ServerKit.port + "\r\n\r\n"; public ClientHandle(SocketChannel socket,PollServer poll) { this.socket = socket; this.writer.limit(this.writer.capacity()); this.active = true; this.poll = poll; try { this.socket.socket().setSendBufferSize(10*RW_BUFFER_SIZE); this.socket.socket().setReceiveBufferSize(8*RW_BUFFER_SIZE); this.socket.socket().setTcpNoDelay(true); this.socket.socket().setSoLinger(true, 3600); } catch (Exception e) { log.error("err",e); } } public SocketChannel getSocketChannel() { return this.socket; } public int handleWrite() throws IOException { if (!this.isActive()) { return -1; } this.writer.flip(); this.socket.write(writer); if(this.writer.hasRemaining()) { this.writer.compact(); } else { this.writer.clear(); } return 0; } public boolean hasRemaining() { return this.writer.position() > 0; } public Integer writeData(ByteBuffer data) throws IOException { if (!this.isActive()) { return -1; } if(data == null) { return 0; } this.writer.putInt(data.length()); this.writer.put(data.toByteArray(), 0, data.length()); this.writer.flip(); int result = this.socket.write(this.writer); if(this.writer.hasRemaining()) { this.writer.compact(); } else { this.writer.clear(); } return result; } @SuppressWarnings("deprecation") public int handleRead() throws IOException { if(socket == null) { return -1; } if(!this.isActive()) { return -1; } int r = this.socket.read(this.reader); if(r <= 0) { return r; } if(this.tgwProcessed == false) { //腾讯平台 你mb this.reader.flip(); byte bytes[] = new byte[tgw.length()]; this.reader.get(bytes); String vali = new String(bytes,"UTF-8"); if(vali.equals(tgw)) { log.info("tgw 校验成功"); } else { log.info("tgw 校验失败"); } this.tgwProcessed = true; this.reader.compact(); } while(true) { this.reader.flip(); ByteBuffer data = this.createBuffer(); if(data == null) { break; } this.reader.get(data.getByteArray(), data.top(), data.capacity()); this.processData(data); if(this.reader.hasRemaining()) { this.reader.compact(); } else { this.reader.clear(); break; } } return 1; } public void processData(ByteBuffer data) { if(player == null) { DcTask task = new DcTask(); task.object = this; task.data = data; DCThread.getInstance().insertTask(task); } else { player.insertData(data); } } public void handleDisConnected() { if(!this.isActive()) { return ; } if(player != null) { this.player.setSession(null); this.player.logOut(); } this.close(); this.player = null; } private ByteBuffer createBuffer() { if(reader.remaining() < 4) { return null; } int len = reader.getInt(); if(len > reader.remaining()) { this.reader.rewind(); this.reader.compact(); return null; } if (len > 0 && len <= 10 * 1024) { return new ByteBuffer(len); } return null; } @Override public void close() { setActive(false); try { log.error("socket close !"); this.socket.close(); this.socket.keyFor(this.poll.getSelector()).cancel(); } catch (Exception e) { log.error("err" , e); } } @Override public long getActiveTime() { return 0; } @Override public InetAddress getAddress() { return null; } @Override public String getCode() { return null; } @Override public int getPing() { return 0; } @Override public long getPingTime() { return 0; } @Override public int getPort() { return 0; } @Override public int getServerId() { return 0; } @Override public int getSessionId() { return 0; } @Override public Object getSource() { return this.player; } @Override public int getTimeout() { return 0; } @Override public boolean isActive() { return this.active; } @Override public void send(ByteBuffer data) { WriteTask task = new WriteTask(this,data); FutureTask<Integer> future = new FutureTask<Integer>(task); poll.addWriteTask(future); } @Override public void send(ByteBuffer arg0, ByteBuffer arg1) { } @Override public void setCode(String arg0) { } @Override public void setPing(int arg0) { } @Override public void setPingTime(long arg0) { } public void enableWriteEvent() { try { this.socket.register(this.poll.getSelector(), SelectionKey.OP_WRITE, this); } catch (Exception e) { e.printStackTrace(); } } public void shutdownWriteEvent() { } @Override public void setSource(Object arg0) { this.player = (IPlayer) arg0; } protected void setActive(boolean b) { this.active = b; } }
重要的几个函数: send(ByteBuffer data) 将发送处理包装成FutureTask,投递到PollServer中进行处理,就是PollServer::processOutput中处理
handleRead() 这里处理接受数据事件,做了拆包,将二进制数据,按照 长度-内容 的格式进行解析,拆分成一个个ByteBuffer(定义见下文)包,然后进行处理。
ClientHandle继承自ISession接口,其实这个无所谓,大家可以自己定义。我这里因为要和之前的系统兼容,所以才继承了这个。这里一不小心居然用到了适配器模式,我以为这辈子只会用到创建者模式呢? 个人还是觉得,这些设计模式还是为了解决问题用到的,而不是为了多变的需求而想太多用到的;设计模式用得多用的频繁,反而增加代码的可读性!
最后看下WriteTask的封装
package gs.gate.handle; import java.util.concurrent.Callable; import slib.util.ByteBuffer; public class WriteTask implements Callable<Integer> { private ClientHandle client = null; private ByteBuffer data = null; public WriteTask(ClientHandle handle,ByteBuffer data) { client = handle; this.data = data; } @Override public Integer call() throws Exception { return this.client.writeData(data); } }
这个就不多解析了!
缺陷1: 没有做空闲连接的处理,后来的mina库,提供了这个功能!有兴趣的同学自己写个吧!
缺陷2: 自定义的消息包,用了ByteBuffer类,和nio提供的ByteBuffer 重复!
给出自定义的ByteBuffer的处理:
package slib.util; /** * 类说明:字节缓存类,字节操作高位在前,低位在后 * * @version 1.0 * @author fxxxysh <[email protected]> */ public class ByteBuffer { /* static fields */ /** 默认的初始容量大小 */ public static final int CAPACITY = 32; /** 默认的动态数据或文字的最大长度,400k */ public static final int MAX_DATA_LENGTH = 400 * 1024; /* fields */ /** 字节数组 */ byte[] bytes; /** 字节缓存的长度 */ int top; /** 字节缓存的偏移量 */ int offset; /* constructors */ /** 按默认的大小构造一个字节缓存对象 */ public ByteBuffer() { this(CAPACITY); } /** 按指定的大小构造一个字节缓存对象 */ public ByteBuffer(int capacity) { if (capacity < 1) throw new IllegalArgumentException(getClass().getName() + " <init>, invalid capatity:" + capacity); bytes = new byte[capacity]; top = 0; offset = 0; } /** 按指定的字节数组构造一个字节缓存对象 */ public ByteBuffer(byte[] data) { if (data == null) throw new IllegalArgumentException(getClass().getName() + " <init>, null data"); bytes = data; top = data.length; offset = 0; } /** 按指定的字节数组构造一个字节缓存对象 */ public ByteBuffer(byte[] data, int index, int length) { if (data == null) throw new IllegalArgumentException(getClass().getName() + " <init>, null data"); if (index < 0 || index > data.length) throw new IllegalArgumentException(getClass().getName() + " <init>, invalid index:" + index); if (length < 0 || data.length < index + length) throw new IllegalArgumentException(getClass().getName() + " <init>, invalid length:" + length); bytes = data; top = index + length; offset = index; } /* properties */ /** 得到字节缓存的容积 */ public int capacity() { return bytes.length; } /** 设置字节缓存的容积,只能扩大容积 */ public void setCapacity(int len) { int c = bytes.length; if (len <= c) return; for (; c < len; c = (c << 1) + 1) ; byte[] temp = new byte[c]; System.arraycopy(bytes, 0, temp, 0, top); bytes = temp; } /** 得到字节缓存的长度 */ public int top() { return top; } /** 设置字节缓存的长度 */ public void setTop(int top) { if (top < offset) throw new IllegalArgumentException(this + " setTop, invalid top:" + top); if (top > bytes.length) setCapacity(top); this.top = top; } /** 得到字节缓存的偏移量 */ public int offset() { return offset; } /** 设置字节缓存的偏移量 */ public void setOffset(int offset) { if (offset < 0 || offset > top) throw new IllegalArgumentException(this + " setOffset, invalid offset:" + offset); this.offset = offset; } /** 得到字节缓存的使用长度 */ public int length() { return top - offset; } /** 得到字节缓存的字节数组,一般使用toArray()方法 */ public byte[] getByteArray() { return bytes; } /* methods */ /* byte methods */ /** 得到指定偏移位置的字节 */ public byte read(int pos) { return bytes[pos]; } /** 设置指定偏移位置的字节 */ public void write(int b, int pos) { bytes[pos] = (byte) b; } /* read methods */ /** * 按当前偏移位置读入指定的字节数组 * * @param data * 指定的字节数组 * @param pos * 指定的字节数组的起始位置 * @param len * 读入的长度 */ public void read(byte[] data, int pos, int len) { System.arraycopy(bytes, offset, data, pos, len); offset += len; } /** 读出一个布尔值 */ public boolean readBoolean() { return (bytes[offset++] != 0); } /** 读出一个字节 */ public byte readByte() { return bytes[offset++]; } /** 读出一个无符号字节 */ public int readUnsignedByte() { return bytes[offset++] & 0xff; } /** 读出一个字符 */ public char readChar() { return (char) readUnsignedShort(); } /** 读出一个短整型数值 */ public short readShort() { return (short) readUnsignedShort(); } /** 读出一个无符号的短整型数值 */ public int readUnsignedShort() { int pos = offset; offset += 2; return (bytes[pos + 1] & 0xff) + ((bytes[pos] & 0xff) << 8); } /** 读出一个整型数值 */ public int readInt() { int pos = offset; offset += 4; return (bytes[pos + 3] & 0xff) + ((bytes[pos + 2] & 0xff) << 8) + ((bytes[pos + 1] & 0xff) << 16) + ((bytes[pos] & 0xff) << 24); } /** 读出一个浮点数值 */ public float readFloat() { return Float.intBitsToFloat(readInt()); } /** 读出一个长整型数值 */ public long readLong() { int pos = offset; offset += 8; return (bytes[pos + 7] & 0xffL) + ((bytes[pos + 6] & 0xffL) << 8) + ((bytes[pos + 5] & 0xffL) << 16) + ((bytes[pos + 4] & 0xffL) << 24) + ((bytes[pos + 3] & 0xffL) << 32) + ((bytes[pos + 2] & 0xffL) << 40) + ((bytes[pos + 1] & 0xffL) << 48) + ((bytes[pos] & 0xffL) << 56); } /** 读出一个双浮点数值 */ public double readDouble() { return Double.longBitsToDouble(readLong()); } /** * 读出动态长度, 数据大小采用动态长度,整数类型下,最大为512M 1xxx,xxxx表示(0~0x80) 0~128B * 01xx,xxxx,xxxx,xxxx表示(0~0x4000) 0~16K * 001x,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx,xxxx表示(0~0x20000000) 0~512M */ public int readLength() { int n = bytes[offset] & 0xff; if (n >= 0x80) { offset++; return n - 0x80; } else if (n >= 0x40) return readUnsignedShort() - 0x4000; else if (n >= 0x20) return readInt() - 0x20000000; else throw new IllegalArgumentException(this + " readLength, invalid number:" + n); } /** 读出一个指定长度的字节数组,可以为null */ public byte[] readData() { int len = readLength() - 1; if (len < 0) return null; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readData, data overflow:" + len); byte[] data = new byte[len]; read(data, 0, len); return data; } /** 读出一个短字节数组,长度不超过254 */ public byte[] readShortData() { int len = readUnsignedByte(); if (len == 255) return null; byte[] data = new byte[len]; if (len != 0) read(data, 0, len); return data; } /** 读出一个指定长度的字符串 */ public String readString(int len) { byte[] data = new byte[len]; if (len == 0) return ""; read(data, 0, len); return new String(data); } /** 读出一个短字符串,长度不超过254 */ public String readShortString() { int len = readUnsignedByte(); if (len == 255) return null; return readString(len); } /** 读出一个字符串,长度不超过65534 */ public String readString() { int len = readUnsignedShort(); if (len == 65535) return null; return readString(len); } /** 读出一个指定长度和编码类型的字符串 */ public String readUTF(String charsetName) { int len = readLength() - 1; if (len < 0) return null; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readUTF, data overflow:" + len); byte[] data = new byte[len]; read(data, 0, len); if (charsetName == null) return new String(data); try { return new String(data, charsetName); } catch (Exception e) { throw new IllegalArgumentException(this + " readUTF, invalid charsetName:" + charsetName); } } /** 读出一个指定长度的utf字符串 */ public String readUTF() { int len = readLength() - 1; if (len < 0) return null; if (len == 0) return ""; if (len > MAX_DATA_LENGTH) throw new IllegalArgumentException(this + " readUTF, data overflow:" + len); StringBuffer sb = new StringBuffer(len); int pos = ByteKit.readUTF(bytes, offset, len, sb); if (pos > 0) throw new IllegalArgumentException(this + " readUTF, format err, len=" + len + ", pos:" + pos); offset += len; return sb.toString(); } /* write methods */ /** * 写入指定字节数组 * * @param data * 指定的字节数组 * @param pos * 指定的字节数组的起始位置 * @param len * 写入的长度 */ public void write(byte[] data, int pos, int len) { if (bytes.length < top + len) setCapacity(top + len); System.arraycopy(data, pos, bytes, top, len); top += len; } /** 写入一个布尔值 */ public void writeBoolean(boolean b) { if (bytes.length < top + 1) setCapacity(top + CAPACITY); bytes[top++] = (byte) (b ? 1 : 0); } /** 写入一个字节 */ public void writeByte(int b) { if (bytes.length < top + 1) setCapacity(top + CAPACITY); bytes[top++] = (byte) b; } /** 写入一个字符 */ public void writeChar(int c) { writeShort(c); } /** 写入一个短整型数值 */ public void writeShort(int s) { int pos = top; if (bytes.length < pos + 2) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (s >>> 8); bytes[pos + 1] = (byte) s; top += 2; } /** 在指定位置写入一个短整型数值,length不变 */ public void writeShort(int s, int pos) { if (bytes.length < pos + 2) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (s >>> 8); bytes[pos + 1] = (byte) s; } /** 写入一个整型数值 */ public void writeInt(int i) { int pos = top; if (bytes.length < pos + 4) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (i >>> 24); bytes[pos + 1] = (byte) (i >>> 16); bytes[pos + 2] = (byte) (i >>> 8); bytes[pos + 3] = (byte) i; top += 4; } /** 在指定位置写入一个整型数值,length不变 */ public void writeInt(int i, int pos) { if (bytes.length < pos + 4) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (i >>> 24); bytes[pos + 1] = (byte) (i >>> 16); bytes[pos + 2] = (byte) (i >>> 8); bytes[pos + 3] = (byte) i; } /** 写入一个浮点数值 */ public void writeFloat(float f) { writeInt(Float.floatToIntBits(f)); } /** 写入一个长整型数值 */ public void writeLong(long l) { int pos = top; if (bytes.length < pos + 8) setCapacity(pos + CAPACITY); bytes[pos] = (byte) (l >>> 56); bytes[pos + 1] = (byte) (l >>> 48); bytes[pos + 2] = (byte) (l >>> 40); bytes[pos + 3] = (byte) (l >>> 32); bytes[pos + 4] = (byte) (l >>> 24); bytes[pos + 5] = (byte) (l >>> 16); bytes[pos + 6] = (byte) (l >>> 8); bytes[pos + 7] = (byte) l; top += 8; } /** 写入一个双浮点数值 */ public void writeDouble(double d) { writeLong(Double.doubleToLongBits(d)); } /** 写入动态长度 */ public void writeLength(int len) { if (len >= 0x20000000 || len < 0) throw new IllegalArgumentException(this + " writeLength, invalid len:" + len); if (len >= 0x4000) writeInt(len + 0x20000000); else if (len >= 0x80) writeShort(len + 0x4000); else writeByte(len + 0x80); } /** 写入一个字节数组,可以为null */ public void writeData(byte[] data) { writeData(data, 0, (data != null) ? data.length : 0); } /** 写入一个字节数组,可以为null */ public void writeData(byte[] data, int pos, int len) { if (data == null) { writeLength(0); return; } writeLength(len + 1); write(data, pos, len); } /** 写入一个字符串,可以为null */ public void writeString(String s) { if (s != null) { byte[] temp = s.getBytes(); if (temp.length > 65534) throw new IllegalArgumentException(getClass().getName() + " writeString, invalid s:" + s); writeShort(temp.length); if (temp.length != 0) write(temp, 0, temp.length); } else writeShort(65535); } /** 写入一个字符串,以指定的字符进行编码 */ public void writeUTF(String str, String charsetName) { if (str == null) { writeLength(0); return; } byte[] data; if (charsetName != null) { try { data = str.getBytes(charsetName); } catch (Exception e) { throw new IllegalArgumentException(this + " writeUTF, invalid charsetName:" + charsetName); } } else data = str.getBytes(); writeLength(data.length + 1); write(data, 0, data.length); } /** 写入一个utf字符串,可以为null */ public void writeUTF(String str) { writeUTF(str, 0, (str != null) ? str.length() : 0); } /** 写入一个utf字符串中指定的部分,可以为null */ public void writeUTF(String str, int index, int length) { if (str == null) { writeLength(0); return; } int len = ByteKit.getUTFLength(str, index, length); writeLength(len + 1); int pos = top; if (bytes.length < pos + len) setCapacity(pos + len); ByteKit.writeUTF(str, index, length, bytes, pos); top += len; } /** 检查是否为相同类型的实例 */ public boolean checkClass(Object obj) { return (obj instanceof ByteBuffer); } /** 在指定位置写入一个字节,length不变 */ public void writeByte(int b, int pos) { if (bytes.length < pos + 1) setCapacity(pos + CAPACITY); bytes[pos] = (byte) b; } /** 得到字节缓存当前长度的字节数组 */ public byte[] toByteArray() { byte[] data = new byte[top - offset]; System.arraycopy(bytes, offset, data, 0, data.length); return data; } /** 清除字节缓存对象 */ public void clear() { top = 0; offset = 0; } /* common methods */ public int hashCode() { int hash = 17; for (int i = top - 1; i >= 0; i--) hash = 65537 * hash + bytes[i]; return hash; } public boolean equals(Object obj) { if (this == obj) return true; if (!checkClass(obj)) return false; ByteBuffer bb = (ByteBuffer) obj; if (bb.top != top) return false; if (bb.offset != offset) return false; for (int i = top - 1; i >= 0; i--) { if (bb.bytes[i] != bytes[i]) return false; } return true; } public String toString() { return super.toString() + "[" + top + "," + offset + "," + bytes.length + "] "; } }
相应的ByteKit类代码:
/** * Copyright 2001 by seasky <www.seasky.cn>. */ package slib.util; /** * 类说明: 字节及字节数组的方法操作库 * * @version 1.0 * @author zminleo <[email protected]> */ public final class ByteKit { /* static fields */ /** 库信息 */ public static final String toString=ByteKit.class.getName(); /* static methods */ /** 在字节数组中指定位置读出一个布尔值 */ public static boolean readBoolean(byte[] bytes,int pos) { return bytes[pos]!=0; } /** 在字节数组中指定位置读出一个字节 */ public static byte readByte(byte[] bytes,int pos) { return bytes[pos]; } /** 在字节数组中指定位置读出一个无符号字节 */ public static int readUnsignedByte(byte[] bytes,int pos) { return bytes[pos]&0xff; } /** 在字节数组中指定位置读出一个字符 */ public static char readChar(byte[] bytes,int pos) { return (char)readUnsignedShort(bytes,pos); } /** 在字节数组中指定位置读出一个字符,低位在前,高位在后 */ public static char readChar_(byte[] bytes,int pos) { return (char)readUnsignedShort_(bytes,pos); } /** 在字节数组中指定位置读出一个短整型数值 */ public static short readShort(byte[] bytes,int pos) { return (short)readUnsignedShort(bytes,pos); } /** 在字节数组中指定位置读出一个短整型数值,低位在前,高位在后 */ public static short readShort_(byte[] bytes,int pos) { return (short)readUnsignedShort_(bytes,pos); } /** 在字节数组中指定位置读出一个无符号短整型数值 */ public static int readUnsignedShort(byte[] bytes,int pos) { return (bytes[pos+1]&0xff)+((bytes[pos]&0xff)<<8); } /** 在字节数组中指定位置读出一个无符号短整型数值,低位在前,高位在后 */ public static int readUnsignedShort_(byte[] bytes,int pos) { return ((bytes[pos+1]&0xff)<<8)+(bytes[pos]&0xff); } /** 在字节数组中指定位置读出一个整型数值 */ public static int readInt(byte[] bytes,int pos) { return ((bytes[pos+3]&0xff))+((bytes[pos+2]&0xff)<<8) +((bytes[pos+1]&0xff)<<16)+((bytes[pos]&0xff)<<24); } /** 在字节数组中指定位置读出一个整型数值,低位在前,高位在后 */ public static int readInt_(byte[] bytes,int pos) { return ((bytes[pos+3]&0xff)<<24)+((bytes[pos+2]&0xff)<<16) +((bytes[pos+1]&0xff)<<8)+((bytes[pos]&0xff)); } /** 在字节数组中指定位置读出一个浮点数值 */ public static float readFloat(byte[] bytes,int pos) { return Float.intBitsToFloat(readInt(bytes,pos)); } /** 在字节数组中指定位置读出一个浮点数值,低位在前,高位在后 */ public static float readFloat_(byte[] bytes,int pos) { return Float.intBitsToFloat(readInt_(bytes,pos)); } /** 在字节数组中指定位置读出一个长整型数值 */ public static long readLong(byte[] bytes,int pos) { return (bytes[pos+7]&0xffL)+((bytes[pos+6]&0xffL)<<8) +((bytes[pos+5]&0xffL)<<16)+((bytes[pos+4]&0xffL)<<24) +((bytes[pos+3]&0xffL)<<32)+((bytes[pos+2]&0xffL)<<40) +((bytes[pos+1]&0xffL)<<48)+((bytes[pos]&0xffL)<<56); } /** 在字节数组中指定位置读出一个长整型数值,低位在前,高位在后 */ public static long readLong_(byte[] bytes,int pos) { return ((bytes[pos+7]&0xffL)<<56)+((bytes[pos+6]&0xffL)<<48) +((bytes[pos+5]&0xffL)<<40)+((bytes[pos+4]&0xffL)<<32) +((bytes[pos+3]&0xffL)<<24)+((bytes[pos+2]&0xffL)<<16) +((bytes[pos+1]&0xffL)<<8)+(bytes[pos]&0xffL); } /** 在字节数组中指定位置读出一个双浮点数值 */ public static double readDouble(byte[] bytes,int pos) { return Double.longBitsToDouble(readLong(bytes,pos)); } /** 在字节数组中指定位置读出一个双浮点数值,低位在前,高位在后 */ public static double readDouble_(byte[] bytes,int pos) { return Double.longBitsToDouble(readLong_(bytes,pos)); } /** 写入一个布尔值在字节数组中指定位置 */ public static void writeBoolean(boolean b,byte[] bytes,int pos) { bytes[pos]=(byte)(b?1:0); } /** 写入一个字节在字节数组中指定位置 */ public static void writeByte(int b,byte[] bytes,int pos) { bytes[pos]=(byte)b; } /** 在字节数组中指定位置写入一个字符 */ public static void writeChar(int c,byte[] bytes,int pos) { writeShort(c,bytes,pos); } /** 写入一个字符在字节数组中指定位置,低位在前,高位在后 */ public static void writeChar_(int c,byte[] bytes,int pos) { writeShort_(c,bytes,pos); } /** 写入一个短整型数值在字节数组中指定位置 */ public static void writeShort(int s,byte[] bytes,int pos) { bytes[pos]=(byte)(s>>>8); bytes[pos+1]=(byte)s; } /** 写入一个短整型数值在字节数组中指定位置,低位在前,高位在后 */ public static void writeShort_(int s,byte[] bytes,int pos) { bytes[pos]=(byte)s; bytes[pos+1]=(byte)(s>>>8); } /** 写入一个整型数值在字节数组中指定位置 */ public static void writeInt(int i,byte[] bytes,int pos) { bytes[pos]=(byte)(i>>>24); bytes[pos+1]=(byte)(i>>>16); bytes[pos+2]=(byte)(i>>>8); bytes[pos+3]=(byte)i; } /** 在字节数组中指定位置写入一个整型数值,低位在前,高位在后 */ public static void writeInt_(int i,byte[] bytes,int pos) { bytes[pos]=(byte)i; bytes[pos+1]=(byte)(i>>>8); bytes[pos+2]=(byte)(i>>>16); bytes[pos+3]=(byte)(i>>>24); } /** 写入一个浮点数值在字节数组中指定位置 */ public static void writeFloat(float f,byte[] bytes,int pos) { writeInt(Float.floatToIntBits(f),bytes,pos); } /** 写入一个浮点数值在字节数组中指定位置,低位在前,高位在后 */ public static void writeFloat_(float f,byte[] bytes,int pos) { writeInt_(Float.floatToIntBits(f),bytes,pos); } /** 写入一个长整型数值在字节数组中指定位置 */ public static void writeLong(long l,byte[] bytes,int pos) { bytes[pos]=(byte)(l>>>56); bytes[pos+1]=(byte)(l>>>48); bytes[pos+2]=(byte)(l>>>40); bytes[pos+3]=(byte)(l>>>32); bytes[pos+4]=(byte)(l>>>24); bytes[pos+5]=(byte)(l>>>16); bytes[pos+6]=(byte)(l>>>8); bytes[pos+7]=(byte)l; } /** 写入一个长整型数值在字节数组中指定位置,低位在前,高位在后 */ public static void writeLong_(long l,byte[] bytes,int pos) { bytes[pos]=(byte)l; bytes[pos+1]=(byte)(l>>>8); bytes[pos+2]=(byte)(l>>>16); bytes[pos+3]=(byte)(l>>>24); bytes[pos+4]=(byte)(l>>>32); bytes[pos+5]=(byte)(l>>>40); bytes[pos+6]=(byte)(l>>>48); bytes[pos+7]=(byte)(l>>>56); } /** 写入一个双浮点数值在字节数组中指定位置 */ public static void writeDouble(double d,byte[] bytes,int pos) { writeLong(Double.doubleToLongBits(d),bytes,pos); } /** 写入一个双浮点数值在字节数组中指定位置,低位在前,高位在后 */ public static void writeDouble_(double d,byte[] bytes,int pos) { writeLong_(Double.doubleToLongBits(d),bytes,pos); } /** 将指定的字节数据转换为ISO-8859-1格式的字符串 */ public static String readISO8859_1(byte[] data) { return readISO8859_1(data,0,data.length); } /** 将指定的字节数据转换为ISO-8859-1格式的字符串 */ public static String readISO8859_1(byte[] data,int pos,int len) { char[] array=new char[len]; for(int i=pos+len-1,j=array.length-1;i>=pos;i--,j--) array[j]=(char)data[i]; return new String(array); } /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */ public static byte[] writeISO8859_1(String str) { return writeISO8859_1(str,0,str.length()); } /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */ public static byte[] writeISO8859_1(String str,int index,int len) { byte[] data=new byte[len]; writeISO8859_1(str,index,len,data,0); return data; } /** 将指定的字符串转换为ISO-8859-1格式的字节数据 */ public static void writeISO8859_1(String str,int index,int len, byte[] data,int pos) { int c; for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--) { c=str.charAt(i); data[j]=(c>256)?63:(byte)c; } } /** 将指定的字符数组转换为ISO-8859-1格式的字节数据 */ public static void writeISO8859_1(char[] chars,int index,int len, byte[] data,int pos) { int c; for(int i=index+len-1,j=pos+len-1;i>=index;i--,j--) { c=chars[i]; data[j]=(c>256)?63:(byte)c; } } /** 将指定的UTF8格式的字节数据转换为字符串,返回null表示失败 */ public static String readUTF(byte[] data) { StringBuffer sb=new StringBuffer(data.length); int pos=readUTF(data,0,data.length,sb); return (pos==0)?sb.toString():null; } /** * 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置 */ public static int readUTF(byte[] data,StringBuffer sb) { return readUTF(data,0,data.length,sb); } /** * 将指定的UTF8格式的字节数据转换为字符串, 返回0表示成功,否则表示失败位置 */ public static int readUTF(byte[] data,int pos,int len,StringBuffer sb) { int i,c,cc,ccc; int end=pos+len; while(pos<end) { c=data[pos]&0xff; i=c>>4; if(i<8) { // 0xxx xxxx pos++; sb.append((char)c); } else if(i==12||i==13) { // 110x xxxx 10xx xxxx pos+=2; if(pos>end) return pos; cc=data[pos-1]; if((cc&0xC0)!=0x80) return pos; sb.append((char)(((c&0x1f)<<6)|(cc&0x3f))); } else if(i==14) { // 1110 xxxx 10xx xxxx 10xx // xxxx pos+=3; if(pos>end) return pos; cc=data[pos-2]; ccc=data[pos-1]; if(((cc&0xC0)!=0x80)||((ccc&0xC0)!=0x80)) return pos; sb.append((char)(((c&0x0f)<<12)|((cc&0x3f)<<6)|(ccc&0x3f))); } else // 10xx xxxx 1111 xxxx return pos; } return 0; } /** 获得指定的字符串转换为UTF8格式的字节数据的长度 */ public static int getUTFLength(String str,int index,int len) { int utfLen=0; int c; for(int i=index;i<len;i++) { c=str.charAt(i); if((c>=0x0001)&&(c<=0x007f)) utfLen++; else if(c>0x07ff) utfLen+=3; else utfLen+=2; } return utfLen; } /** 在字节数组中指定位置写入一个短整型数值 */ public static void writeShort(byte[] bytes,int pos,int s) { bytes[pos]=(byte)(s>>>8); bytes[pos+1]=(byte)s; } /** 在字节数组中指定位置写入一个字节 */ public static void writeByte(byte[] bytes,int pos,int b) { bytes[pos]=(byte)b; } /** 获得指定的字符数组转换为UTF8格式的字节数据的长度 */ public static int getUTFLength(char[] chars,int index,int len) { int utfLen=0; int c; for(int i=index;i<len;i++) { c=chars[i]; if((c>=0x0001)&&(c<=0x007f)) utfLen++; else if(c>0x07ff) utfLen+=3; else utfLen+=2; } return utfLen; } /** 将指定的字符串转换为UTF8格式的字节数据 */ public static byte[] writeUTF(String str) { return writeUTF(str,0,str.length()); } /** 将指定的字符串转换为UTF8格式的字节数据 */ public static byte[] writeUTF(String str,int index,int len) { byte[] data=new byte[getUTFLength(str,index,len)]; writeUTF(str,index,len,data,0); return data; } /** 将指定的字符串转换为UTF8格式的字节数据 */ public static void writeUTF(String str,int index,int len,byte[] data, int pos) { int c; for(int i=index;i<len;i++) { c=str.charAt(i); if((c>=0x0001)&&(c<=0x007f)) { data[pos++]=(byte)c; } else if(c>0x07ff) { data[pos++]=(byte)(0xe0|((c>>12)&0x0f)); data[pos++]=(byte)(0x80|((c>>6)&0x3f)); data[pos++]=(byte)(0x80|(c&0x3f)); } else { data[pos++]=(byte)(0xc0|((c>>6)&0x1f)); data[pos++]=(byte)(0x80|(c&0x3f)); } } } /** 将指定的字符数组转换为UTF8格式的字节数据 */ public static void writeUTF(char[] chars,int index,int len,byte[] data, int pos) { int c; for(int i=index;i<len;i++) { c=chars[i]; if((c>=0x0001)&&(c<=0x007f)) { data[pos++]=(byte)c; } else if(c>0x07ff) { data[pos++]=(byte)(0xe0|((c>>12)&0x0f)); data[pos++]=(byte)(0x80|((c>>6)&0x3f)); data[pos++]=(byte)(0x80|(c&0x3f)); } else { data[pos++]=(byte)(0xc0|((c>>6)&0x1f)); data[pos++]=(byte)(0x80|(c&0x3f)); } } } /* constructors */ private ByteKit() { } }
使用方法:
for(Map.Entry<String, Map<String, String>> entry : gateConfig.entrySet()) { String host = entry.getValue().get("host"); String port = entry.getValue().get("port"); PollServer poll = new PollServer(host,Integer.parseInt(port)); Thread gateThread = new Thread(poll); gateThread.setName(entry.getKey()); gateThread.start(); }
可能调整成mina库,还有其他的一个原因,就是在服务器端会无辜收到一个rst标识导致服务器断开。起初以为是代码问题,后来经过很长时间的排查和咨询,发现服务器用的是南方电信的网络,而一些北方网通的客户端在访问的时候,就会随机出现rst 连接复位现象!查询了好久,最后还是运维的大哥给的思维!当然在起初解决这个问题的时候,我还是本着代码的问题;顺便还去专门研究了Tcp/ip协议详解,翻出了大学里面学的计算机网络这本书。无论如何解决了就好!欢迎拍砖!