Netty专题(四)-----Bootstrap、ByteBuf、编码解码

引导Bootstrap

我们把前面的用例称作引导一个服务器,后面的用例称作引导一个客户端。虽然这个术语简单方便,但是它略微掩盖了一个重要的事实,即“服务器”和“客户端”实际上表示了不同的网络行为;换句话说,是监听传入的连接还是建立到一个或者多个进程的连接。因此,有两种类型的引导:一种用于

客户端(简单地称为Bootstrap),而另一种(ServerBootstrap)用于服务器。无论你的应用程序使用哪种协议或者处理哪种类型的数据,唯一决定它使用哪种引导类的是它是作为一个客户端还是作为一个服务器。

比较Bootstrap类

ServerBootstrap 将绑定到一个端口,因为服务器必须要监听连接,而Bootstrap 则是由想要连接到远程节点的客户端应用程序所使用的。

第二个区别可能更加明显。引导一个客户端只需要一个EventLoopGroup,但是一个ServerBootstrap 则需要两个(也可以是同一个实例)。

因为服务器需要两组不同的Channel。第一组将只包含一个ServerChannel,代表服务器自身的已绑定到某个本地端口的正在监听的套接字。而第二组将包含所有已创建的用来处理传

入客户端连接(对于每个服务器已经接受的连接都有一个)的Channel。

与ServerChannel 相关联的EventLoopGroup 将分配一个负责为传入连接请求创建Channel 的EventLoop。一旦连接被接受,第二个EventLoopGroup 就会给它的Channel分配一个EventLoop。

在引导过程中添加多个ChannelHandler

Netty 提供了一个特殊的ChannelInboundHandlerAdapter 子类:

public abstract class ChannelInitializer<C extends Channel> ext ends ChannelInboundHandlerAdapter

它定义了下面的方法:

protect ed abstract void initChannel(C ch) throws Exception;

这个方法提供了一种将多个ChannelHandler 添加到一个ChannelPipeline 中的简便方法。你只需要简单地向Bootstrap 或ServerBootstrap 的实例提供你的ChannelInitializer 实现即可,并且一旦Channel 被注册到了它的EventLoop 之后,就会调用你的initChannel()版本。在该方法返回之后,

ChannelInitializer 的实例将会从ChannelPipeline 中移除它自己。

ChannelOption

ChannelOption的各种属性在套接字选项中都有对应。

1、ChannelOption.SO_BACKLOG

ChannelOption.SO_BACKLOG对应的是tcp/ip协议listen函数中的backlog参数,函数listen(int socketfd,int backlog)用来初始化服务端可连接队列,服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小。

2、ChannelOption.SO_REUSEADDR

ChanneOption.SO_REUSEADDR对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口,比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR就无法正常使用该端口。

3、ChannelOption.SO_KEEPALIVE

Channeloption.SO_KEEPALIVE参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报

文。

4、ChannelOption.SO_SNDBUF和ChannelOption.SO_RCVBUF

ChannelOption.SO_SNDBUF参数对应于套接字选项中的SO_SNDBUF,ChannelOption.SO_RCVBUF参数对应于套接字选项中的SO_RCVBUF这两个参数用于操作接收缓冲区和发送缓冲区

的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

5、ChannelOption.SO_LINGER

ChannelOption.SO_LINGER参数对应于套接字选项中的SO_LINGER,Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全

发送。

6、ChannelOption.TCP_NODELAY

ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网

络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,于TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

ByteBuf

ByteBuf API 的优点:

  • 它可以被用户自定义的缓冲区类型扩展;
  • 通过内置的复合缓冲区类型实现了透明的零拷贝;
  • 容量可以按需增长(类似于JDK 的StringBuilder);
  • 在读和写这两种模式之间切换不需要调用ByteBuffer 的flip()方法;
  • 读和写使用了不同的索引;
  • 支持方法的链式调用;
  • 支持引用计数;
  • 支持池化。

