SocketChannel 例子(转)

Socket通信比较常见的问题有如下几种: 
1、设置收发超时; 
2、正确的每一个bit的收发; 
3、物理线路故障的保护; 
4、始终能正常工作; 
5、尽量少占系统资源; 
n、…… 
而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是:通信。 
为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案。 
本方案在正常的局域网连接中测试过几百万次没什么问题。缺乏更艰苦的环境,所以如果使用这些代码发生任何风险的话…… 
(TcpChannel代码为Brian Wellington所做,原名为TCPClient,经本人稍作改动)

Java代码

  1. // Copyright (c) 2005 Brian Wellington ([email protected])
  2. package asynchronizedchannel;
  3. import java.io.*;
  4. import java.net.*;
  5. import java.nio.*;
  6. import java.nio.channels.*;
  7. final class TcpChannel
  8. {
  9. private long endTime;
  10. private SelectionKey key;
  11. public TcpChannel(SelectableChannel channel, long endTime, int op) throws IOException
  12. {
  13. boolean done = false;
  14. Selector selector = null;
  15. this.endTime = endTime;
  16. try {
  17. selector = Selector.open();
  18. channel.configureBlocking(false);
  19. key = channel.register(selector, op);
  20. done = true;
  21. } finally {
  22. if (!done && selector != null) {
  23. selector.close();
  24. }
  25. if (!done) {
  26. channel.close();
  27. }
  28. }
  29. }
  30. static void blockUntil(SelectionKey key, long endTime) throws IOException
  31. {
  32. long timeout = endTime - System.currentTimeMillis();
  33. int nkeys = 0;
  34. if (timeout > 0) {
  35. nkeys = key.selector().select(timeout);
  36. } else if (timeout == 0) {
  37. nkeys = key.selector().selectNow();
  38. }
  39. if (nkeys == 0) {
  40. throw new SocketTimeoutException();
  41. }
  42. }
  43. void cleanup()
  44. {
  45. try {
  46. key.selector().close();
  47. key.channel().close();
  48. } catch (IOException ex) {
  49. ex.printStackTrace();
  50. }
  51. }
  52. void bind(SocketAddress addr) throws IOException
  53. {
  54. SocketChannel channel = (SocketChannel) key.channel();
  55. channel.socket().bind(addr);
  56. }
  57. void connect(SocketAddress addr) throws IOException
  58. {
  59. SocketChannel channel = (SocketChannel) key.channel();
  60. if (channel.connect(addr))
  61. return;
  62. key.interestOps(SelectionKey.OP_CONNECT);
  63. try {
  64. while (!channel.finishConnect()) {
  65. if (!key.isConnectable()) {
  66. blockUntil(key, endTime);
  67. }
  68. }
  69. } finally {
  70. if (key.isValid()) {
  71. key.interestOps(0);
  72. }
  73. }
  74. }
  75. void send(ByteBuffer buffer) throws IOException
  76. {
  77. Send.operate(key, buffer, endTime);
  78. }
  79. void recv(ByteBuffer buffer) throws IOException
  80. {
  81. Recv.operate(key, buffer, endTime);
  82. }
  83. }
  84. interface Operator
  85. {
  86. class Operation
  87. {
  88. static void operate(final int op, final SelectionKey key, final ByteBuffer buffer, final long endTime, final Operator optr) throws IOException
  89. {
  90. final SocketChannel channel = (SocketChannel) key.channel();
  91. final int total = buffer.capacity();
  92. key.interestOps(op);
  93. try {
  94. while (buffer.position() < total) {
  95. if (System.currentTimeMillis() > endTime) {
  96. throw new SocketTimeoutException();
  97. }
  98. if ((key.readyOps() & op) != 0) {
  99. if (optr.io(channel, buffer) < 0) {
  100. throw new EOFException();
  101. }
  102. } else {
  103. TcpChannel.blockUntil(key, endTime);
  104. }
  105. }
  106. } finally {
  107. if (key.isValid()) {
  108. key.interestOps(0);
  109. }
  110. }
  111. }
  112. }
  113. int io(SocketChannel channel, ByteBuffer buffer) throws IOException;
  114. }
  115. class Send implements Operator
  116. {
  117. public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
  118. {
  119. return channel.write(buffer);
  120. }
  121. public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
  122. {
  123. Operation.operate(SelectionKey.OP_WRITE, key, buffer, endTime, operator);
  124. }
  125. public static final Send operator = new Send();
  126. }
  127. class Recv implements Operator
  128. {
  129. public int io(SocketChannel channel, ByteBuffer buffer) throws IOException
  130. {
  131. return channel.read(buffer);
  132. }
  133. public static final void operate(final SelectionKey key, final ByteBuffer buffer, final long endTime) throws IOException
  134. {
  135. Operation.operate(SelectionKey.OP_READ, key, buffer, endTime, operator);
  136. }
  137. public static final Recv operator = new Recv();
  138. }

