Netty In Action中文版 - 第四章:Transports(传输)

本章内容

  • Transports(传输)
  • NIO(non-blocking IO,New IO), OIO(Old IO,blocking IO), Local(本地), Embedded(嵌入式)
  • Use-case(用例)
  • APIs(接口)

网络应用程序一个非常重要的工作是数据传输。

数据传输的过程不一样取决是使用哪种交通工具,可是传输的方式是一样的:都是以字节码传输。Java开发网络程序数据传输的过程和方式是被抽象了的。我们不须要关注底层接口。仅仅须要使用Java API或其它网络框架如Netty就能达到数据传输的目的。发送数据和接收数据都是字节码。Nothing more,nothing less。

假设你以前使用Java提供的网络接口工作过,你可能已经遇到过想从堵塞传输切换到非堵塞传输的情况,这样的切换是比較困难的。由于堵塞IO和非堵塞IO使用的API有非常大的差异。Netty提供了上层的传输实现接口使得这样的情况变得简单。

我们能够让所写的代码尽可能通用。而不会依赖一些实现相关的APIs。当我们想切换传输方式的时候不须要花非常大的精力和时间来重构代码。

本章将介绍统一的API以及怎样使用它们,会拿Netty的API和Java的API做比較来告诉你为什么Netty能够更easy的使用。本章也提供了一些优质的用例代码,以便最佳使用Netty。

使用Netty不须要其它的网络框架或网络编程经验。若有则仅仅是对理解netty有帮助,但不是必要的。

以下让我们来看看真是世界里的传输工作。

4.1 案例研究:切换传输方式

为了让你想象怎样运输,我会从一个简单的应用程序開始,这个应用程序什么都不做,仅仅是接受client连接并发送“Hi!”字符串消息到client,发送完了就断开连接。我不会具体解说这个过程的实现,它仅仅是一个样例。

4.1.1 使用Java的I/O和NIO

我们将不用Netty实现这个样例。以下代码是使用堵塞IO实现的样例:

package netty.in.action;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;

/**
 * Blocking networking without Netty
 * @author c.k
 *
 */
public class PlainOioServer {

	public void server(int port) throws Exception {
		//bind server to port
		final ServerSocket socket = new ServerSocket(port);
		try {
			while(true){
				//accept connection
				final Socket clientSocket = socket.accept();
				System.out.println("Accepted connection from " + clientSocket);
				//create new thread to handle connection
				new Thread(new Runnable() {
					@Override
					public void run() {
						OutputStream out;
						try{
							out = clientSocket.getOutputStream();
							//write message to connected client
							out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));
							out.flush();
							//close connection once message written and flushed
							clientSocket.close();
						}catch(IOException e){
							try {
								clientSocket.close();
							} catch (IOException e1) {
								e1.printStackTrace();
							}
						}
					}
				}).start();//start thread to begin handling
			}
		}catch(Exception e){
			e.printStackTrace();
			socket.close();
		}
	}

}

上面的方式非常简洁。可是这样的堵塞模式在大连接数的情况就会有非常严重的问题。如客户端连接超时,服务器响应严重延迟。为了解决这样的情况,我们能够使用异步网络处理全部的并发连接,但问题在于NIO和OIO的API是全然不同的,所以一个用OIO开发的网络应用程序想要使用NIO重构代码差点儿是又一次开发。

以下代码是使用Java NIO实现的样例:

package netty.in.action;

import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
/**
 * Asynchronous networking without Netty
 * @author c.k
 *
 */
public class PlainNioServer {

