netty io.netty.channel介绍2

Interface ChannelHandlerContext

上下文对象使得当前channelhandler可以与其所属的channelpipeline以及其他handler进行交互,可以通知所属channelpipeline中的下一个handler,也可动态修改其所属的channelpipeline,具体功能如下:

通知。通过调用channelhandlercontext提供的方法可以调用同一个channelpipeline中的相邻的下一个channelhandler,详情可以参照channelpipeline来了解事件使如何流动的。

修改pipeline  通过调用context的pipeline()方法可以得到当前handler所属的channelpipeline。可以在程序运行时动态增加、删除、替换channelpipeline中的handler。

保持供后面使用 你可以将context对象存储在handler中,共后面使用。比如在handler提供的方法之外触发一个事件,甚至在不同的线程中都可以。实例代码如下:

 public class MyHandler extends ChannelHandlerAdapter {     
     private ChannelHandlerContext ctx;//成员变量

     public void beforeAdd(ChannelHandlerContext ctx) {         
         this.ctx = ctx;//此处保持该ctx
     }

     public void login(String username, password) {//在自定义方法中触发事件
         ctx.write(new LoginMessage(username, password));
     }
     ...
 }

  存储状态信息 AttributeMap.attr(AttributeKey)方法允许你存储和访问与handler和context相关的状态信息。

  一个handler可以有多个context  由于同一个handler实例可以被添加到多个pipeline中,这也就意味着一个handler可以有一个或多个context对象,因此同一个handler实例如果被添加到一个或多个pipeline一次或者多次,当其handle方法被调用的时候,传进来的参数context是不同的。一个handler被添加到pipeline多少次,他就有多少各context对象,不是见到同一个pipeline还是不同的pipeline,示例代码如下:

 @Sharable
 public class FactorialHandler extends ChannelHandlerAdapter {

   private final AttributeKey<Integer> counter = AttributeKey.valueOf("counter");

   // This handler will receive a sequence of increasing integers starting
   // from 1.   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) {     
     Attribute<Integer> attr = ctx.getAttr(counter);
     Integer a = ctx.getAttr(counter).get();

     if (a == null) {
       a = 1;
     }

     attr.set(a * (Integer) msg);
   }
 }

 // Different context objects are given to "f1", "f2", "f3", and "f4" even if
 // they refer to the same handler instance.  Because the FactorialHandler
 // stores its state in a context object (using an AttributeKey), the factorial is
 // calculated correctly 4 times once the two pipelines (p1 and p2) are active.
 FactorialHandler fh = new FactorialHandler(); 
 ChannelPipeline p1 = Channels.pipeline();
 p1.addLast("f1", fh);//一个context
 p1.addLast("f2", fh); //二个context
 ChannelPipeline p2 = Channels.pipeline();
 p2.addLast("f3", fh);//三个context
 p2.addLast("f4", fh);//四个context

Interface ChannelHandlerInvoker

此接口调用ChannelHandler的事件处理方法,如果此接口的默认实现类不满足要求,用户可以指定一个ChannelHandlerInvoker实现一个自定义线程模型,注意这个接口的方法不是给用户调用的。

Interface ChannelId

表示一个channel全局唯一的标识符。标识符是根据下列资源产生的:MAC地址,当前进程ID,System.currentTimeMillis(),System.nanoTime(),一个随机的32-bit Integer和一个顺序增长的32-bit整数。标识符的全局唯一性有赖于MAC地址和进程ID,系统会在类加载的时候自动检测获取,如果这两个获取失败,系统会记录警告信息,同时将使用随机数代替作为标识符,另外,你可以通过系统属性人工指定这两个参数:io.netty.machineId 48bit或者64bit十六进制表示的整数,可以通过冒号后者连字符分割,io.netty.processId 0到65535之间的一个整数。

Interface ChannelPipeline

channelpipeline由一系列channelhandler组成,用来处理channel的inbound events 和 outbound operations。channelpipeline实现了拦截过滤器模式(Intercepting Filter pattern)的一种高级形式。以使用户能够完全控制一个时间(event)怎样被处理,以及channelpipeline中的channelhandler之间如何交互。