ByteBuf 维护了两个不同的索引,名称以read 或者write 开头的ByteBuf 方法,将会推进其对应的索引,而名称以set 或者get 开头的操作则不会 如果打算读取字节直到readerIndex 达到和writerIndex 同样的值时会发生什么。在那时,你将会到达“可以读取的”数据的末尾。就如同试图读取超出数组末尾的数据一样,试图读取超出该点的数据将会触发一个IndexOutOf-BoundsException。可以指定ByteBuf 的最大容量。试图移动写索引(即writerIndex)超过这个值将会触发一个异常。(默认的限制是Integer.MAX_VALUE。)

分配

堆缓冲区

最常用的ByteBuf 模式是将数据存储在JVM 的堆空间中。这种模式被称为支撑数组(backing array),它能在没有使用池化的情况下提供快速的分配和释放。可以由hasArray()来判断检查ByteBuf 是否由数组支撑。如果不是,则这是一个直接缓冲区

直接缓冲区

直接缓冲区是另外一种ByteBuf 模式。直接缓冲区的主要缺点是,相对于基于堆的缓冲区,它们的分配和释放都较为昂贵。如果你正在处理遗留代码,你也可能会遇到另外一个缺点:因为数据不是在堆上,所以你不得不进行一次复制。 显然,与使用支撑数组相比,这涉及的工作更多。因此,如果事先知道容器中的数据将会被作为数组来访问,你可能更愿意使用堆内存。

ByteBufAllocator

Netty 通过interface ByteBufAllocator分配我们所描述过的任意类型的ByteBuf 实例。

可以通过Channel(每个都可以有一个不同的ByteBufAllocator 实例)或者绑定到ChannelHandler 的ChannelHandlerContext 获取一个到ByteBufAllocator 的引用。

Netty提供了两种ByteBufAllocator的实现:PooledByteBufAllocator和Unpooled-ByteBufAllocator。前者池化了ByteBuf的实例以提高性能并最大限度地减少内存碎片。后者的实现不池化ByteBuf实例,并且在每次它被调用时都会返回一个新的实例。Netty4.1默认使用了PooledByteBufAllocator。

Unpooled 缓冲区

可能某些情况下,你未能获取一个到ByteBufAllocator 的引用。对于这种情况,Netty 提供了一个简单的称为Unpooled 的工具类,它提供了静态的辅助方法来创建未池化的ByteBuf实例。

buffer()  返回一个未池化的基于堆内存存储的ByteBuf

directBuffer()返回一个未池化的基于直接内存存储的ByteBuf

wrappedBuffer() 返回一个包装了给定数据的ByteBuf

copiedBuffer() 返回一个复制了给定数据的ByteBuf

Unpooled 类还使得ByteBuf 同样可用于那些并不需要Netty 的其他组件的非网络项目,使得其能得益于高性能的可扩展的缓冲区API。

随机访问索引/顺序访问索引/读写操作

如同在普通的Java 字节数组中一样,ByteBuf 的索引是从零开始的:第一个字节的索引是0,最后一个字节的索引总是capacity() - 1。使用那些需要一个索引值参数(随机访问,也即是数组下标)的方法(的其中)之一来访问数据既不会改变readerIndex 也不会改变writerIndex。如果有需要,也可以通过调用readerIndex(index)或者writerIndex(index)来手动移动这两者。顺序访问通过索引访问

有两种类别的读/写操作:

n get()和set()操作,从给定的索引开始,并且保持索引不变;get+数据字长(bool.byte,int,short,long,bytes)

n read()和write()操作,从给定的索引开始,并且会根据已经访问过的字节数对索引进行调整。

更多的操作

isReadable() 如果至少有一个字节可供读取,则返回true

isWritable() 如果至少有一个字节可被写入,则返回true

readableBytes() 返回可被读取的字节数

writableBytes() 返回可被写入的字节数

capacity() 返回ByteBuf 可容纳的字节数。在此之后,它会尝试再次扩展直到达到maxCapacity()

maxCapacity() 返回ByteBuf 可以容纳的最大字节数

hasArray() 如果ByteBuf 由一个字节数组支撑,则返回true

array() 如果 ByteBuf 由一个字节数组支撑则返回该数组;否则,它将抛出一个UnsupportedOperationException 异常

可丢弃字节

