最近研究Android Socket通讯,遇到Socket write timeout写超时现象,具体是:
OutputStream outStream = mSocket.getOutputStream();
outStream.write(data); //卡在这一行;
outStream.flush();
查了很多资料,大概清楚一些,flush()方法作用是“Flushes this output stream and forces any buffered output bytes to be written out.”,强制刷新输出流,并将缓存数据写出去。但是这个方法无法保证数据发送成功,所以,当flush成功,但是缓存区被写满之后,write方法就无法将data继续写入,从而到处卡住,直到缓存区的数据发出去,苦于无法解决这个问题,尝试去使用java的通讯框架。
Netty和Apache Mina大概是Java世界非常知名的通讯框架。它们出自同一个作者,Mina诞生略早,现在属于Apache基金会,而Netty开始在Jboss名下,3.0之前叫JBoss Netty,后来出来自立门户netty.io。关于Mina已有@FrankHui的Mina系列文章,我正好最近也要做一些网络方面的开发,就研究一下Netty的源码,顺便分享出来了。
Netty目前有三个分支:5.x、4.x和3.x。新的分支重写了很多东西,并对项目进行了分包,规模比较庞大,入手会困难一些,而3.x版本则已经被广泛使用。本系列文章针对netty 3.10.1 final。
Getting Started
我推荐大家从官方的UserGuide开始阅读 http://netty.io/3.8/guide/#preface.2,或者可以直接阅读源码,Java世界的框架普遍比较庞大,但是功能覆盖全面,个人爱好。
理解Netty的关键点在哪呢?我觉得,除了NIO的相关知识,另一个就是事件驱动的设计思想。什么叫事件驱动?我们回头看看EchoServerHandler
的代码,其中的参数:public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
,MessageEvent就是一个事件。这个事件携带了一些信息,例如这里e.getMessage()
就是消息的内容,而EchoServerHandler
则描述了处理这种事件的方式。一旦某个事件触发,相应的Handler则会被调用,并进行处理。这种事件机制在UI编程里广泛应用,而Netty则将其应用到了网络编程领域。
在Netty里,所有事件都来自ChannelEvent
接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler
,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler
来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。当弄明白其中的奥妙时,不得不佩服这种设计!
Netty的包结构如下:
org
└── jboss
└── netty
├── bootstrap 配置并启动服务的类
├── buffer 缓冲相关类,对NIO Buffer做了一些封装
├── channel 核心部分,处理连接
├── container 连接其他容器的代码
├── example 使用示例
├── handler 基于handler的扩展部分,实现协议编解码等附加功能
├── logging 日志
└── util 工具类
图中可以看到,除了之前说到的事件驱动机制之外,Netty的核心功能还包括两部分:
- Zero-Copy-Capable Rich Byte Buffer
零拷贝的Buffer。为什么叫零拷贝?因为在数据传输时,最终处理的数据会需要对单个传输层的报文,进行组合或者拆分。NIO原生的ByteBuffer要做到这件事,需要对ByteBuffer内容进行拷贝,产生新的ByteBuffer,而Netty通过提供Composite(组合)和Slice(切分)两种Buffer来实现零拷贝。这部分代码在
org.jboss.netty.buffer
包中。 - Universal Communication API
统一的通讯API。因为Java的Old I/O和New I/O,使用了互不兼容的API,而Netty则提供了统一的API(
org.jboss.netty.channel.Channel
)来封装这两种I/O模型。这部分代码在org.jboss.netty.channel
包中。
此外,Protocol Support功能通过handler机制实现。
下面开始一个简单的类似helloworld的程序:
服务端:
DiscardServer.java
1 import java.net.InetSocketAddress; 2 import java.util.concurrent.Executors; 3 4 import org.jboss.netty.bootstrap.ServerBootstrap; 5 import org.jboss.netty.channel.ChannelFactory; 6 import org.jboss.netty.channel.ChannelPipeline; 7 import org.jboss.netty.channel.ChannelPipelineFactory; 8 import org.jboss.netty.channel.Channels; 9 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; 10 11 12 13 public class DiscardServer { 14 15 public static void main(String[] args) throws Exception{ 16 // TODO Auto-generated method stub 17 ChannelFactory factory = new NioServerSocketChannelFactory( 18 Executors.newCachedThreadPool(), 19 Executors.newCachedThreadPool()); 20 21 ServerBootstrap bootstrap = new ServerBootstrap(factory); 22 23 bootstrap.setPipelineFactory(new ChannelPipelineFactory(){ 24 25 @Override 26 public ChannelPipeline getPipeline() { 27 ChannelPipeline pipeline = Channels.pipeline(); 28 pipeline .addLast("handler", new DiscardServerHandler()); 29 return pipeline; 30 } 31 }); 32 33 bootstrap.setOption("child.tcpNoDelay", true); 34 bootstrap.setOption("child.keepAlive", true); 35 36 bootstrap.bind(new InetSocketAddress(18081)); 37 } 38 39 }
DiscardServerHandler.java
1 import org.jboss.netty.buffer.ChannelBuffer; 2 import org.jboss.netty.channel.Channel; 3 import org.jboss.netty.channel.ChannelHandlerContext; 4 import org.jboss.netty.channel.ExceptionEvent; 5 import org.jboss.netty.channel.MessageEvent; 6 import org.jboss.netty.channel.SimpleChannelHandler; 7 8 9 public class DiscardServerHandler extends SimpleChannelHandler { 10 @Override 11 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 12 throws Exception { 13 // TODO Auto-generated method stub 14 15 ChannelBuffer buf = (ChannelBuffer) e.getMessage(); 16 while(buf.readable()) { 17 System.out.println(buf.readByte()); 18 System.out.flush(); 19 } 20 21 super.messageReceived(ctx, e); 22 } 23 24 @Override 25 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 26 throws Exception { 27 // TODO Auto-generated method stub 28 e.getCause().printStackTrace(); 29 30 Channel ch = e.getChannel(); 31 ch.close(); 32 33 super.exceptionCaught(ctx, e); 34 } 35 }
客户端:
NettyHandler.java
1 public class NettyHandler { 2 private final static String TAG = "NettyHandler"; 3 4 ChannelFactory mFactory = null; 5 private ClientBootstrap mBootstrap = null; 6 private Channel mChannel = null; 7 private boolean isConnect = false; 8 9 private String mHost; 10 private int mPort; 11 12 13 public NettyHandler(String host, int port) { 14 mHost = host; 15 mPort = port; 16 } 17 18 public boolean isConnected() { 19 20 return isConnect; 21 } 22 23 public void connect() { 24 Log.d(TAG, "connect to:" + mHost); 25 mFactory = new NioClientSocketChannelFactory( 26 Executors.newCachedThreadPool(), 27 Executors.newCachedThreadPool()); 28 29 mBootstrap = new ClientBootstrap(mFactory); 30 mBootstrap.setPipelineFactory(new ChannelPipelineFactory() { 31 public ChannelPipeline getPipeline() { 32 ChannelPipeline pipeline = Channels.pipeline(); 33 pipeline.addLast("handler",new ClientHandler()); 34 return pipeline; 35 } 36 }); 37 mBootstrap.setOption("tcpNoDelay" , true); 38 mBootstrap.setOption("keepAlive", true); 39 40 41 ChannelFuture future = mBootstrap.connect (new InetSocketAddress(mHost, mPort)); 42 future.awaitUninterruptibly(); 43 mChannel = future.awaitUninterruptibly().getChannel(); 44 } 45 46 public void sendData(String message) { 47 Log.d(TAG, "send data to PC: [" + message + "]"); 48 49 ChannelBuffer buffer = ChannelBuffers.buffer(message.length()); 50 buffer.writeBytes(message.getBytes()); 51 52 if (isConnect) { 53 if (mChannel.isConnected() && mChannel.isWritable()) { 54 mStartTime = System.currentTimeMillis(); 55 mChannel.write(buffer); 56 } 57 } 58 } 59 60 public class ClientHandler extends SimpleChannelUpstreamHandler { 61 @Override 62 public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { 63 Log.d(TAG, "Socket is connected."); 64 isConnect = true; 65 66 } 67 68 @Override 69 public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) { 70 Log.d(TAG, "Socket is disconnected."); 71 isConnect = false; 72 } 73 74 @Override 75 public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 76 Log.d(TAG, "Socket get message: " + e.getMessage()); 77 } 78 79 @Override 80 public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 81 Log.d(TAG, "Socket get exceptionCaught: " + e.getCause().toString()); 82 83 isConnect = false; 84 } 85 86 @Override 87 public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) { 88 } 89 }
实例化 NettyHandler,调用connect与sendData即可发送消息,这只是一个最简单的例子,接下来我们将深入理解netty的结构。