pipeline的创建  每一个channel都一个自己的pipeline,channel创建的时候pipeline自动创建

   事件(event)如何在pipeline中流动 下图展示了一个IO事件被channelpipeline中的channelhandler处理的典型方式,一个IO事件被当前handler处理之后,将被转发到当前handler在channelpipeline中的紧邻的下一个handler处理。如果有必要的话,channelhandler本身也可以触发任意的IO事件。为了触发或者转发IO事件,channelhandler需要调用channelhandlercontext提供的事件传播方法,例如ChannelHandlerContext.fireChannelRead(Object)ChannelHandlerContext.write(Object).

                                                 I/O Request
                                            via Channel or                                        ChannelHandlerContext
                                                      |
  +---------------------------------------------------+---------------+
  |                           ChannelPipeline         |               |
  |                                                  \|/              |
  |    +----------------------------------------------+----------+    |
  |    |                   ChannelHandler  N                     |    |
  |    +----------+-----------------------------------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+-----------------------------------+----------+    |
  |    |                   ChannelHandler N-1                    |    |
  |    +----------+-----------------------------------+----------+    |
  |              /|\                                  .               |
  |               .                                   .               |
  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
  |          [method call]                      [method call]         |
  |               .                                   .               |
  |               .                                  \|/              |
  |    +----------+-----------------------------------+----------+    |
  |    |                   ChannelHandler  2                     |    |
  |    +----------+-----------------------------------+----------+    |
  |              /|\                                  |               |
  |               |                                  \|/              |
  |    +----------+-----------------------------------+----------+    |
  |    |                   ChannelHandler  1                     |    |
  |    +----------+-----------------------------------+----------+    |
  |              /|\                                  |               |
  +---------------+-----------------------------------+---------------+
                  |                                  \|/
  +---------------+-----------------------------------+---------------+
  |               |                                   |               |
  |       [ Socket.read() ]                    [ Socket.write() ]     |
  |                                                                   |
  |  Netty Internal I/O Threads (Transport Implementation)            |
  +-------------------------------------------------------------------+

如上图左边所示,一个inbound事件自底向上的被channelhandler处理,一个inbound事件通常是由IO线程触发(图底部),以便在channel状态发生变化(例如新建链接或者关闭连接)或者从远端读取数据的时候通知channelhandler。如果inbound事件超过了图中最顶端的channelhandler,该事件会被丢弃,并被记录,取决于日志级别。

如上图右边所示,一个outbound事件自顶向下的被channelhandler处理,一个outbound事件通常是由用户的代码调用IO操作触发,例如写请求或者尝试进行链接。如果一个outbound事件超过了最底端的channelhandler,改事件会被与channel关联的IO线程处理,IO线程通常情况下执行实际的输出操作,比如SocketChannel.write(ByteBuffer).

转发事件给下一个handler 就像上面解释的那样,channelhandler通过调用channelhandlercontext的事件传播方法将事件转发给下一个channelhandler,具体方式如下:

下面的代码展示了事件传播的常用做法:

 public class MyInboundHandler extends ChannelHandlerAdapter {     
     @Override
     public void channelActive(ChannelHandlerContext ctx) {
         System.out.println("Connected!");
         ctx.fireChannelActive();//传播事件
     }
 }

 public clas MyOutboundHandler extends ChannelHandlerAdapter {    
     @Override
     public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
         System.out.println("Closing ..");
         ctx.close(promise);//传播事件
     }
 }

    构建pipeline 用户在pipeline中应该有一个或者多个channelhandler来接收IO事件(例如 read)或者请求IO操作(例如write  close)。例如,一个典型的server在每一个channelpipeline中通常有如下channelhandler,但是具体细节可能不一样,这取决于你的协议和业务逻辑的特性和复杂度:

  1. Protocol Decoder - translates binary data (e.g. ByteBuf) into a Java object.解码,将二进制数据转换成java对象
  2. Protocol Encoder - translates a Java object into binary data. 编码,讲java对象转换成二进制数据
  3. Business Logic Handler - performs the actual business logic (e.g. database access).业务逻辑处理(例如数据库存储)

组装pipeline的示例代码如下:

 static final EventExecutorGroup group = new DefaultEventExecutorGroup(16);
 ... 
 ChannelPipeline pipeline = ch.pipeline();

 pipeline.addLast("decoder", new MyProtocolDecoder());
 pipeline.addLast("encoder", new MyProtocolEncoder());

 // Tell the pipeline to run MyBusinessLogicHandler‘s event handler methods
 // in a different thread than an I/O thread so that the I/O thread is not blocked by
 // a time-consuming task.
 // If your business logic is fully asynchronous or finished very quickly, you don‘t
 // need to specify a group.如果业务逻辑是一个耗时任务或者会阻塞,请不要放在IO线程执行。
 pipeline.addLast(group, "handler", new MyBusinessLogicHandler());

    线程安全 channelpipeline中的channelhandler可以在任何时候添加删除,因为channelpipeline是线程安全的。例如你可以在发送敏感信息的时候添加一个加密的handler,在敏感信息发送之后删除这个handler。

Interface ChannelProgressiveFuture

此接口表示一个特殊的channelfuture对象,用来展示FileRegion transfer 的进度

Interface ChannelProgressiveFutureListener

An EventListener listener which will be called once the sending task associated with future is being transferred.

Interface ChannelProgressivePromise

Special ChannelPromise which will be notified once the associated bytes is transferring.

Interface ChannelPromise

Special ChannelFuture which is writable.

