接上一篇( http://my.oschina.net/haogrgr/blog/489320 )
6. HashedWheelTimeout源码走读.
//任务的包装类, 链表结构, 负责保存deadline, 轮数, 等 //继承MpscLinkedQueueNode, 是因为timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理(并发优化) private static final class HashedWheelTimeout extends MpscLinkedQueueNode<Timeout> implements Timeout { private static final int ST_INIT = 0; private static final int ST_CANCELLED = 1; private static final int ST_EXPIRED = 2; private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER; static { AtomicIntegerFieldUpdater<HashedWheelTimeout> updater = PlatformDependent.newAtomicIntegerFieldUpdater(HashedWheelTimeout.class, "state"); if (updater == null) { updater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state"); } STATE_UPDATER = updater; } private final HashedWheelTimer timer; //timer引用 private final TimerTask task; //要执行的任务引用 private final long deadline; //Timer启动时间 - 任务执行时间(任务加入时间+任务延迟时间) @SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) private volatile int state = ST_INIT; //离任务执行还要等待的轮数, 当任务加入到wheel中时计算该值, 并在Worker中, 每过一轮, 该值减一. long remainingRounds; //双链表, 因为只有Worker这一个线程访问, 所以不需要synchronization / volatile. HashedWheelTimeout next; HashedWheelTimeout prev; //HashedWheelTimeout 所在的 wheel HashedWheelBucket bucket; HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) { this.timer = timer; this.task = task; this.deadline = deadline; } @Override public Timer timer() { return timer; } @Override public TimerTask task() { return task; } @Override public boolean cancel() { // only update the state it will be removed from HashedWheelBucket on next tick. //这里只修改状态从ST_INIT到ST_CANCELLED if (!compareAndSetState(ST_INIT, ST_CANCELLED)) { return false; } //如果状态修改成功, 则表示第一次调用cancel方法, 将HashedWheelTimeout从bucked中移除的操作封装, //加入到cancelled队列, 等待下一次tick再移除, 跟踪下了源码历史发现之所以这么做, 是为了对GC友好, 以前取消任务要等到下一轮才会被处理, //于是, 改成将cancel的任务放在timeous队列里, 然后统一处理, timeous队列是MpscLinkedQueue, 里面对MpscLinkedQueueNode有特殊处理, //然而, 后面又发现有锁的问题, 因为timeous这个队列可能被多个线程操作(HashedWheelTimer.newTimeout()), 开始是加锁的, //于是, 将cancel任务另外存一个队列, 这样, 就不需要使用锁了, 具体见: //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1 //https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1 timer.cancelledTimeouts.add(new Runnable() { @Override public void run() { HashedWheelBucket bucket = HashedWheelTimeout.this.bucket; if (bucket != null) { bucket.remove(HashedWheelTimeout.this); } } }); return true; } public boolean compareAndSetState(int expected, int state) { return STATE_UPDATER.compareAndSet(this, expected, state); } public int state() { return state; } @Override public boolean isCancelled() { return state() == ST_CANCELLED; } @Override public boolean isExpired() { return state() == ST_EXPIRED; } @Override public HashedWheelTimeout value() { return this; } //到期, 执行任务 public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ‘.‘, t); } } } }
果然是大牛, 各种优化, 看了下源码的提交记录, 截取几段:
1) https://github.com/netty/netty/commit/1f68479e3cd94deb3172edd3c01aa74f35032b9b (以前wheel用的HashSet, 改成了数组)
Motivation: At the moment there are two issues with HashedWheelTimer: * the memory footprint of it is pretty heavy (250kb fon an empty instance) * the way how added Timeouts are handled is inefficient in terms of how locks etc are used and so a lot of context-switching / condition can happen. Modification: Rewrite HashedWheelTimer to use an optimized bucket implementation to store the submitted Timeouts and a MPSC queue to handover the timeouts. So volatile writes are reduced to a minimum and also the memory foot-print of the buckets itself is reduced a lot as the bucket uses a double-linked-list. Beside this we use Atomic*FieldUpdater where-ever possible to improve the memory foot-print and performance. Result: Lower memory-footprint and better performance
2) https://github.com/netty/netty/commit/c4d585420f948164c53a8406a9fc67f8d3bfb8a1
Motivation: At the moment the HashedWheelTimer will only remove the cancelled Timeouts once the HashedWheelBucket is processed again. Until this the instance will not be able to be GC‘ed as there are still strong referenced to it even if the user not reference it by himself/herself. This can cause to waste a lot of memory even if the Timeout was cancelled before. Modification: Add a new queue which holds CancelTasks that will be processed on each tick to remove cancelled Timeouts. Because all of this is done only by the WorkerThread there is no need for synchronization and only one extra object creation is needed when cancel() is executed. For addTimeout(...) no new overhead is introduced. Result: Less memory usage for cancelled Timeouts.
3) https://github.com/netty/netty/commit/44ea769f537bf16b833d03db844b1f3067b3acd7
Motivation: Due some race-condition while handling canellation of TimerTasks it was possibleto corrupt the linked-list structure that is represent by HashedWheelBucket and so produce a NPE. Modification: Fix the problem by adding another MpscLinkedQueue which holds the cancellation tasks and process them on each tick. This allows to use no synchronization / locking at all while introduce a latency of max 1 tick before the TimerTask can be GC‘ed. Result: No more NPE
回到主题, 代码并不复杂, 开始看的时候, 发现继承了MpscLinkedQueueNode, 但是又没有地方用到, 后面看了下, 发现MpscLinkedQueue对其有特殊处理.
可以看到HashedWheelTimeout就是对Timeout任务的包装, 链表结构方便加入wheel, 记录deadline, remainingRounds, state等信息,
7. HashedWheelBucket 源码走读.
//用来存放HashedWheelTimeout, 结构有点像linked-list, 方便移除操作. private static final class HashedWheelBucket { //链表结构 private HashedWheelTimeout head; private HashedWheelTimeout tail; //添加HashedWheelTimeout, 链表操作, 不多说~~~ public void addTimeout(HashedWheelTimeout timeout) { assert timeout.bucket == null; timeout.bucket = this; if (head == null) { head = tail = timeout; } else { tail.next = timeout; timeout.prev = tail; tail = timeout; } } //当tick到该wheel的时候, Worker会调用这个方法, 根据deadline来判断任务是否过期(remainingRounds是否为0), //任务到期就执行, 没到期, 就timeout.remainingRounds--, 因为走到这里, 表示改wheel里的任务又过了一轮了. public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; //遍历链表 while (timeout != null) { boolean remove = false; if (timeout.remainingRounds <= 0) {//任务已到执行点 if (timeout.deadline <= deadline) { timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } remove = true; } else if (timeout.isCancelled()) { remove = true; } else {//没到期, 剩余轮数减一 timeout.remainingRounds --; } //先保存next, 因为移除后, 再获取timeout.next会为空. HashedWheelTimeout next = timeout.next; if (remove) {//当以到期, 或者被取消, 就将timeou从链表中移除 remove(timeout); } timeout = next; } } //链表移除, 不多说 public void remove(HashedWheelTimeout timeout) { HashedWheelTimeout next = timeout.next; if (timeout.prev != null) { timeout.prev.next = next; } if (timeout.next != null) { timeout.next.prev = timeout.prev; } if (timeout == head) { if (timeout == tail) { tail = null; head = null; } else { head = next; } } else if (timeout == tail) { tail = timeout.prev; } timeout.prev = null; timeout.next = null; timeout.bucket = null; } //Clear this bucket and return all not expired / cancelled {@link Timeout}s. public void clearTimeouts(Set<Timeout> set) { for (;;) { HashedWheelTimeout timeout = pollTimeout(); if (timeout == null) { return; } if (timeout.isExpired() || timeout.isCancelled()) { continue; } set.add(timeout); } } //链表的poll private HashedWheelTimeout pollTimeout() { HashedWheelTimeout head = this.head; if (head == null) { return null; } HashedWheelTimeout next = head.next; if (next == null) { tail = this.head = null; } else { this.head = next; next.prev = null; } head.next = null; head.prev = null; head.bucket = null; return head; } }
可以看到, 代码也不复杂, 主要是提供一个类似于LinkedList的容器, 用来存放HashedWheelTimeout, 并提供expireTimeouts(long deadline) 方法来处理该wheel中的任务. 具体处理看注释.
字数限制... 接第二篇..., 还剩最后的Worker的代码.
时间: 2024-10-11 00:22:55