使用演示见以下代码。 
大致说明一下,Server端开5656侦听,Client端开若干线程测试Socket通信。每次发送240字节信息+16字节MD5校验。服务端收到信息之后做MD5检查,正确的,发送“.xxxx”表示认可,否则发送“?xxxx”表示故障。 
正式应用中可以再设置tryout尝试n次。 
Server端,代码演示:

Java代码

  1. package asynchronizedchannel;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.security.MessageDigest;
  10. import java.util.Iterator;
  11. public class Server
  12. {
  13. /**
  14. * 服务端通信范例程序主函数
  15. *
  16. * @param args
  17. * @throws IOException
  18. */
  19. public static void main(String[] args) throws IOException
  20. {
  21. // Create the selector
  22. final Selector selector = Selector.open();
  23. final ServerSocketChannel server = ServerSocketChannel.open();
  24. server.configureBlocking(false);
  25. server.socket().bind(new InetSocketAddress("xx.xx.xx.xx", 5656), 5);
  26. // Register both channels with selector
  27. server.register(selector, SelectionKey.OP_ACCEPT);
  28. new Thread(new Daemon(selector)).start();
  29. }
  30. }
  31. class Daemon implements Runnable
  32. {
  33. private final Selector selector;
  34. Daemon(Selector selector)
  35. {
  36. this.selector = selector;
  37. }
  38. public void run()
  39. {
  40. while (true) {
  41. try {
  42. // Wait for an event
  43. selector.select();
  44. // Get list of selection keys with pending events
  45. Iterator<SelectionKey> it = selector.selectedKeys().iterator();
  46. // Process each key
  47. while (it.hasNext()) {
  48. // Get the selection key
  49. SelectionKey selKey = it.next();
  50. // Remove it from the list to indicate that it is being processed
  51. it.remove();
  52. // Check if it‘s a connection request
  53. if (selKey.isAcceptable()) {
  54. // Get channel with connection request
  55. ServerSocketChannel server = (ServerSocketChannel) selKey.channel();
  56. // Accept the connection request.
  57. // If serverSocketChannel is blocking, this method blocks.
  58. // The returned channel is in blocking mode.
  59. SocketChannel channel = server.accept();
  60. // If serverSocketChannel is non-blocking, sChannel may be null
  61. if (channel != null) {
  62. // Use the socket channel to communicate with the client
  63. new Thread(new ServerHandler(channel)).start();
  64. } else {
  65. System.out.println("---No Connection---");
  66. // There were no pending connection requests; try again later.
  67. // To be notified of connection requests,
  68. }
  69. }
  70. }
  71. } catch (Exception ex) {
  72. ex.printStackTrace();
  73. }
  74. }
  75. }
  76. }
  77. class ServerHandler implements Runnable
  78. {
  79. private static final long timeout = 30 * 1000; // 设置超时时间为30秒
  80. private static int counter = 0;
  81. private final TcpChannel channel;
  82. private final MessageDigest md;
  83. ServerHandler(SocketChannel channel) throws Exception
  84. {
  85. this.channel = new TcpChannel(channel, System.currentTimeMillis() + timeout, SelectionKey.OP_READ);
  86. md = MessageDigest.getInstance("md5");
  87. }
  88. public void run()
  89. {
  90. try {
  91. while (true) {
  92. work();
  93. synchronized (ServerHandler.class) {
  94. if ((++counter & 65535) == 0) {
  95. System.out.println(counter);
  96. }
  97. }
  98. }
  99. } catch (Exception e) {
  100. e.printStackTrace();
  101. } finally {
  102. channel.cleanup();
  103. }
  104. }
  105. private void work() throws IOException
  106. { // 模拟工作流程
  107. byte[] cache = new byte[256], reply = new byte[5];
  108. read(cache, reply);
  109. }
  110. private void read(byte[] cache, byte[] reply) throws IOException
  111. { // 从套接字读入数据
  112. channel.recv(ByteBuffer.wrap(cache));
  113. md.reset();
  114. md.update(cache, 0, 240);
  115. byte[] md5 = md.digest(); // 使用前240字节产生MD5校验码
  116. if (!ExtArrays.partialEquals(md5, 0, cache, 240, 16)) { // 与后16字节比较
  117. reply[0] = ‘?‘;
  118. System.out.println("MISMATCH!");
  119. } else {
  120. reply[0] = ‘.‘;
  121. }
  122. channel.send(ByteBuffer.wrap(reply)); // 返回接收结果
  123. }
  124. }
  125. final class ExtArrays
  126. {
  127. private ExtArrays()
  128. {
  129. }
  130. public static boolean partialEquals(byte[] a, int offset_a, byte[] b, int offset_b, int len)
  131. { // 字节数组的部分比较
  132. if (a == null || b == null) {
  133. return false;
  134. }
  135. if (offset_a + len > a.length || offset_b + len > b.length) {
  136. return false;
  137. }
  138. for (int i = offset_a, j = offset_b, k = len; k > 0; i++, j++, k--) {
  139. if (a[i] != b[j]) {
  140. return false;
  141. }
  142. }
  143. return true;
  144. }
  145. }