	public void server(int port) throws Exception {
		System.out.println("Listening for connections on port " + port);
		//open Selector that handles channels
		Selector selector = Selector.open();
		//open ServerSocketChannel
		ServerSocketChannel serverChannel = ServerSocketChannel.open();
		//get ServerSocket
		ServerSocket serverSocket = serverChannel.socket();
		//bind server to port
		serverSocket.bind(new InetSocketAddress(port));
		//set to non-blocking
		serverChannel.configureBlocking(false);
		//register ServerSocket to selector and specify that it is interested in new accepted clients
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
		while (true) {
			//Wait for new events that are ready for process. This will block until something happens
			int n = selector.select();
			if (n > 0) {
				//Obtain all SelectionKey instances that received events
				Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
				while (iter.hasNext()) {
					SelectionKey key = iter.next();
					iter.remove();
					try {
						//Check if event was because new client ready to get accepted
						if (key.isAcceptable()) {
							ServerSocketChannel server = (ServerSocketChannel) key.channel();
							SocketChannel client = server.accept();
							System.out.println("Accepted connection from " + client);
							client.configureBlocking(false);
							//Accept client and register it to selector
							client.register(selector, SelectionKey.OP_WRITE, msg.duplicate());
						}
						//Check if event was because socket is ready to write data
						if (key.isWritable()) {
							SocketChannel client = (SocketChannel) key.channel();
							ByteBuffer buff = (ByteBuffer) key.attachment();
							//write data to connected client
							while (buff.hasRemaining()) {
								if (client.write(buff) == 0) {
									break;
								}
							}
							client.close();//close client
						}
					} catch (Exception e) {
						key.cancel();
						key.channel().close();
					}
				}
			}
		}
	}

}

如你所见,即使它们实现的功能是一样,可是代码全然不同。

以下我们将用Netty来实现同样的功能。

4.1.2 Netty中使用I/O和NIO

以下代码是使用Netty作为网络框架编写的一个堵塞IO样例:

package netty.in.action;

import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyOioServer {

	public void server(int port) throws Exception {
		final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
		//事件循环组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			//用来引导server配置
			ServerBootstrap b = new ServerBootstrap();
			//使用OIO堵塞模式
			b.group(group).channel(OioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
			//指定ChannelInitializer初始化handlers
					.childHandler(new ChannelInitializer<Channel>() {
						@Override
						protected void initChannel(Channel ch) throws Exception {
							//加入一个“入站”handler到ChannelPipeline
							ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
								@Override
								public void channelActive(ChannelHandlerContext ctx) throws Exception {
									//连接后。写消息到client。写完后便关闭连接
									ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
								}
							});
						}
					});
			//绑定server接受连接
			ChannelFuture f = b.bind().sync();
			f.channel().closeFuture().sync();
		} catch (Exception e) {
			//释放全部资源
			group.shutdownGracefully();
		}
	}

}

上面代码实现功能一样,但结构清晰明了,这仅仅是Netty的优势之中的一个。

4.1.3 Netty中实现异步支持

以下代码是使用Netty实现异步,能够看出使用Netty由OIO切换到NIO是很的方便。

package netty.in.action;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.CharsetUtil;

import java.net.InetSocketAddress;

public class NettyNioServer {

	public void server(int port) throws Exception {
		final ByteBuf buf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hi!\r\n", CharsetUtil.UTF_8));
		// 事件循环组
		EventLoopGroup group = new NioEventLoopGroup();
		try {
			// 用来引导server配置
			ServerBootstrap b = new ServerBootstrap();
			// 使用NIO异步模式
			b.group(group).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port))
			// 指定ChannelInitializer初始化handlers
					.childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							// 加入一个“入站”handler到ChannelPipeline
							ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
								@Override
								public void channelActive(ChannelHandlerContext ctx) throws Exception {
									// 连接后。写消息到client,写完后便关闭连接
									ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);
								}
							});
						}
					});
			// 绑定server接受连接
			ChannelFuture f = b.bind().sync();
			f.channel().closeFuture().sync();
		} catch (Exception e) {
			// 释放全部资源
			group.shutdownGracefully();
		}
	}
}

由于Netty使用同样的API来实现每一个传输,它并不关心你使用什么来实现。

Netty通过操作Channel接口和ChannelPipeline、ChannelHandler来实现传输。

4.2 Transport API

传输API的核心是Channel接口。它用于全部出站的操作。Channel接口的类层次结构例如以下

如上图所看到的,每一个Channel都会分配一个ChannelPipeline和ChannelConfig。ChannelConfig负责设置并存储配置,并同意在执行期间更新它们。传输一般有特定的配置设置,仅仅作用于传输,没有其它的实现。

ChannelPipeline容纳了使用的ChannelHandler实例,这些ChannelHandler将处理通道传递的“入站”和“出站”数据。ChannelHandler的实现同意你改变数据状态和数据传输。本书有章节具体解说ChannelHandler,ChannelHandler是Netty的重点概念。

如今我们能够使用ChannelHandler做以下一些事情:

  • 数据传输时。将数据从一种格式转换到还有一种格式
  • 异常通知
  • Channel变为有效或无效时获得通知
  • Channel被注冊或从EventLoop中注销时获得通知
  • 通知用户特定事件

