Netty中的Future

先看下Future的整个继承体系,还有一个ChannelFuture不在里面;

在并发编程中,我们通常会用到一组非阻塞的模型:Promise,Future 和 Callback。其中的 Future 表示一个可能还没有实际完成的异步任务的结果,针对这个结果可以添加 Callback 以便在任务执行成功或失败后做出对应的操作,而 Promise 交由任务执行者,任务执行者通过 Promise 可以标记任务完成或者失败。 可以说这一套模型是很多异步非阻塞架构的基础。

这一套经典的模型在 Scala、C# 中得到了原生的支持,但 JDK 中暂时还只有无 Callback 的 Future 出现,当然也并非在 JAVA 界就没有发展了,比如 Guava 就提供了ListenableFuture 接口,而 Netty 4+ 更是提供了完整的 Promise、Future 和 Listener 机制,在 Netty 的官方文档 Using as a generic library 中也介绍了将 Netty 作为一个 lib 包依赖,并且使用 Listenable futures 的示例。在实际的项目使用中,发现 Netty 的 EventLoop 机制不一定适用其他场景,因此想去除对 EventLoop 的依赖,实现一个简化版本。

Netty自己实现了一套并发库,Future是其中的一块,下篇文章讲下他的并发库的线程池实现。Netty的Future的特性

  • Future<V>的V为异步结果的返回类型
  • getNow 是无阻塞调用,返回异步执行结果,如果未完成那么返回null
  • await 是阻塞调用,等到异步执行完成
  • isSuccess 执行成功是否成功
  • sync  阻塞调用,等待这个future直到isDone(可能由于正常终止、异常或取消而完成)返回true; 如果该future失败,重新抛出失败的原因。 和await区别就是返回结果不同,它返回一个Future对象,通过这个Future知道任务执行结果。
  • 添加GenericFutureListener, 执行完成(future可能由于正常终止、异常或取消而完成)后调用该监听器。

如果我实现这个Future怎么实现:1:任务执行器,这样我们可以控制任务的执行了。2:执行结果,用于返回。3:监听器集合。4:执行结果状态

5:触发监听器的函数。那么怎么执行完成后自定触发监听器,应该在Future里面又启动了一个线程去执行这个触发机制。这些都是我的猜想。看下他的子类怎么实现的:

AbstractFuture实现了JDK中的get方法,调用netty中future的方法,一个模板模式出现了。所有的Future都会继承该类

Promise:任务执行者可以标记任务执行成功或失败,添加了setFailure(Throwable)可以知道Promise的子类需要有个成员变量来保存异常,添加了setSuccess(V) 方法,setUncancellable()方法, 和tryFailure和trySuccess方法。这样所有的操作都可以返回一个Promise,你自己检测是否执行成功,好处是啥,接口统一吗?

ScheduledFuture:一个定时任务的执行结果

ProgressiveFuture:可以跟踪任务的执行进度。

下面看下几个类的具体实现:

DefaultPromise:成员变量如下:

  1. private final EventExecutor executor; //任务执行器
  2. private volatile Object result; //不仅仅是结果,也有可能是异常
  3. * 一个或多个监听器,可能是GenericFutureListener或者DefaultFutureListeners。如果是NULL有两种可能* 1:没有添加触发器* 2:已经出发了

  4. private Object listeners;
  5. private LateListeners lateListeners;
  6. private short waiters;

看来异常也是结果。

看一个重要方法的实现:

isDone:可能由于正常终止、异常或取消而完成

  1. private static boolean isDone0(Object result) {
  2. return result != null && result != UNCANCELLABLE;
  3. }

isSuccess: 任务执行成功

  1. public boolean isSuccess() {
  2. Object result = this.result;
  3. if (result == null || result == UNCANCELLABLE) {
  4. return false;
  5. }
  6. return !(result instanceof CauseHolder);
  7. }

getNow:返回执行结果

  1. public V getNow() {
  2. Object result = this.result;
  3. if (result instanceof CauseHolder || result == SUCCESS) {
  4. return null;
  5. }
  6. return (V) result;
  7. }

sync:同步阻塞方法

  1. @Override
  2. public Promise<V> sync() throws InterruptedException {
  3. await();
  4. rethrowIfFailed();
  5. return this;
  6. }

看来比await多了抛出异常。