Client端,代码演示:

Java代码

  1. package asynchronizedchannel;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.SocketChannel;
  7. import java.security.DigestException;
  8. import java.security.MessageDigest;
  9. import java.util.Random;
  10. public class Client
  11. {
  12. private static int id = 0;
  13. /**
  14. * 客户端通信范例程序主函数
  15. *
  16. * @param args
  17. * @throws Exception
  18. */
  19. public static void main(String[] args) throws Exception
  20. {
  21. new Thread(new ClientHandler(id++)).start();
  22. new Thread(new ClientHandler(id++)).start();
  23. new Thread(new ClientHandler(id++)).start();
  24. new Thread(new ClientHandler(id++)).start();
  25. new Thread(new ClientHandler(id++)).start();
  26. }
  27. }
  28. class ClientHandler implements Runnable
  29. {
  30. private static final long timeout = 30 * 1000; // 设置超时时间为30秒
  31. private final TcpChannel channel;
  32. private final int id;
  33. private final MessageDigest md;
  34. private final Random rand;
  35. ClientHandler(int id) throws Exception
  36. {
  37. this.id = id;
  38. channel = new TcpChannel(SocketChannel.open(), System.currentTimeMillis() + timeout, SelectionKey.OP_WRITE);
  39. md = MessageDigest.getInstance("md5");
  40. rand = new Random();
  41. }
  42. @Override
  43. public void run()
  44. {
  45. try {
  46. channel.connect(new InetSocketAddress("xx.xx.xx.xx", 5656));
  47. int i = 0;
  48. while (true) {
  49. work();
  50. if ((++i & 16383) == 0) {
  51. System.out.println(String.format("client(%1$d): %2$d", id, i));
  52. }
  53. Thread.yield();
  54. }
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. } finally {
  58. channel.cleanup();
  59. }
  60. }
  61. private void work() throws IOException, DigestException
  62. {
  63. byte[] cache = new byte[256], reply = new byte[5];
  64. write(cache, reply);
  65. }
  66. private void write(byte[] cache, byte[] reply) throws DigestException, IOException
  67. {
  68. rand.nextBytes(cache); // 只用前面的240字节
  69. md.reset();
  70. md.update(cache, 0, 240);
  71. md.digest(cache, 240, 16); // MD5校验码占后面16字节
  72. ByteBuffer buffer = ByteBuffer.wrap(cache);
  73. channel.send(buffer);
  74. buffer = ByteBuffer.wrap(reply);
  75. channel.recv(buffer);
  76. if (reply[0] != ‘.‘) { // 若接收的结果不正确,可以考虑尝试再次发送
  77. System.out.println("MISMATCH!");
  78. }
  79. }
  80. }

重点说明:

发多少,收多少。要么固定发送和接收的字节数,要么在发送的时候带有发送字节数的信息,接收的时候根据该信息接收完整然后再处理。

