Netty 使用

最近研究Android Socket通讯,遇到Socket write timeout写超时现象,具体是:

OutputStream outStream = mSocket.getOutputStream();

outStream.write(data); //卡在这一行;

outStream.flush();

查了很多资料,大概清楚一些,flush()方法作用是“Flushes this output stream and forces any buffered output bytes to be written out.”,强制刷新输出流,并将缓存数据写出去。但是这个方法无法保证数据发送成功,所以,当flush成功,但是缓存区被写满之后,write方法就无法将data继续写入,从而到处卡住,直到缓存区的数据发出去,苦于无法解决这个问题,尝试去使用java的通讯框架。

Netty和Apache Mina大概是Java世界非常知名的通讯框架。它们出自同一个作者,Mina诞生略早,现在属于Apache基金会,而Netty开始在Jboss名下,3.0之前叫JBoss Netty,后来出来自立门户netty.io。关于Mina已有@FrankHui的Mina系列文章,我正好最近也要做一些网络方面的开发,就研究一下Netty的源码,顺便分享出来了。

Netty目前有三个分支:5.x、4.x和3.x。新的分支重写了很多东西,并对项目进行了分包,规模比较庞大,入手会困难一些,而3.x版本则已经被广泛使用。本系列文章针对netty 3.10.1 final。

Getting Started

我推荐大家从官方的UserGuide开始阅读 http://netty.io/3.8/guide/#preface.2,或者可以直接阅读源码,Java世界的框架普遍比较庞大,但是功能覆盖全面,个人爱好。

理解Netty的关键点在哪呢?我觉得,除了NIO的相关知识,另一个就是事件驱动的设计思想。什么叫事件驱动?我们回头看看EchoServerHandler的代码,其中的参数:public void messageReceived(ChannelHandlerContext ctx, MessageEvent e),MessageEvent就是一个事件。这个事件携带了一些信息,例如这里e.getMessage()就是消息的内容,而EchoServerHandler则描述了处理这种事件的方式。一旦某个事件触发,相应的Handler则会被调用,并进行处理。这种事件机制在UI编程里广泛应用,而Netty则将其应用到了网络编程领域。

在Netty里,所有事件都来自ChannelEvent接口,这些事件涵盖监听端口、建立连接、读写数据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。当弄明白其中的奥妙时,不得不佩服这种设计!

Netty的包结构如下:

org
└── jboss
    └── netty
        ├── bootstrap 配置并启动服务的类
        ├── buffer 缓冲相关类,对NIO Buffer做了一些封装
        ├── channel 核心部分,处理连接
        ├── container 连接其他容器的代码
        ├── example 使用示例
        ├── handler 基于handler的扩展部分,实现协议编解码等附加功能
        ├── logging 日志
        └── util 工具类

图中可以看到,除了之前说到的事件驱动机制之外,Netty的核心功能还包括两部分:

  • Zero-Copy-Capable Rich Byte Buffer

    零拷贝的Buffer。为什么叫零拷贝?因为在数据传输时,最终处理的数据会需要对单个传输层的报文,进行组合或者拆分。NIO原生的ByteBuffer要做到这件事,需要对ByteBuffer内容进行拷贝,产生新的ByteBuffer,而Netty通过提供Composite(组合)和Slice(切分)两种Buffer来实现零拷贝。这部分代码在org.jboss.netty.buffer包中。

  • Universal Communication API

    统一的通讯API。因为Java的Old I/O和New I/O,使用了互不兼容的API,而Netty则提供了统一的API(org.jboss.netty.channel.Channel)来封装这两种I/O模型。这部分代码在org.jboss.netty.channel包中。

此外,Protocol Support功能通过handler机制实现。

下面开始一个简单的类似helloworld的程序:

服务端:

DiscardServer.java
 1 import java.net.InetSocketAddress;
 2 import java.util.concurrent.Executors;
 3
 4 import org.jboss.netty.bootstrap.ServerBootstrap;
 5 import org.jboss.netty.channel.ChannelFactory;
 6 import org.jboss.netty.channel.ChannelPipeline;
 7 import org.jboss.netty.channel.ChannelPipelineFactory;
 8 import org.jboss.netty.channel.Channels;
 9 import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