为可丢弃字节的分段包含了已经被读过的字节。通过调用discardRead-Bytes()方法,可以丢弃它们并回收空间。这个分段的初始大小为0,存储在readerIndex 中,会随着read 操作的执行而增加(get*操作不会移动readerIndex)。缓冲区上调用discardReadBytes()方法后,可丢弃字节分段中的空间已经变为可写的了。频繁地调用discardReadBytes()方法以确保可写分段的最大化,但是请注意,这将极有可能会导致内存复制,因为可读字节必须被移动到缓冲区的开始位置。建议只在有真正需要的时候才这样做,例如,当内存非常宝贵的时候。

可读字节

ByteBuf 的可读字节分段存储了实际数据。新分配的、包装的或者复制的缓冲区的默认的readerIndex 值为0。

可写字节

可写字节分段是指一个拥有未定义内容的、写入就绪的内存区域。新分配的缓冲区的writerIndex 的默认值为0。任何名称以write 开头的操作都将从当前的writerIndex 处开始写数据,并将它增加已经写入的字节数。

索引管理

调用markReaderIndex()、markWriterIndex()、resetWriterIndex()和resetReaderIndex()来标记和重置ByteBuf 的readerIndex 和writerIndex。也可以通过调用readerIndex(int)或者writerIndex(int)来将索引移动到指定位置。试图将任何一个索引设置到一个无效的位置都将导致一个IndexOutOfBoundsException。可以通过调用clear()方法来将readerIndex 和writerIndex 都设置为0。注意,这并不会清除内存中的内容。

查找操作

在ByteBuf中有多种可以用来确定指定值的索引的方法。最简单的是使用indexOf()方法。较复杂的查找可以通过调用forEach Byte()。代码展示了一个查找回车符(\r)的例子。

派生缓冲区

派生缓冲区为ByteBuf 提供了以专门的方式来呈现其内容的视图。这类视图是通过以下方法被创建的:

n duplicate()

n slice()

slice(int, int)

n Unpooled.unmodifiableBuffer(…)

n order(ByteOrder)

n readSlice(int)

每个这些方法都将返回一个新的ByteBuf 实例,它具有自己的读索引、写索引和标记索引。其内部存储和JDK 的ByteBuffer 一样也是共享的。

ByteBuf 复制 如果需要一个现有缓冲区的真实副本,请使用copy()或者copy(int, int)方法。不同于派生缓冲区,由这个调用所返回的ByteBuf 拥有独立的数据副本。

引用计数

引用计数是一种通过在某个对象所持有的资源不再被其他对象引用时释放该对象所持有的资源来优化内存使用和性能的技术。Netty 在第4 版中为ByteBuf引入了引用计数技术, interface ReferenceCounted。

工具类

ByteBufUtil 提供了用于操作ByteBuf 的静态的辅助方法。因为这个API 是通用的,并且和池化无关,所以这些方法已然在分配类的外部实现。这些静态方法中最有价值的可能就是hexdump()方法,它以十六进制的表示形式打印ByteBuf 的内容。这在各种情况下都很有用,例如,出于调试的目的记

录ByteBuf 的内容。十六进制的表示通常会提供一个比字节值的直接表示形式更加有用的日志条目,此外,十六进制的版本还可以很容易地转换回实际的字节表示。另一个有用的方法是boolean equals(ByteBuf, ByteBuf),它被用来判断两个ByteBuf实例的相等性。

TCP粘包半包

什么是TCP粘包半包?

改造程序,客户端发送100遍消息

假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。

