[netty4][netty-common]Future与Promise分析

接口与类结构体系

-- [I]java.util.concurrent.Future<V>
---- [I]io.netty.util.concurrent.Future<V>
------ [AC]AbstractFuture, [I]ChannelFuture, [I]Promise 

-- [AC]AbstractFuture, [I]Promise         -- [I]ChannelFuture, [I]Promise
---- DefaultPromise                       ---- [I]ChannelPromise

-- [I]ChannelPromise, [I]FlushCheckpoint, [C]DefaultPromise
---- [C]DefaultChannelPromise

JDK的Future提供特性

  1. 是否完成
  2. 是否取消
  3. 结果获取
  4. 取消执行

netty的Future增加的特性

  1. 是否成功(完成后的结果,完成不一定成功)
  2. 是否能被取消
  3. 如果失败时的异常获取
  4. 支持监听器,监听操作完成的回调
  5. sync 阻塞等待直至完成 // 跟get有什么区别?A: 只阻塞,不取结果,在一些实现逻辑中而有是否死锁等检查。
  6. await 阻塞等待直至完成 // 跟get有什么区别?跟sync有什么区别?A: 在一些实现逻辑中多了在调完await之后再调用rethrowIfFailed(字面意思)。
  7. getNow,非阻塞获取结果(可以理解成JDK Future的是否完成和结果获取的组合)
  8. 重新覆写cancel行为定义

Promise增加的特性

整体定位是一个支持可写的Future,可以理解成:可以通过API设置结果的成功还是失败。对应netty的Future的特性1。

  1. 设置是否成功的结果,并触发监听器operationComplete事件。操作失败抛异常
  2. 尝试设置是否成功的结果, 并触发监听器operationComplete事件。操作失返回false
  3. 设置是否失败的结果,并触发监听器operationComplete事件。操作失败抛异常
  4. 尝试设置是否失败的结果, 并触发监听器operationComplete事件。操作失返回false
  5. 设置是否可以取消
  6. 覆写返回Future的方法签名为返回Promise

注意: 是否可以取消,是否成功等,在DefaultPromise实现中,是用一个result字段来实现的。并且用AtomicReferenceFieldUpdater结合volatile来完成在并发情况下字段的正确设置值。

ChannelPromise增加的特性
  1. 覆写返回ChannelFuture的方法签名为返回ChannelPromise
  2. unvoid行为(如果是void的则返回新的ChannelPromise,否则返回当前实例)
AbstractFuture完成的逻辑

完成get的实现逻辑,或者说定义的行为顺序,包含超时的get与一直等的get

               +-----------+
               |   await   |
               +-----+-----+
                     |
                     |
            +--------v-------+
            |  get cause     |
       +----+ cause == null  +---+
       |    +----------------+   |
       |                         |
       |                         |
+------v------+           +------v------+
| throw exp   |           |  getNow     |
+-------------+           +-------------+

DefaultPromise完成的逻辑

实现是线程安全的

  1. 实现监听器添加、删除与触发逻辑。引入EventExecutor实例,一对一。 用于触发监听器时使用。触发监听器逻辑有栈深度保护策略。
  2. 通过volatile Object result字段完成是否成功,是否取消的状态描述。
  3. 实现设置(含尝试)成功,设置失败(含尝试),设置不可取消的逻辑
  4. 实现是否成功,是否取消的判断逻辑
  5. 异常的获取,结果的获取
  6. await逻辑的实现。依靠Object的wait方法实现。同时用short waiters来描述wait的次数。
  7. 获取执行结果。执行结果也是volatile Object result字段承载。
  8. 取消逻辑实现。设置result字段为CANCELLATION_CAUSE_HOLDER。notifyAll wait。notify所有监听器。
  9. 是否取消的判断逻辑实现。比对result字段值,比对cause字段的异常类型
  10. 是否完成的判断逻辑实现。与是否取消逻辑类似。
  11. sync逻辑实现。在调完await之后再调用rethrowIfFailed(字面意思)。
  12. 死锁检测逻辑实现。如果executor在eventLoop则死锁(executor.inEventLoop)。死锁扔异常BlockingOperationException

await逻辑

                  +----------------+
       +----Y-----+     isDone     |
  +----v---+      +----------------+
  | return |              N
  +------+-+              |
         ^        +-------v--------+
         +----Y---+  interrupted   |
                  +----------------+
                          N
                          |
+----------+      +-------v--------+
| throw exp<--Y---+  checkDeadLock |
+----------+      +----------------+
                          N
                          +---------synchronized----------+
                          |                               |
                          |                               |
                  +-------v--------+                      |
       while-loop-+   !isDone      +-----------+          |
          |       +----------------+   +-------v------+   |
          |                            |  incWaiters  |   |
          |                            +-------+------+   |
          |       +----------------+           |          |
          |       |     wait       <-----------+          |
          |       +-------+--------+                      |
          |               |            +--------------+   |
          |               +------------> decWaiters   |   |
          |                            +--------------+   |
          +-------------------------------------------+   |
                                                          |
                          +-------------------------------+

如果有其他人做了notify 但是此时任务还没有done,那么则会继续wait,因为这是一个while loop!