10
11
12
13 public class DiscardServer {
14
15     public static void main(String[] args) throws Exception{
16         // TODO Auto-generated method stub
17         ChannelFactory factory = new NioServerSocketChannelFactory(
18                 Executors.newCachedThreadPool(),
19                 Executors.newCachedThreadPool());
20
21         ServerBootstrap bootstrap = new ServerBootstrap(factory);
22
23         bootstrap.setPipelineFactory(new ChannelPipelineFactory(){
24
25             @Override
26             public ChannelPipeline getPipeline() {
27                 ChannelPipeline pipeline = Channels.pipeline();
28                 pipeline .addLast("handler", new DiscardServerHandler());
29                 return pipeline;
30             }
31         });
32
33         bootstrap.setOption("child.tcpNoDelay", true);
34         bootstrap.setOption("child.keepAlive", true);
35
36         bootstrap.bind(new InetSocketAddress(18081));
37     }
38
39 }
DiscardServerHandler.java
 1 import org.jboss.netty.buffer.ChannelBuffer;
 2 import org.jboss.netty.channel.Channel;
 3 import org.jboss.netty.channel.ChannelHandlerContext;
 4 import org.jboss.netty.channel.ExceptionEvent;
 5 import org.jboss.netty.channel.MessageEvent;
 6 import org.jboss.netty.channel.SimpleChannelHandler;
 7
 8
 9 public class DiscardServerHandler extends SimpleChannelHandler {
10     @Override
11     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
12             throws Exception {
13         // TODO Auto-generated method stub
14
15         ChannelBuffer buf = (ChannelBuffer) e.getMessage();
16         while(buf.readable()) {
17             System.out.println(buf.readByte());
18             System.out.flush();
19         }
20
21         super.messageReceived(ctx, e);
22     }
23
24     @Override
25     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
26             throws Exception {
27         // TODO Auto-generated method stub
28         e.getCause().printStackTrace();
29
30         Channel ch = e.getChannel();
31         ch.close();
32
33         super.exceptionCaught(ctx, e);
34     }
35 }

客户端:

NettyHandler.java
 1 public class NettyHandler {
 2     private final static String TAG = "NettyHandler";
 3
 4     ChannelFactory mFactory = null;
 5     private ClientBootstrap mBootstrap = null;
 6     private Channel mChannel = null;
 7     private boolean isConnect = false;
 8
 9     private String     mHost;
10     private int        mPort;
11
12
13     public NettyHandler(String host, int port) {
14         mHost = host;
15         mPort = port;
16     }
17
18     public boolean isConnected() {
19
20         return isConnect;
21     }
22
23     public void connect() {
24         Log.d(TAG, "connect to:" + mHost);
25         mFactory = new NioClientSocketChannelFactory(
26                 Executors.newCachedThreadPool(),
27                 Executors.newCachedThreadPool());
28
29         mBootstrap = new ClientBootstrap(mFactory);
30         mBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
31             public ChannelPipeline getPipeline() {
32                 ChannelPipeline pipeline = Channels.pipeline();
33                 pipeline.addLast("handler",new ClientHandler());
34                 return pipeline;
35             }
36         });
37         mBootstrap.setOption("tcpNoDelay" , true);
38         mBootstrap.setOption("keepAlive", true);
39
40
41         ChannelFuture future = mBootstrap.connect (new InetSocketAddress(mHost, mPort));
42         future.awaitUninterruptibly();
43         mChannel = future.awaitUninterruptibly().getChannel();
44     }
45
46     public void sendData(String message) {
47         Log.d(TAG, "send data to PC: [" + message + "]");
48
49         ChannelBuffer buffer = ChannelBuffers.buffer(message.length());
50         buffer.writeBytes(message.getBytes());
51
52         if (isConnect) {
53             if (mChannel.isConnected() && mChannel.isWritable()) {
54                 mStartTime = System.currentTimeMillis();
55                 mChannel.write(buffer);
56             }
57         }
58     }
59
60     public class ClientHandler extends SimpleChannelUpstreamHandler  {
61         @Override
62         public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
63             Log.d(TAG, "Socket is connected.");
64             isConnect = true;
65
66         }
67
68         @Override
69         public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
70             Log.d(TAG, "Socket is disconnected.");
71             isConnect = false;
72         }
73
74         @Override
75         public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
76             Log.d(TAG, "Socket get message: " + e.getMessage());
77         }
78
79         @Override
80         public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
81             Log.d(TAG, "Socket get exceptionCaught: " + e.getCause().toString());
82
83             isConnect = false;
84         }
85
86         @Override
87         public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) {
88         }
89     } 