时间: 2024-11-05 18:29:17

SocketChannel 例子(转)的相关文章

5. 彤哥说netty系列之Java NIO核心组件之Channel

你好,我是彤哥,本篇是netty系列的第五篇. 简介 上一章我们一起学习了如何使用Java原生NIO实现群聊系统,这章我们一起来看看Java NIO的核心组件之一--Channel. 思维转变 首先,我想说的最重要的一个点是,学习NIO思维一定要从BIO那种一个连接一个线程的模式转变成多个连接(Channel)共用一个线程来处理的这种思维. 1个Connection = 1个Socket = 1个Channel,这几个概念可以看作是等价的,都表示一个连接,只不过是用在不同的场景中. 如果单从阻塞

Java NIO SocketChannel客户端例子(支持连接失败后自动重连)

这两天想找找标题里说的这个示例代码,发现网上这么多教程,连怎么样实现自动重连都不讲,所以把自己写的例子贴上来.仅仅使用递归,不使用多线程,就可以实现初步的目的: import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; impor

j.一个NIO与SSLEngine结合的例子

对于BIO通道的程序来讲,建立起SSLServerSocket之后,后续的工作就和普通的ServerSocket没有什么区别了,这是因为JDK中通过JSSE的API,封装了SSL通道的实现逻辑,否则,类似于C程序员如果想要编写一个https的加密程序,那他基本得累个半死,所以,我们应该感谢JAVA. 对于NIO通道来讲,我们一贯的思维也是存在一个SSLServerSocketChannel,然后注册到selector中,后续的操作也和普通的ServerSocketChannel没什么区别了,但是

Java NIO系列教程(八) SocketChannel

Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道.可以通过以下2种方式创建SocketChannel: 打开一个SocketChannel并连接到互联网上的某台服务器. 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel. 打开 SocketChannel 下面是SocketChannel的打开方式: 1 SocketChannel socketChannel = SocketChannel.open(); 2 socke

Java之NIO(二)selector socketChannel

上篇文章对NIO进行了简介,对Channel和Buffer接口的使用进行了说明,并举了一个简单的例子来说明其使用方法. 本篇则重点说明selector,Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件.这样,一个单独的线程可以管理多个channel,从而管理多个网络连接. 与selector联系紧密的是ServerSocketChannel和SocketChannel,他们的使用与上篇文章描述的FileChannel的使用方

健壮的、便捷的、异步的SocketChannel实现(转)

Socket通信比较常见的问题有如下几种: 1.设置收发超时: 2.正确的每一个bit的收发: 3.物理线路故障的保护: 4.始终能正常工作: 5.尽量少占系统资源: n.…… 而Socket编程有一个共性,尽管100个人可能会写出1000种实现,但做的事情却只有一种,就是: 通信. 为此,通过学习dnsjava的通信代码,加上自己在一些项目中的实践,现在给出TCP通信的例子实现如下,希望能够给想偷懒的人一个简单的解决方案. 本方案在正常的局域网连接中测试过几百万次没什么问题.缺乏更艰苦的环境,

Java基础知识强化之IO流笔记79:NIO之 SocketChannel

1. Java NIO中的SocketChannel是一个连接到TCP网络套接字的通道.可以通过以下2种方式创建SocketChannel: 打开一个SocketChannel并连接到互联网上的某台服务器. 一个新连接到达ServerSocketChannel时,会创建一个SocketChannel. 2.   (1)打开 SocketChannel 下面是SocketChannel的打开方式: 1 SocketChannel socketChannel = SocketChannel.open

【JAVA】【NIO】9、Java NIO SocketChannel

Java NIO的SocketChannel是连接tcp网络套接字的channel.有如下两种方式去创建: 1.打开一个SocketChannel,连接到网络上的一个server 2.当ServerSocketChannel收到一个连接,SocketChannel就创建了 打开SocketChannel SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("

Java NIO 之 ServerSocketChannel 与 SocketChannel

ServerSocketChannel ServerSocketChannel作用?就是专职干什么的? 1.监听新进来的TCP链接通道, 2.创建新的SocketChannel ServerSocketChannel 不具备 什么能力 ServerSocketChannel并不能进行数据传输的能力 如何创建ServerSocketChannel实例 ServerSocketChannel socketChannel =ServerSocketChannel.open(); 该对象关联了一个未绑定