(1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

(2)服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;

(3)服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;

(4)服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包。

如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。

TCP粘包/半包发生的原因

由于TCP协议本身的机制(面向连接的可靠地协议-三次握手机制)客户端与服务器会维持一个连接(Channel),数据在连接不断开的情况下,可以持续不断地将多个数据包发往服务器,但是如果发送的网络数据包太小,那么他本身会启用Nagle算法(可配置是否启用)对较小的数据包进行合并(基于此,TCP的网络延迟要UDP的高些)然后再发送(超时或者包大小足够)。那么这样的话,服务器在接收到消息(数据流)的时候就无法区分哪些数据包是客户端自己分开发送的,这样产生了粘包;服务器在接收到数据库后,放到缓冲区中,如果消息没有被及时从缓存区取走,下次在取数据的时候可能就会出现一次取出多个数据包的情况,造成粘包现象

UDP:本身作为无连接的不可靠的传输协议(适合频繁发送较小的数据包),他不会对数据包进行合并发送(也就没有Nagle算法之说了),他直接是一端发送什么数据,直接就发出去了,既然他不会对数据合并,每一个数据包都是完整的(数据+UDP头+IP头等等发一次数据封装一次)也就没有粘包一说了。

分包产生的原因就简单的多:可能是IP分片传输导致的,也可能是传输过程中丢失部分包导致出现的半包,还有可能就是一个包可能被分成了两次传输,在取数据的时候,先取到了一部分(还可能与接收的缓冲区大小有关系),总之就是一个数据包被分成了多次接收。

更具体的原因有三个,分别如下。

1. 应用程序写入数据的字节大小大于套接字发送缓冲区的大小

2. 进行MSS大小的TCP分段。MSS是最大报文段长度的缩写。MSS是TCP报文段中的数据字段的最大长度。数据字段加上TCP首部才等于整个的TCP报文段。所以MSS并不是TCP报文段的最大长度,而是:MSS=TCP报文段长度-TCP首部长度

3. 以太网的payload大于MTU进行IP分片。MTU指:一种通信协议的某一层上面所能通过的最大数据包大小。如果IP层有一个数据包要传,而且数据的长度比链路层的MTU大,那么IP层就会进行分片,把数据包分成托干片,让每一片都不超过MTU。注意,IP分片可以发生在原始发送端主机上,也可以发生在中间路由器上。

解决粘包半包问题

由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

(1)在包尾增加分割符,比如回车换行符进行分割,例如FTP协议;linebase包和delimiter包下,分别使用 LineBasedFrameDecoder和DelimiterBasedFrameDecoder

(2)消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;fixed包下,使用FixedLengthFrameDecoder

(3)将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度,LengthFieldBasedFrameDecoder;。

编解码器框架

什么是编解码器

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。那么它们的区别是什么呢?

如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列—它的数据。那么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。我们前面所学的解决粘包半包的其实也是编解码器框架的一部分。

解码器

将字节解码为消息——ByteToMessageDecoder

n将一种消息类型解码为另一种——MessageToMessageDecoder。

因为解码器是负责将入站数据从一种格式转换到另一种格式的,所以Netty 的解码器实现了ChannelInboundHandler。

什么时候会用到解码器呢?很简单:每当需要为ChannelPipeline 中的下一个Channel-InboundHandler 转换入站数据时会用到。此外,得益于ChannelPipeline 的设计,可以将多个解码器链接在一起,以实现任意复杂的转换逻辑。

将字节解码为消息

抽象类ByteToMessageDecoder

将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于Netty 为它提供了一个抽象的基类:ByteToMessageDecoder。由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理。

它最重要方法

decode(ChannelHandlerContext ctx,ByteBuf in,List<Object> out)

这是你必须实现的唯一抽象方法。decode()方法被调用时将会传入一个包含了传入数据的ByteBuf,以及一个用来添加解码消息的List。对这个方法的调用将会重复进行,直到确定没有新的元素被添加到该List,或者该ByteBuf 中没有更多可读取的字节时为止。然后,如果该List 不为空,那么它的内容将会被传递给ChannelPipeline 中的下一个ChannelInboundHandler。

将一种消息类型解码为另一种

在两个消息格式之间进行转换(例如,从String->Integer)

decode(ChannelHandlerContext ctx,I msg,List<Object> out)

对于每个需要被解码为另一种格式的入站消息来说,该方法都将会被调用。解码消息随后会被传递给ChannelPipeline中的下一个ChannelInboundHandler

MessageToMessageDecoder<T>,T代表源数据的类型

TooLongFrameException

由于Netty 是一个异步框架,所以需要在字节可以解码之前在内存中缓冲它们。因此,不能让解码器缓冲大量的数据以至于耗尽可用的内存。为了解除这个常见的顾虑,Netty 提供了TooLongFrameException 类,其将由解码器在帧超出指定的大小限制时抛出。

为了避免这种情况,你可以设置一个最大字节数的阈值,如果超出该阈值,则会导致抛出一个TooLongFrameException(随后会被ChannelHandler.exceptionCaught()方法捕获)。然后,如何处理该异常则完全取决于该解码器的用户。某些协议(如HTTP)可能允许你返回一个特殊的响应。而在其他的情况下,唯一的选择可能就是关闭对应的连接。

编码器

解码器的功能正好相反。Netty 提供了一组类,用于帮助你编写具有以下功能的编码器:

n 将消息编码为字节;MessageToByteEncoder

n 将消息编码为消息:MessageToMessageEncoder<T>,T代表源数据的类型

将消息编码为字节

encode(ChannelHandlerContext ctx,I msg,ByteBuf out)

encode()方法是你需要实现的唯一抽象方法。它被调用时将会传入要被该类编码为ByteBuf 的(类型为I 的)出站消息。该ByteBuf 随后将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

将消息编码为消息

encode(ChannelHandlerContext ctx,I msg,List<Object> out)

这是你需要实现的唯一方法。每个通过write()方法写入的消息都将会被传递给encode()方法,以编码为一个或者多个出站消息。随后,这些出站消息将会被转发给ChannelPipeline中的下一个ChannelOutboundHandler

编解码器类

我们一直将解码器和编码器作为单独的实体讨论,但是你有时将会发现在同一个类中管理入站和出站数据和消息的转换是很有用的。Netty 的抽象编解码器类正好用于这个目的,因为它们每个都将捆绑一个解码器/编码器对,以处理我们一直在学习的这两种类型的操作。这些类同时实现了ChannelInboundHandler 和ChannelOutboundHandler 接口。

为什么我们并没有一直优先于单独的解码器和编码器使用这些复合类呢?因为通过尽可能地将这两种功能分开,最大化了代码的可重用性和可扩展性,这是Netty 设计的一个基本原则。

相关的类:

抽象类ByteToMessageCodec

抽象类MessageToMessageCodec

Netty内置的编解码器和ChannelHandler

Netty 为许多通用协议提供了编解码器和处理器,几乎可以开箱即用,这减少了你在那些相当繁琐的事务上本来会花费的时间与精力。

通过SSL/TLS 保护Netty 应用程序

SSL和TLS这样的安全协议,它们层叠在其他协议之上,用以实现数据安全。我们在访问安全网站时遇到过这些协议,但是它们也可用于其他不是基于HTTP的应用程序,如安全SMTP(SMTPS)邮件服务器甚至是关系型数据库系统。

为了支持SSL/TLS,Java 提供了javax.net.ssl 包,它的SSLContext 和SSLEngine类使得实现解密和加密相当简单直接。Netty 通过一个名为SslHandler 的ChannelHandler实现利用了这个API,其中SslHandler 在内部使用SSLEngine 来完成实际的工作。

Netty 还提供了使用OpenSSL 工具包(www.openssl.org)的SSLEngine 实现。这个OpenSsl-Engine 类提供了比JDK 提供的SSLEngine 实现更好的性能。

如果OpenSSL库可用,可以将Netty 应用程序(客户端和服务器)配置为默认使用OpenSslEngine。如果不可用,Netty 将会回退到JDK 实现。

在大多数情况下,SslHandler 将是ChannelPipeline 中的第一个ChannelHandler。

HTTP 系列

HTTP 是基于请求/响应模式的:客户端向服务器发送一个HTTP 请求,然后服务器将会返回一个HTTP 响应。Netty 提供了多种编码器和解码器以简化对这个协议的使用。

一个HTTP 请求/响应可能由多个数据部分组成,并且它总是以一个LastHttpContent 部分作为结束。FullHttpRequest 和FullHttpResponse 消息是特殊的子类型,分别代表了完整的请求和响应。所有类型的HTTP 消息(FullHttpRequest、LastHttpContent等等)都实现了HttpObject 接口。

HttpRequestEncoder 将HttpRequest、HttpContent 和LastHttpContent 消息编码为字节

HttpResponseEncoder 将HttpResponse、HttpContent 和LastHttpContent 消息编码为字节

HttpRequestDecoder 将字节解码为HttpRequest、HttpContent 和LastHttpContent 消息

HttpResponseDecoder 将字节解码为HttpResponse、HttpContent 和LastHttpContent 消息

聚合HTTP 消息

由于HTTP 的请求和响应可能由许多部分组成,因此你需要聚合它们以形成完整的消息。为了消除这项繁琐的任务,Netty 提供了一个聚合器,它可以将多个消息部分合并为FullHttpRequest 或者FullHttpResponse 消息。通过这样的方式,你将总是看到完整的消息内容。

HTTP 压缩

当使用HTTP 时,建议开启压缩功能以尽可能多地减小传输数据的大小。虽然压缩会带来一些CPU 时钟周期上的开销,但是通常来说它都是一个好主意,特别是对于文本数据来说。Netty 为压缩和解压缩提供了ChannelHandler 实现,它们同时支持gzip 和deflate 编码。

使用HTTPS

启用HTTPS 只需要将SslHandler 添加到ChannelPipeline 的ChannelHandler 组合中。

通过SSL/TLS 保护Netty 应用程序代码实战

目的

实现一个简单的支持https的服务端以及客户端

代码

服务端

1、HttpServer

package cn.enjoyedu.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;

public class HttpServer {
    public static final int port = 6789; //设置服务端端口
    private static EventLoopGroup group = new NioEventLoopGroup();
    private static ServerBootstrap b = new ServerBootstrap();
    private static final boolean SSL = false;

    public static void main(String[] args) throws Exception {
        final SslContext sslCtx;
        if (SSL) {
            //netty为我们提供的ssl加密,缺省
            SelfSignedCertificate ssc = new SelfSignedCertificate();
            sslCtx = SslContextBuilder.forServer(ssc.certificate(),
                    ssc.privateKey()).build();
        } else {
            sslCtx = null;
        }
        try {
            b.group(group);
            b.channel(NioServerSocketChannel.class);
            b.childHandler(new ServerHandlerInit(sslCtx));
            // 服务器绑定端口监听
            ChannelFuture f = b.bind(port).sync();
            System.out.println("服务端启动成功,端口是:"+port);
            // 监听服务器关闭监听
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

2、ServerHandlerInit

package cn.enjoyedu.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.ssl.SslContext;

public class ServerHandlerInit extends ChannelInitializer<SocketChannel> {

    private final SslContext sslCtx;

    public ServerHandlerInit(SslContext sslCtx) {
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline ph = ch.pipeline();
        if (sslCtx != null) {
            ph.addLast(sslCtx.newHandler(ch.alloc()));
        }
        //http响应编码
        ph.addLast("encode",new HttpResponseEncoder());
        //http请求编码
        ph.addLast("decode",new HttpRequestDecoder());
        //聚合http请求
        ph.addLast("aggre",
                new HttpObjectAggregator(10*1024*1024));
        //启用http压缩
        ph.addLast("compressor",new HttpContentCompressor());
        //自己的业务处理
        ph.addLast("busi",new BusiHandler());

    }
}

3、BusiHandler

package cn.enjoyedu.server;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.*;
import io.netty.util.CharsetUtil;

public class BusiHandler extends ChannelInboundHandlerAdapter {
    private String result="";

    private void send(String content, ChannelHandlerContext ctx,
                      HttpResponseStatus status){
        FullHttpResponse response =
                new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,status,
                        Unpooled.copiedBuffer(content,CharsetUtil.UTF_8));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE,
                "text/plain;charset=UTF-8");
        ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);

    }

    /*
     * 收到消息时,返回信息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String result="";
        //接收到完成的http请求
        FullHttpRequest httpRequest = (FullHttpRequest)msg;

        try{
            String path = httpRequest.uri();
            String body = httpRequest.content().toString(CharsetUtil.UTF_8);
            HttpMethod method = httpRequest.method();
            if(!"/test".equalsIgnoreCase(path)){
                result = "非法请求:"+path;
                send(result,ctx,HttpResponseStatus.BAD_REQUEST);
                return;
            }

            //处理http GET请求
            if(HttpMethod.GET.equals(method)){
                System.out.println("body:"+body);
                result="Get request,Response="+RespConstant.getNews();
                send(result,ctx,HttpResponseStatus.OK);
            }

            //处理http POST请求
            if(HttpMethod.POST.equals(method)){
                //.....

            }

        }catch(Exception e){
            System.out.println("处理请求失败!");
            e.printStackTrace();
        }finally{
            httpRequest.release();
        }
    }

    /*
     * 建立连接时,返回消息
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
        System.out.println("连接的客户端地址:"
                + ctx.channel().remoteAddress());
    }
}

4、RespConstant

package cn.enjoyedu.server;

import java.util.Random;

public class RespConstant {
    private static final String[] NEWS = {
            "她那时候还太年轻,不知道所有命运赠送的礼物,早已在暗中标好了" +
                    "价格。——斯蒂芬·茨威格《断头皇后》",
            "这是一个最好的时代,也是一个最坏的时代;" +
            "这是一个智慧的年代,这是一个愚蠢的年代;\n" +
            "这是一个信任的时期,这是一个怀疑的时期;" +
            "这是一个光明的季节,这是一个黑暗的季节;\n" +
            "这是希望之春,这是失望之冬;" +
            "人们面前应有尽有,人们面前一无所有;\n" +
            "人们正踏上天堂之路,人们正走向地狱之门。 —— 狄更斯《双城记》",
            };

    private static final Random R = new Random();

    public static String getNews(){
        return NEWS[R.nextInt(NEWS.length)];
    }

}

客户端

1、HttpClient

package cn.enjoyedu.client;

import cn.enjoyedu.server.HttpServer;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.*;

import java.net.URI;

public class HttpClient {
    private static final boolean SSL = false;
    public void connect(String host, int port) throws Exception {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch)
                        throws Exception {
                    ch.pipeline().addLast(new HttpClientCodec());
                    ch.pipeline().addLast("aggre",
                            new HttpObjectAggregator(10*1024*1024));
                    ch.pipeline().addLast("decompressor",new HttpContentDecompressor());
                    ch.pipeline().addLast("busi",new HttpClientInboundHandler());
                }
            });

            // Start the client.
            ChannelFuture f = b.connect(host, port).sync();

            URI uri = new URI("/test");
            String msg = "Hello";
            DefaultFullHttpRequest request =
                    new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,
                    HttpMethod.GET,
                    uri.toASCIIString(),
                    Unpooled.wrappedBuffer(msg.getBytes("UTF-8")));

            // 构建http请求
            request.headers().set(HttpHeaderNames.HOST, host);
            request.headers()
                    .set(HttpHeaderNames.CONNECTION,
                            HttpHeaderValues.KEEP_ALIVE);
            request.headers()
                    .set(HttpHeaderNames.CONTENT_LENGTH,
                            request.content().readableBytes());
            // 发送http请求
            f.channel().write(request);
            f.channel().flush();
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }

    }

    public static void main(String[] args) throws Exception {
        HttpClient client = new HttpClient();
        client.connect("127.0.0.1", HttpServer.port);
    }
}

2、HttpClientInboundHandler

package cn.enjoyedu.client;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.util.CharsetUtil;

public class HttpClientInboundHandler extends ChannelInboundHandlerAdapter {

    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        //开始对服务器的响应做处理
        FullHttpResponse httpResponse = (FullHttpResponse)msg;
        System.out.println(httpResponse.headers());
        ByteBuf content = httpResponse.content();
        System.out.println(content.toString(CharsetUtil.UTF_8));
        content.release();

    }
}

原文地址:https://www.cnblogs.com/alimayun/p/12514599.html

时间: 2024-10-18 07:22:53

Netty专题(四)-----Bootstrap、ByteBuf、编码解码的相关文章

理解netty对protocol buffers的编码解码

一,netty+protocol buffers简要说明 Netty是业界最流行的NIO框架之一优点:1)API使用简单,开发门槛低:2)功能强大,预置了多种编解码功能,支持多种主流协议:3)定制能力强,可以通过ChannelHandler对通信框架进行灵活的扩展:4)性能高,通过与其它业界主流的NIO框架对比,Netty的综合性能最优:5)成熟.稳定,Netty修复了已经发现的所有JDK NIO BUG,业务开发人员不需要再为NIO的BUG而烦恼:6)社区活跃,版本迭代周期短,发现的BUG可以

网络编程 -- RPC实现原理 -- Netty -- 迭代版本V3 -- 编码解码

网络编程 -- RPC实现原理 -- 目录 啦啦啦 V2--Netty -- pipeline.addLast(io.netty.handler.codec.MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>) 覆写编码解码方法. pipeline相当于拦截器.在pipeline中添加MessageToMessageCodec接口的实现类,该接口的实现类中的encode()方法自动将发送的Object对象转换为ByteBuf,decode()方法自动将

一个低级错误引发Netty编码解码中文异常

前言 最近在调研Netty的使用,在编写编码解码模块的时候遇到了一个中文字符串编码和解码异常的情况,后来发现是笔者犯了个低级错误.这里做一个小小的回顾. 错误重现 在设计Netty的自定义协议的时候,发现了字符串类型的属性,一旦出现中文就会出现解码异常的现象,这个异常并不一定出现了Exception,而是出现了解码之后字符截断出现了人类不可读的字符.编码和解码器的实现如下: // 实体 @Data public class ChineseMessage implements Serializab

Netty(7)源码-ByteBuf

一.ByteBuf工作原理 1. ByteBuf是ByteBuffer的升级版: jdk中常用的是ByteBuffer,从功能角度上,ByteBuffer可以完全满足需要,但是有以下缺点: ByteBuffer一旦分配完成,长度固定,不能动态扩展和收缩,当需要编码的POJO对象大于分配容量时发生索引越界异常 ByteBuffer只要一个标识位置的指针postion,读写切换比较麻烦,flip rewind等操作 功能有限 ByteBuf依然是Byte数组缓冲区,拥有ByteBuffer的一切功能

[C语言]Base64编码解码

Base64编码解码 一,Base64编码原理 Base64编码的字符数组如下所示 : ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/ 字符串转Base64编码:取3字节的字符串转换为四字节的字符串,依次往后转换.得到Base64编码字符串.具体原理如下: 1,如果需要编码的原串字节数刚好为3的倍数,那么转换规则如下: 以中文字符'严'为例,'严'字的UTF-8编码为:0xE4B8A5 = 11100100  10

Bootstrap HTML编码规范总结

摘自: http://www.runoob.com,纯粹是为了加强Bootstrap框架的学习 语法: 1.用两个空格来代替制表符tab--为了在不同的浏览器和系统作用下展现一致. 2.嵌套元素应当缩进一次(即2个空格): 3.对属性的定义全部用双引号,绝不用单引号. 4.不要在自闭合(self-closing)因素的尾部添加斜线,这个基于HTML5的规范 5.不要省略可选的结束标签(closing tag)如</li> or </body>  第一.  HTML5 DOCTYPE

java中文乱码解决之道(五)—–java是如何编码解码的

原文出处:http://cmsblogs.com/?p=1491 在上篇博客中LZ阐述了java各个渠道转码的过程,阐述了java在运行过程中那些步骤在进行转码,在这些转码过程中如果一处出现问题就很有可能会产生乱码!下面LZ就讲述java在转码过程中是如何来进行编码和解码操作的. 编码&解码 在上篇博客中LZ阐述了三个渠道的编码转换过程,下面LZ将结束java在那些场合需要进行编码和解码操作,并详序中间的过程,进一步掌握java的编码和解码过程.在java中主要有四个场景需要进行编码解码操作:

嵌入式专题: S5PV210 - H264硬件解码(MFC)

先说一下编码的例子好像找不到了,只提供一下解码的例子吧.淡疼的三星要是能以YUV420P为基本图像格式就好了,这样结合FFmpeg来开发,各种应用都比较方法.再设计一个RGB/YUV硬件转码单元,最好. #include <stdio.h> #include <string.h> #include <stdlib.h> #include <unistd.h> #include "../mfc/SsbSipMfcApi.h" #includ

条形码和二维码编码解码工具类源码

有一个好的工具,会让你的开发事半功倍.再将讲这个工具类之前,我先给小白补充一点条形码和二维码(以下基础只是选自,我本科阶段的一本教材:<物联网导论>(刘云浩 编著).有对物联网感兴趣的,可以看看这本书),我们要内外兼修,你说是不是这么个理呢! 多行组成的条形码,不需要连接一个数据库,本身可存储大量数据,应用于:医院.驾驶证.物料管理.货物运输,当条形码受一定破坏时,错误纠正能使条形码能正确解码.二维码,是一个多行.连续 性.可变长.包含大量数据的符号标识.每个条形码有3 - 90行,每一行有一