实例化 NettyHandler,调用connect与sendData即可发送消息,这只是一个最简单的例子,接下来我们将深入理解netty的结构。

 
时间: 2024-10-16 03:44:38

Netty 使用的相关文章

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

Netty对Protocol Buffer多协议的支持(八)

Netty对Protocol Buffer多协议的支持(八) 一.背景 在上篇博文中笔者已经用代码演示了如何在netty中使用Protocol Buffer,然而细心的用户可能会发现一个明显的不足之处就是,我们的Handler只能处理一种特定的类型,而我们的项目中又不可能只有一种类型,那么这个问题该怎么解决了?多的不说,笔者直接上代码. 二.代码实现 2.1 message的编写 syntax = "proto2"; package com.rsy.netty.protobuf; op

Java Netty (1)

Netty是由JBOSS提供的一个java开源框架,本质上也是NIO,是对NIO的封装,比NIO更加高级,可以说发展的路线是IO->NIO->Netty. ServerBootstrap和ClientBootstrap是Netty中两个比较重要的类,分别用来进行服务器和客户端的初始化. 服务器: // ChannelFactory final ChannelFactory channelFactory = new NioServerSocketChannelFactory( // Boss线程

netty 解决TCP粘包与拆包问题(二)

TCP以流的方式进行数据传输,上层应用协议为了对消息的区分,采用了以下几种方法. 1.消息固定长度 2.第一篇讲的回车换行符形式 3.以特殊字符作为消息结束符的形式 4.通过消息头中定义长度字段来标识消息的总长度 一.采用指定分割符解决粘包与拆包问题 服务端 1 package com.ming.netty.nio.stickpack; 2 3 4 5 import java.net.InetSocketAddress; 6 7 import io.netty.bootstrap.ServerB

用Netty解析Redis网络协议

用Netty解析Redis网络协议 根据Redis官方文档的介绍,学习了一下Redis网络通信协议.然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下! 1.RESP协议 Redis的客户端与服务端采用一种叫做 RESP(REdis Serialization Protocol)的网络通信协议交换数据.RESP的设计权衡了实现简单.解析快速.人类可读这三个因素.Redis客户端通过RESP序列化整数.字符串.数据等数据类型,发送字符串数组表示参数的命

tomcat 、NIO、netty 本质

tomcat 基于 web 浏览器的通信容器 nio 同步非阻塞的I/O模型 netty 通信框架,对 nio 的封装

Netty中的那些坑

Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部分引起的. 注:这里说的坑不是说netty不好,只是如果这些地方不注意,或者不去看netty的代码,就有可能掉进去了. 坑1: Netty 4的线程模型转变 在Netty 3的时候,upstream是在IO线程里执行的,而downstream是在业务线程里执行的.比如netty从网络读取一个包传递给

Netty利用ChannelGroup广播消息

在Netty中提供了ChannelGroup接口,该接口继承Set接口,因此可以通过ChannelGroup可管理服务器端所有的连接的Channel,然后对所有的连接Channel广播消息. Server端: public class BroadCastServer { public static void run(int port) { EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioE

Netty简单应用与线上服务器部署_netty视频

Netty简单应用与线上服务器部署 课程学习地址:http://www.xuetuwuyou.com/course/198 课程出自学途无忧网:http://www.xuetuwuyou.com 一.开发环境 4.1.11.Final   jdk1.8 maven 3.2 Spring 4.3.9 二.适合人群 ①想深入学习java ClassLoader ②想在线上linux服务器上运行netty或Springboot服务 三.课程目标 ①掌控ClassLoader ②学会编写shell脚本

netty中使用IdleStateHandler来发起心跳

网络连接中,处理Idle事件是很常见的,一般情况下,客户端与服务端在指定时间内没有任何读写请求,就会认为连接是idle的.此时,客户端需要向服务端发送ping消息,来维持服务端与客户端的链接.那么怎么判断客户端在指定时间里没有任何读写请求呢?netty中为我们提供一个特别好用的IdleStateHandler来干这个苦差事!请看下面代码: public class EchoClient { private final static int readerIdleTimeSeconds = 40;/