触发监听器逻辑有栈深度保护策略
前提是在同一个线程上,如果不是同一个线程就没有意义了。所以要判断executor.inEventLoop()。
在UnpaddedInternalThreadLocalMap中有个字段int futureListenerStackDepth字段来维护FutureListener的栈深度。
在同一个线程上,做notifyListener0之前会将futureListenerStackDepth加1,做完之后恢复原值。
这样如果在同一个线程递归调用到notifyListener0即notifyListener则会触发触发监听器逻辑有栈深度保护策略。
栈深度保护阈值默认是8,可以通过io.netty.defaultPromise.maxListenerStackDepth系统参数配置。

关于Promise的监听器
监听器是用listeners字段,这个字段是Object类型,竟然没有给一个明确的类型。在逻辑实现中有DefaultFutureListeners、GenericProgressiveFutureListener与GenericFutureListener等。
里面包了一个GenericFutureListener数组,达成一个复合的(列表型的)Listener。
GenericProgressiveFutureListener在netty自身里面没有用到具体实现。

安全执行任务的包装

   private static void safeExecute(EventExecutor executor, Runnable task) {
        try {
            executor.execute(task);
        } catch (Throwable t) {
            rejectedExecutionLogger.error("Failed to submit a listener notification task. Event loop shut down?", t);
        }
    }

注意: rejectedExecutionLogger 单独的日志名称,所以可以单独配置。

原文地址:https://www.cnblogs.com/simoncook/p/10924578.html

时间: 2024-08-04 05:58:28

[netty4][netty-common]Future与Promise分析的相关文章

Netty5源码分析(七) -- 异步执行Future和Promise

java.util.concurrent.Future是Java提供的接口,表示异步执行的状态,Future的get方法会判断任务是否执行完成,如果完成就返回结果,否则阻塞线程,直到任务完成. // Java FutureTask.get() public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); r

Future和Promise

Future用于获取异步操作的结果,而Promise则比较抽象,无法直接猜测出其功能. Future Future最早来源于JDK的java.util.concurrent.Future,它用于代表异步操作的结果. 可以通过get方法获取操作结果,如果操作尚未完成,则会同步阻塞当前调用的线程:如果不允许阻塞太长时间或者无限期阻塞,可以通过带超时时间的get方法获取结果:如果到达超时时间操作仍然没有完成,则抛出TimeoutException.通过isDone()方法可以判断当前的异步操作是否完成

future and promise

Future and Promise Future is a sort of placeholder object that you create for a result that does not exist. Generally, the result of the Future is computed concurrently and can be later collected. Composing concurrent tasks in this way tends to faste

7. Netty源码分析之Future和Promise

一.Future Future源自java.util.concurrent.Future,用于获取异步操作的结果,它通过get()方法获取异步操作结果,操作尚未完成,则阻塞. Netty认为这是一个很不好的设计,操作结束时间难以确定,何不通过回调的方式获取结果呢.Netty的ChannelFuture通过监听的方式,当操作结束时调用注册在上面的方法获取操作结果. Future的方法定义: public interface Future<V> extends java.util.concurre

netty 5 alph1源码分析(服务端创建过程)

参照<Netty系列之Netty 服务端创建>,研究了netty的服务端创建过程.至于netty的优势,可以参照网络其他文章.<Netty系列之Netty 服务端创建>是 李林锋撰写的netty源码分析的一篇好文,绝对是技术干货.但抛开技术来说,也存在一些瑕疵. 缺点如下 代码衔接不连贯,上下不连贯. 代码片段是截图,对阅读代理不便(可能和阅读习惯有关) 本篇主要内容,参照<Netty系列之Netty 服务端创建>,梳理出自己喜欢的阅读风格. 1.整体逻辑图 整体将服务

高性能NIO通信框架之Netty(4)ChannelHandler分析

一.ChannelHandler的功能说明 ChannelHandler类似Servlet的Filter过滤器,负责对I/O事件或者I/O操作进行拦截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递. ChannelHandler支持注解,目前就支持两种注解: 1)@Sharable :多个ChannelPipeline公用一个ChannelHandler 2)@Skip:被Skip注解的方法不会被调用,直接被忽略 @Target({ElementType.METHO

(转)netty、mina性能对比分析

转自: http://blog.csdn.net/mindfloating/article/details/8622930 流行 NIO Framework netty 和 mina 性能测评与分析 测试方法 采用 mina 和 netty 各实现一个 基于 nio 的EchoServer,测试在不同大小网络报文下的性能表现 测试环境 客户端-服务端: model name: Intel(R) Core(TM) i5-2320 CPU @ 3.00GHz cache size: 6144 KB

Netty轻量级对象池实现分析

什么是对象池技术?对象池应用在哪些地方? 对象池其实就是缓存一些对象从而避免大量创建同一个类型的对象,类似线程池的概念.对象池缓存了一些已经创建好的对象,避免需要时才创建对象,同时限制了实例的个数.池化技术最终要的就是重复的使用池内已经创建的对象.从上面的内容就可以看出对象池适用于以下几个场景: 创建对象的开销大 会创建大量的实例 限制一些资源的使用 如果创建一个对象的开销特别大,那么提前创建一些可以使用的并且缓存起来(池化技术就是重复使用对象,提前创建并缓存起来重复使用就是池化)可以降低创建对

Netty服务端的业务流程分析

Netty的服务端怎么和java NIO联系起来的,一直很好奇这块内容,这里跟下代码,下篇文章看下Channel相关的知识. final ChannelFuture initAndRegister() { final Channel channel = channelFactory().newChannel(); // try { init(channel); } catch (Throwable t) { channel.unsafe().closeForcibly(); //立即关闭通道且不