这些ChannelHandler实例加入到ChannelPipeline中,在ChannelPipeline中按顺序逐个运行。它类似于一个链条,有使用过Servlet的读者可能会更easy理解。

ChannelPipeline实现了拦截过滤器模式,这意味着我们连接不同的ChannelHandler来拦截并处理经过ChannelPipeline的数据或事件。能够把ChannelPipeline想象成UNIX管道。它同意不同的命令链(ChannelHandler相当于命令)。

你还能够在执行时依据须要加入ChannelHandler实例到ChannelPipeline或从ChannelPipeline中删除,这能帮助我们构建高度灵活的Netty程序。

此外,訪问指定的ChannelPipeline和ChannelConfig,你能在Channel自身上进行操作。Channel提供了非常多方法。例如以下列表:

  • eventLoop(),返回分配给Channel的EventLoop
  • pipeline()。返回分配给Channel的ChannelPipeline
  • isActive(),返回Channel是否激活,已激活说明与远程连接对等
  • localAddress(),返回已绑定的本地SocketAddress
  • remoteAddress(),返回已绑定的远程SocketAddress
  • write()。写数据到远程client,数据通过ChannelPipeline传输过去

后面会越来越熟悉这些方法,如今仅仅须要记住我们的操作都是在同样的接口上执行,Netty的高灵活性让你能够以不同的传输实现进行重构。

写数据到远程已连接client能够调用Channel.write()方法。例如以下代码:

Channel channel = ...
//Create ByteBuf that holds data to write
ByteBuf buf = Unpooled.copiedBuffer("your data", CharsetUtil.UTF_8);
//Write data
ChannelFuture cf = channel.write(buf);
//Add ChannelFutureListener to get notified after write completes
cf.addListener(new ChannelFutureListener() {
	@Override
	public void operationComplete(ChannelFuture future) {
		//Write operation completes without error
		if (future.isSuccess()) {
			System.out.println(.Write successful.);
		} else {
			//Write operation completed but because of error
			System.err.println(.Write error.);
			future.cause().printStacktrace();
		}
	}
});

Channel是线程安全(thread-safe)的,它能够被多个不同的线程安全的操作,在多线程环境下。全部的方法都是安全的。正由于Channel是安全的,我们存储对Channel的引用,并在学习的时候使用它写入数据到远程已连接的client。使用多线程也是如此。以下的代码是一个简单的多线程样例:

final Channel channel = ...
//Create ByteBuf that holds data to write
final ByteBuf buf = Unpooled.copiedBuffer("your data",CharsetUtil.UTF_8);
//Create Runnable which writes data to channel
Runnable writer = new Runnable() {
	@Override
	public void run() {
		channel.write(buf.duplicate());
	}
};
//Obtain reference to the Executor which uses threads to execute tasks
Executor executor = Executors.newChachedThreadPool();
// write in one thread
//Hand over write task to executor for execution in thread
executor.execute(writer);
// write in another thread
//Hand over another write task to executor for execution in thread
executor.execute(writer);

此外。这样的方法保证了写入的消息以同样的顺序通过写入它们的方法。想了解全部方法的使用能够參考Netty API文档。

4.3 Netty包括的传输实现

Netty自带了一些传输协议的实现,尽管没有支持全部的传输协议,可是其自带的已足够我们来使用。Netty应用程序的传输协议依赖于底层协议。本节我们将学习Netty中的传输协议。

Netty中的传输方式有例如以下几种:

  • NIO,io.netty.channel.socket.nio,基于java.nio.channels的工具包,使用选择器作为基础的方法。
  • OIO,io.netty.channel.socket.oio,基于java.net的工具包,使用堵塞流。
  • Local,io.netty.channel.local,用来在虚拟机之间本地通信。
  • Embedded,io.netty.channel.embedded,嵌入传输。它同意在没有真正网络的运输中使用ChannelHandler。能够很实用的来測试ChannelHandler的实现。

4.3.1 NIO - Nonblocking I/O

NIO传输是眼下最经常使用的方式,它通过使用选择器提供了全然异步的方式操作全部的I/O,NIO从Java 1.4才被提供。