Interface EventLoop

一旦channel注册之后,该接口处理所有关于该channel的IO操作,一个EventLoop通常情况下处理一个或多个channel,但是这取决于内部实现细节

Interface EventLoopGroup

他允许注册channel,并在后面eventloop中处理channel

Interface FileRegion

表示通过channel发送的一个文件区块,可以支持zero-copy file transfer.

Interface MessageSizeEstimator

此接口负责计消息的大小,也就是消息在内存中占多大空间。

Interface RecvByteBufAllocator

负责创建一个接收缓冲区,其大小适中,能够存储inbound data 又不浪费空间

Interface ServerChannel

负责接接入的链接请求,并创建一个子channel来接收链接,具体实现可以参照ServerSocketChannel

时间: 2024-10-07 09:11:34

netty io.netty.channel介绍2的相关文章

netty io.netty.buffer简介

io.netty.util.ReferenceCounted 此接口代表一个引用计数的对象,此对象需要显示的释放. 当一个ReferenceCounted对象被实例化的时候,该对象的引用数量就是1,调用retain()方法会增加引用数量,调用 release() 方法会减少引用数量,如果引用数量减少到0,该对象就需要显示释放掉.访问释放掉的对象通常会导致访问冲突. 如果实现ReferenceCounted接口的对象是一个包含同样实现ReferenceCounted接口的对象的容器.当容器的引用数

netty io.netty.channel 简介1

Interface AddressedEnvelope<M,A extends SocketAddress> 此接口将一个消息.发送地址和接收地址封装到了一起 Interface Channel 此接口表示到网络socket或者组件(component)的一个连接,其提供了IO操作的一些功能,比如read, write, connect, and bind.一个channel可以给用户提供如下功能:1.当前channel的状态(open.connected等).2.channel的配置参数(如

填坑netty io.netty.util.internal.OutOfDirectMemoryError

我们有个与外部交互的接口是采用netty http,具体版本netty-4.1.18,为什么使用这个版本,我也不知道,历史原因. 由于netty都是异步请求,所以与外部交互总有些唯一的业务标识需要保存,以便前后数据可以勾兑. 这里先说明下,netty里的ByteBuf在读取channelRead未进行写write操作时,需要自己释放release.这和本次Error关系不大,继续说重点. 查看日志,首先发现了OutOfDirectMemoryError错误,这个错误也是间断性的出现,显然是内存不

netty报错:io.netty.channel.ChannelPipelineException

1.九月 23, 2018 8:35:02 下午 io.netty.channel.ChannelInitializer channelRegistered警告: Failed to initialize a channel. Closing: [id: 0xa09c718b, /127.0.0.1:50509 => /127.0.0.1:9999]io.netty.channel.ChannelPipelineException: com.sxt.netty.first.Server4Hell

io.netty.resolver.dns.DnsNameResolverContext

java.net.UnknownHostException: failed to resolve 'xxx.com' after 3 queries at io.netty.resolver.dns.DnsNameResolverContext.finishResolve(DnsNameResolverContext.java:699) at io.netty.resolver.dns.DnsNameResolverContext.tryToFinishResolve(DnsNameResolv

Java io.netty.util.ReferenceCountUtil 代码实例

原文:https://www.helplib.com/Java_API_Classes/article_64580 以下是展示如何使用io.netty.util.ReferenceCountUtil的最佳示例. 我们使用了代码质量辨别算法从开源项目中提取出了最佳的优秀示例. 实例 1 复制代码 private static void testPerformOpeningHandshake0(boolean subProtocol) { EmbeddedChannel ch = new Embed

记一次netty版本冲突,报java.lang.NoSuchMethodError: io.netty.util.internal.ObjectUtil.checkPositive的问题

elasticsearch 5.6中使用TransportClient初始化抛异常 在引入elasticsearch5.6的transportclient包中,会引入netty进行通信. <!-- transport客户端 --> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>transport</artifactId> <version

Netty:Netty 3.x 线程模型

众所周知,Netty使用了主从Reactor模式来完成CONNECT.ACCEPT.READ.WRITE操作.所以这里就从Reactor角度来分析一下Netty3的线程模型. Parent-Reactor 服务端Parent-Reactor设计 客户端Parent-Reactor设计 Sub-Reactor IO Worker设计 Parent-Reactor 服务端Parent-Reactor设计 在了解了Netty 3的源码后,对它的服务端的线程模型做了简单的总结 : Boss Execut

Netty学习——Netty和Protobuf的整合(一)

Netty学习——Netty和Protobuf的整合 Protobuf作为序列化的工具,将序列化后的数据,通过Netty来进行在网络上的传输 1.将proto文件里的java包的位置修改一下,然后再执行一下protoc 异常捕获:启动服务器端正常,在启动客户端的时候,发送消息,报错 警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the l