await:等待任务执行完成

  1. @Override
  2. public Promise<V> await() throws InterruptedException {
  3. if (isDone()) {
  4. return this;
  5. }
  6. if (Thread.interrupted()) {
  7. throw new InterruptedException(toString());
  8. }
  9. synchronized (this) {
  10. while (!isDone()) {
  11. checkDeadLock(); //判断当前线程是否是执行线程。如果是抛出异常。
  12. incWaiters(); //添加等待个数
  13. try {
  14. wait(); //释放锁,等待唤醒,阻塞该线程
  15. } finally {
  16. decWaiters();
  17. }
  18. }
  19. }
  20. return this;
  21. }

执行器所在的线程不能调用await(),只能是调用者所在的线程才可以,waiters有什么用呢?

cancle方法使用:

  1. @Override
  2. public boolean cancel(boolean mayInterruptIfRunning) {
  3. Object result = this.result;
  4. if (isDone0(result) || result == UNCANCELLABLE) {
  5. return false;
  6. }
  7. synchronized (this) {
  8. // Allow only once.
  9. result = this.result;
  10. if (isDone0(result) || result == UNCANCELLABLE) {
  11. return false;
  12. }
  13. this.result = CANCELLATION_CAUSE_HOLDER;
  14. if (hasWaiters()) {
  15. notifyAll();
  16. }
  17. }
  18. notifyListeners();
  19. return true;
  20. }

当我们修改DefaultPromise的状态时,要触发监听器。

notifyListeners:

  1. /**
  2. * 该方法不需要异步,为啥呢
  3. * 1:这个方法在同步代码块里面调用,因此任何监听器列表的改变都happens-before该方法
  4. * 2:该方法只有isDone==true的时候调用,一但 isDone==true 那么监听器列表将不会改变
  5. */
  6. private void notifyListeners() {
  7. Object listeners = this.listeners;
  8. if (listeners == null) {
  9. return;
  10. }
  11. EventExecutor executor = executor();
  12. if (executor.inEventLoop()) {
  13. final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
  14. final int stackDepth = threadLocals.futureListenerStackDepth();
  15. if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
  16. threadLocals.setFutureListenerStackDepth(stackDepth + 1);
  17. try {
  18. if (listeners instanceof DefaultFutureListeners) {
  19. notifyListeners0(this, (DefaultFutureListeners) listeners);
  20. } else {
  21. final GenericFutureListener<? extends Future<V>> l =
  22. (GenericFutureListener<? extends Future<V>>) listeners;
  23. notifyListener0(this, l);
  24. }
  25. } finally {
  26. this.listeners = null;
  27. threadLocals.setFutureListenerStackDepth(stackDepth);
  28. }
  29. return;
  30. }
  31. }
  32. if (listeners instanceof DefaultFutureListeners) {
  33. final DefaultFutureListeners dfl = (DefaultFutureListeners) listeners;
  34. execute(executor, new Runnable() {
  35. @Override
  36. public void run() {
  37. notifyListeners0(DefaultPromise.this, dfl);
  38. DefaultPromise.this.listeners = null;
  39. }
  40. });
  41. } else {
  42. final GenericFutureListener<? extends Future<V>> l =
  43. (GenericFutureListener<? extends Future<V>>) listeners;
  44. execute(executor, new Runnable() {
  45. @Override
  46. public void run() {
  47. notifyListener0(DefaultPromise.this, l);
  48. DefaultPromise.this.listeners = null;
  49. }
  50. });
  51. }
  52. }

任务永远不能在主线程中执行,需要放到执行器所在的线程执行。DefaultFutureListeners和GenericFutureListener,一个是容器,一个是元素

还有几个修改状态的方法:

  1. @Override
  2. public boolean setUncancellable() {
  3. Object result = this.result;
  4. if (isDone0(result)) {
  5. return !isCancelled0(result);
  6. }
  7. synchronized (this) {
  8. // Allow only once.
  9. result = this.result;
  10. if (isDone0(result)) {
  11. return !isCancelled0(result);
  12. }
  13. this.result = UNCANCELLABLE;
  14. }
  15. return true;
  16. }
  17. private boolean setFailure0(Throwable cause) {
  18. if (cause == null) {
  19. throw new NullPointerException("cause");
  20. }
  21. if (isDone()) {
  22. return false;
  23. }
  24. synchronized (this) {
  25. // Allow only once.
  26. if (isDone()) {
  27. return false;
  28. }
  29. result = new CauseHolder(cause);
  30. if (hasWaiters()) {
  31. notifyAll();
  32. }
  33. }
  34. return true;
  35. }
  36. private boolean setSuccess0(V result) {
  37. if (isDone()) {
  38. return false;
  39. }
  40. synchronized (this) {
  41. // Allow only once.
  42. if (isDone()) {
  43. return false;
  44. }
  45. if (result == null) {
  46. this.result = SUCCESS;
  47. } else {
  48. this.result = result;
  49. }
  50. if (hasWaiters()) {
  51. notifyAll();
  52. }
  53. }
  54. return true;
  55. }