NIO中,我们能够注冊一个通道或获得某个通道的改变的状态,通道状态有以下几种改变:

  • 一个新的Channel被接受并已准备好
  • Channel连接完毕
  • Channel中有数据并已准备好读取
  • Channel发送数据出去

处理完改变的状态后需又一次设置他们的状态,用一个线程来检查是否有已准备好的Channel。假设有则运行相关事件。在这里可能仅仅同一时候一个注冊的事件而忽略其它的。

选择器所支持的操作在SelectionKey中定义,详细例如以下:

  • OP_ACCEPT,有新连接时得到通知
  • OP_CONNECT。连接完毕后得到通知
  • OP_READ,准备好读取数据时得到通知
  • OP_WRITE。写入数据到通道时得到通知

Netty中的NIO传输就是基于这种模型来接收和发送数据,通过封装将自己的接口提供给用户使用。这全然隐藏了内部实现。如前面所说,Netty隐藏内部的实现细节。将抽象出来的API暴露出来供使用。以下是处理流程图:

NIO在处理过程也会有一定的延迟,若连接数不大的话。延迟一般在毫秒级,可是其吞吐量依旧比OIO模式的要高。Netty中的NIO传输是“zero-file-copy”,也就是零文件复制,这样的机制能够让程序速度更快,更高效的从文件系统中传输内容,零复制就是我们的应用程序不会将发送的数据先拷贝到JVM堆栈在进行处理,而是直接从内核空间操作。

接下来我们将讨论OIO传输,它是堵塞的。

4.3.2 OIO - Old blocking I/O

OIO就是java中提供的Socket接口,java最開始仅仅提供了堵塞的Socket。堵塞会导致程序性能低。

以下是OIO的处理流程图,若想具体了解。能够參阅其它相关资料。

4.3.3 Local - In VM transport

Netty包括了本地传输。这个传输实现使用同样的API用于虚拟机之间的通信,传输是全然异步的。每一个Channel使用唯一的SocketAddress,client通过使用SocketAddress进行连接,在server会被注冊为长期执行,一旦通道关闭,它会自己主动注销。client无法再使用它。

连接到本地传输server的行为与其它的传输实现差点儿是同样的,须要注意的一个重点是仅仅能在本地的server和client上使用它们。

Local未绑定不论什么Socket。值提供JVM进程之间的通信。

4.3.4 Embedded transport

Netty还包含嵌入传输,与之前讲述的其它传输实现比較,它是不是一个真的传输呢?若不是一个真的传输。我们用它能够做什么呢?Embedded transport同意更easy的使用不同的ChannelHandler之间的交互。这也更easy嵌入到其它的ChannelHandler实例并像一个辅助类一样使用它们。它一般用来測试特定的ChannelHandler实现,也能够在ChannelHandler中又一次使用一些ChannelHandler来进行扩展。为了实现这种目的。它自带了一个详细的Channel实现,即:EmbeddedChannel。

4.4 每种传输方式在什么时候使用?

不多加赘述,看以下列表:

  • OIO,在低连接数、须要低延迟时、堵塞时使用
  • NIO,在高连接数时使用
  • Local,在同一个JVM内通信时使用
  • Embedded,測试ChannelHandler时使用
时间: 2024-10-15 01:20:57

Netty In Action中文版 - 第四章:Transports(传输)的相关文章

《Netty In Action中文版》第二章:第一个Netty程序

注:本篇内容出自<Netty In Action>一书:         注:本人原创译文,转载请注明出处! 本章介绍 获取Netty4最新版本 设置运行环境来构建和运行netty程序 创建一个基于Netty的服务器和客户端 拦截和处理异常 编写和运行Netty服务器和客户端 本章将简单介绍Netty的核心概念,这个狠心概念就是学习Netty是如何拦截和处理异常,对于刚开始学习netty的读者,利用netty的异常拦截机制来调试程序问题很有帮助.本章还会介绍其他一些核心概念,如服务器和客户端的

Netty In Action中文版 - 第六章:ChannelHandler

本章介绍 ChannelPipeline ChannelHandlerContext ChannelHandler Inbound vs outbound(入站和出站) 接受连接或创建他们仅仅是你的应用程序的一部分,尽管这些不论什么非常重要,可是一个网络应用程序旺旺是更复杂的,须要很多其它的代码编写,如处理传入和传出的数据.Netty提供了一个强大的处理这些事情的功能,同意用户自己定义ChannelHandler的实现来处理数据.使得ChannelHandler更强大的是能够连接每一个Chann

Netty In Action中文版 学习第一章的重点知识

第一章 大多数都是一些名词,SSL/TLS和Starttls支持,回调,聚合,分散,这些我也不是很明白,也百度了一下这些基础概念,大家也可以去看看实际的内容以及相关的代码,由于本人也是新手对这些词也理解不深刻不在此细说. 下面是一个jar文件搜索的比较好的网址拿出来分享给大家 http://www.findjar.com/index.x?query=xlstojson jar包的搜索地 下面就是工具,作为新人大多数都说用Eclipse就够了,我由于想看开源的源码所以选择了MyEclipse 10

Java Persistence with MyBatis 3(中文版) 第四章 使用注解配置SQL映射器

在上一章,我们看到了我们是怎样在映射器Mapper XML配置文件中配置映射语句的.MyBatis也支持使用注解来配置映射语句.当我们使用基于注解的映射器接口时,我们不再需要在XML配置文件中配置了.如果你愿意,你也可以同时使用基于XML和基于注解的映射语句. 本章将涵盖以下话题: l 在映射器Mapper接口上使用注解 l 映射语句 @Insert,@Update,@Delete,@SeelctStatements l 结果映射 一对一映射 一对多映射 l 动态SQL @SelectProvi

Netty In Action中国版 - 第二章:第一Netty程序

本章介绍 获得Netty4最新的版本号 设置执行环境,以构建和执行netty程序 创建一个基于Netty的server和client 拦截和处理异常 编制和执行Nettyserver和client 本章将简介Netty的核心概念,这个狠心概念就是学习Netty是怎样拦截和处理异常.对于刚開始学习netty的读者.利用netty的异常拦截机制来调试程序问题非常有帮助.本章还会介绍其它一些核心概念.如server和client的启动以及分离通道的处理程序.本章学习一些基础以便后面章节的深入学习. 本

Netty in Action (十四) 第五章节 第三部分 ByteBufHolder,ByteBuf分配,计数引用

5.4 Interface ByteBufHolder 我们经常在ByteBuf中存储一些正常数据之外,我们有时候还需要增加一些各式各样的属性值,一个Http响应体就是一个很好的例子,除了按照字节传输过来的主体内容,还有状态码,cookie等信息 Netty提供了ByteBufHolder来处理这些常用的用户案例,ByteBufHolder还提供了Netty一些其他的先进特性,例如缓存池,缓存池可以是ByteBuf中直接"借出"获取,如果有需要,"借出"的ByteB

Learning Spark中文版--第四章--使用键值对(2)

Actions Available on Pair RDDs (键值对RDD可用的action) ??和transformation(转换)一样,键值对RDD也可以使用基础RDD上的action(开工),并且键值对RDD有一些利用键值对数据特性的的action,如下表: 表4-3 键值对RDD上的action 函数名 描述 例子 结果 countByKey() 计算每个键元素的总数 rdd.countByKey() {(1,1),(3,2)} collectAsMap() 结果收集成一个map便

第四章:传输层

传输层协议有: TCP: 传输控制协议 UDP: 用户数据报协议 TCP协议介绍 是面向连接的.全双工的.可靠的进程到进程通信的协议 TCP建立连接的过程称为三次握手 PC1给客户端发送请求连接SYN报文此时SEQ=X SYN=1 客户端收到后回复ACK确认报文和SYN报文 TCP断开连接的四次挥手 TCP常用端口及功能 UDP协议 缺点: 无连接.不可靠的传输协议 优点: 花费的开销小 UDP报文的首部格式 UDP长度:用来指出UDP的总长度,为首部加上数据 校验和:用来完成对UDP数据的差错

Java Persistence with MyBatis 3(中文版) 第五章 与Spring集成

MyBatis-Spring是MyBatis框架的子模块,用来提供与当前流行的依赖注入框架Spring的无缝集成. Spring框架是一个基于依赖注入(Dependency Injection)和面向切面编程(Aspect Oriented Programming,AOP)的Java框架,鼓励使用基于POJO的编程模型.另外,Spring提供了声明式和编程式的事务管理能力,可以很大程度上简化应用程序的数据访问层(data access layer)的实现.在本章中,我们将看到在基于Spring的