CompleteFuture的几个子类是状态Promise

PromiseTask:该类继承了RunnableFuture接口,该类表示异步操作的结果也可以异步获得,类似JDK中的FutureTask,实例化该对象时候需要传一个Callable的对象,如果没有该对象可以传递一个Runnable和一个Result构造一个Callable对象。

  1. private static final class RunnableAdapter<T> implements Callable<T> {
  2. final Runnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result) {
  5. this.task = task;
  6. this.result = result;
  7. }
  8. @Override
  9. public T call() {
  10. task.run();
  11. return result;
  12. }
  13. @Override
  14. public String toString() {
  15. return "Callable(task: " + task + ", result: " + result + ‘)‘;
  16. }
  17. }

看下他的run方法。

  1. @Override
  2. public void run() {
  3. try {
  4. if (setUncancellableInternal()) {
  5. V result = task.call();
  6. setSuccessInternal(result);
  7. }
  8. } catch (Throwable e) {
  9. setFailureInternal(e);
  10. }

该类的setFailure,setSuccess等方法都会抛出一异常,而加了internal的方法会成功执行,他们是protected,子类或者同一个package中可以调用。

ScheduledFutureTask:该类是定时任务返回的
ChannelFuture是这个结构中最重要的类,从名称可以知道是通道异步执行的结果:在netty中所有的IO操作都是异步的。这意味这所有的IO调用都会立即返回,且不保证IO操作完成。IO调用会返回一个ChannelFuture的实例,通过该实例可以查看IO操作的结果和状态,ChannelFuture有完成和未完成两种状态,当IO操作开始,就会创建一个ChannelFuture的实例,该实例初始是未完成状态,它不是成功,失败,或者取消,因为IO操作还没有完成,如果IO操作完成了那么将会有成功,失败,和取消状态,
*                                      +---------------------------+*                                      | Completed successfully    |*                                      +---------------------------+*                                 +---->      isDone() = <b>true</b>      |* +--------------------------+    |    |   isSuccess() = <b>true</b>      |* |        Uncompleted       |    |    +===========================+* +--------------------------+    |    | Completed with failure    |* |      isDone() = <b>false</b>    |    |    +---------------------------+* |   isSuccess() = false    |----+---->   isDone() = <b>true</b>         |* | isCancelled() = false    |    |    | cause() = <b>non-null</b>     |* |    cause() = null     |    |    +===========================+* +--------------------------+    |    | Completed by cancellation |*                                 |    +---------------------------+*                                 +---->      isDone() = <b>true</b>      |*                                      | isCancelled() = <b>true</b>      |*                                      +---------------------------+
 该类提供了很多方法用来检查IO操作是否完成,等待完成,和接受IO操作的结果。还可以添加ChannelFutureListener的监听器,这样IO操作完成时就可以得到提醒
 * 强烈建议使用addListener而不是await。
 * addListener是非阻塞的,它简单的添加指定的ChannelFutureListener到ChannelFuture中,
 * IO线程将在当绑定在这个future的IO操作完成时,触发这个触发器,优点是提高效率和资源的利用率
 * await()是一个阻塞方法,一旦调用,调用线程将会阻塞直到IO操作完成。优点是容易实现顺序逻辑



来自为知笔记(Wiz)

时间: 2024-10-05 08:08:59

Netty中的Future的相关文章

Netty中的那些坑

Netty中的那些坑(上篇) 最近开发了一个纯异步的redis客户端,算是比较深入的使用了一把netty.在使用过程中一边优化,一边解决各种坑.儿这些坑大部分基本上是Netty4对Netty3的改进部分引起的. 注:这里说的坑不是说netty不好,只是如果这些地方不注意,或者不去看netty的代码,就有可能掉进去了. 坑1: Netty 4的线程模型转变 在Netty 3的时候,upstream是在IO线程里执行的,而downstream是在业务线程里执行的.比如netty从网络读取一个包传递给

Netty 中 IOException: Connection reset by peer 与 java.nio.channels.ClosedChannelException: null

最近发现系统中出现了很多 IOException: Connection reset by peer 与 ClosedChannelException: null 深入看了看代码, 做了些测试, 发现 Connection reset 会在客户端不知道 channel 被关闭的情况下, 触发了 eventloop 的 unsafe.read() 操作抛出 而 ClosedChannelException 一般是由 Netty 主动抛出的, 在 AbstractChannel 以及 SSLHand

Netty中NioEventLoopGroup的创建源码分析

NioEventLoopGroup的无参构造: 1 public NioEventLoopGroup() { 2 this(0); 3 } 调用了单参的构造: 1 public NioEventLoopGroup(int nThreads) { 2 this(nThreads, (Executor)null); 3 } 继续看到双参构造: 1 public NioEventLoopGroup(int nThreads, Executor executor) { 2 this(nThreads,

netty中使用IdleStateHandler来发起心跳

网络连接中,处理Idle事件是很常见的,一般情况下,客户端与服务端在指定时间内没有任何读写请求,就会认为连接是idle的.此时,客户端需要向服务端发送ping消息,来维持服务端与客户端的链接.那么怎么判断客户端在指定时间里没有任何读写请求呢?netty中为我们提供一个特别好用的IdleStateHandler来干这个苦差事!请看下面代码: public class EchoClient { private final static int readerIdleTimeSeconds = 40;/

【转】Netty那点事(二)Netty中的buffer

[原文]https://github.com/code4craft/netty-learning/blob/master/posts/ch2-buffer.md 上一篇文章我们概要介绍了Netty的原理及结构,下面几篇文章我们开始对Netty的各个模块进行比较详细的分析.Netty的结构最底层是buffer机制,这部分也相对独立,我们就先从buffer讲起. What:buffer二三事 buffer中文名又叫缓冲区,按照维基百科的解释,是"在数据传输时,在内存里开辟的一块临时保存数据的区域&q

理解Netty中的零拷贝(Zero-Copy)机制【转】

理解零拷贝 零拷贝是Netty的重要特性之一,而究竟什么是零拷贝呢? WIKI中对其有如下定义: "Zero-copy" describes computer operations in which the CPU does not perform the task of copying data from one memory area to another. 从WIKI的定义中,我们看到"零拷贝"是指计算机操作的过程中,CPU不需要为数据在内存之间的拷贝消耗资源

Netty(五)序列化protobuf在netty中的使用

protobuf是google序列化的工具,主要是把数据序列化成二进制的数据来传输用的.它主要优点如下: 1.性能好,效率高: 2.跨语言(java自带的序列化,不能跨语言) protobuf参考文档:Protobuf详解 其实,在netty中使用Protobuf需要注意的是: protobufDecoder仅仅负责编码,并不支持读半包,所以在之前,一定要有读半包的处理器. 有三种方式可以选择: 使用netty提供ProtobufVarint32FrameDecoder 继承netty提供的通用

Netty中execution包功能详解

技术点描述 Netty中关于多线程处理的代码很多(netty框架的实现本身就是异步处理机制),此文档仅针对于execution包的功能做详细解说. 以下是整个包的目录结构: 包中的调用关系如下图所示: 实现方案 参考源码包 以下是对此包中的源码的分析(请注意后四个类为此包中最重要的类) ChannelEventRunnableFilter 此接口定义了一个抽象方法: boolean filter(ChannelEventRunnable event); 如果传入的event是由Executor处

Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题

[toc] Netty中LineBasedFrameDecoder解码器使用与分析:解决TCP粘包问题 上一篇文章<Netty中TCP粘包问题代码示例与分析>演示了使用了时间服务器的例子演示了TCP的粘包问题,这里使用LineBasedFrameDecoder就是用来解决这个问题的. 不过需要注意的是,LineBasedFrameDecoder见名知其义,可见其是与行相关的,而在前面演示TCP粘包问题时,作者是有意在发送的消息中都加入了换行符,目的也是为了在后面去讲解LineBasedFram