《java.util.concurrent 包源码阅读》27 Phaser 第一部分

Phaser是JDK7新添加的线程同步辅助类,作用同CyclicBarrier,CountDownLatch类似,但是使用起来更加灵活:

1. Parties是动态的。

2. Phaser支持树状结构,即Phaser可以有一个父Phaser。

Phaser的构造函数涉及到两个参数:父Phaser和初始的parties,因此提供了4个构造函数:

public Phaser();
public Phaser(int parties);

public Phaser(Phaser parent);
public Phaser(Phaser parent, int parties);

因为Phaser的特色在在于动态的parties,因此首先来看如何动态更新parties是如何实现的。

Phaser提供了两个方法:register和bulkRegister,前者会添加一个需要同步的线程,后者会添加parties个需要同步的线。

    public int register() {
        return doRegister(1);
    }

    // 增加了参数的检查
    public int bulkRegister(int parties) {
        if (parties < 0)
            throw new IllegalArgumentException();
        if (parties == 0)
            return getPhase();
        return doRegister(parties);
    }

两个方法都调用了doRegister方法,因此接下来就来看看doRegister方法。

在分析doRegister之前先来说说Phaser的成员变量:state,它存储了Phaser的状态信息:

private volatile long state;

1. state的最高位是一个标志位,1表示Phaser的状态为终止,0表示运行状态

2. state的低32位中,低16位表示没有到达的线程数量,高16位表示Parties值

3. state的高32位除了最高位之外的其他31位表示的Phaser的phase,可以理解为第多少次同步(从0开始计算)。

介绍完了state,来看方法doRegister:

    private int doRegister(int registrations) {
        // 把registrations值同时加到parties值和还未达到的线程数量中去
        long adj = ((long)registrations << PARTIES_SHIFT) | registrations;
        final Phaser parent = this.parent;
        int phase;
        for (;;) {
            long s = state;
            int counts = (int)s;
            int parties = counts >>> PARTIES_SHIFT;
            int unarrived = counts & UNARRIVED_MASK;
            // 超过了允许的最大parties
            if (registrations > MAX_PARTIES - parties)
                throw new IllegalStateException(badRegister(s));
            // 最高位为1,表示Phaser的线程同步已经结束
            else if ((phase = (int)(s >>> PHASE_SHIFT)) < 0)
                break;
            // Phaser中的parties不是0
            else if (counts != EMPTY) {
                // 如果当前Phaser没有父Phaser,或者如果有父Phaser,
                // 刷新自己的state值,如果刷新后的state没有变化。
                // 这里刷新子Phaser的原因在于,会出现父Phaser已经进入下一个phase
                // 而子Phaser却没有及时进入下一个phase的延迟现象
                if (parent == null || reconcileState() == s) {
                    // 如果所有线程都到达了,等待Phaser进入下一次同步开始
                    if (unarrived == 0)
                        root.internalAwaitAdvance(phase, null);
                    // 更新state成功,跳出循环完成注册
                    else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                       s, s + adj))
                        break;
                }
            }
            // 第一次注册,且不是子Phaser
            else if (parent == null) {
                // 更新当前Phaser的state值成功则完成注册
                long next = ((long)phase << PHASE_SHIFT) | adj;
                if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                    break;
            }
            // 第一次注册到子Phaser
            else {
                // 锁定当前Phaser对象
                synchronized (this) {
                    // 再次检查state值,确保没有被更新
                    if (state == s) {
                        // 注册到父Phaser中去
                        parent.doRegister(1);
                        do { // 获取当前phase值
                            phase = (int)(root.state >>> PHASE_SHIFT);
                        } while (!UNSAFE.compareAndSwapLong
                                 (this, stateOffset, state,
                                  ((long)phase << PHASE_SHIFT) | adj));// 更新当前Phaser的state值
                        break;
                    }
                }
            }
        }
        return phase;
    }

看完了注册,那么来看同步操作的arrive,这里也涉及到两个方法:arrive和arriveAndDeregister,前者会等待其他线程的到达,后者则会立刻返回:

    public int arrive() {
        return doArrive(false);
    }

    public int arriveAndDeregister() {
        return doArrive(true);
    }

两个方法都调用了doArrive方法,区别在于参数一个是false,一个是true。那么来看doArrive:

    private int doArrive(boolean deregister) {
        // arrive需要把未到达的线程数减去1,
        // deregister为true,需要把parties值也减去1
        int adj = deregister ? ONE_ARRIVAL|ONE_PARTY : ONE_ARRIVAL;
        final Phaser root = this.root;
        for (;;) {
            // 如果是有父Phaser,首先刷新自己的state
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            int counts = (int)s;
            int unarrived = (counts & UNARRIVED_MASK) - 1;
            // 最高位为1,表示同步已经结束,返回phase值
            if (phase < 0)
                return phase;
            // 如果parties为0或者在此次arrive之前所有线程到达
            else if (counts == EMPTY || unarrived < 0) {
                // 对于非子Phaser来说,上述情况的arrive肯定是非法的
                // 对于子Phaser首先刷新一下状态再做检查
                if (root == this || reconcileState() == s)
                    throw new IllegalStateException(badArrive(s));
            }
            // 正常情况下,首先更新state
            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adj)) {
                // 所有线程都已经到达
                if (unarrived == 0) {
                    // 计算parties作为下一个phase的未到达的parties
                    long n = s & PARTIES_MASK;
                    int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                    // 调用父Phaser的doArrive
                    if (root != this)
                        // 如果下一个phase的未到达的parties为0,则需要向
                        // 父Phaser取消注册
                        return parent.doArrive(nextUnarrived == 0);
                    // 正在进入下一个Phase,默认的实现是nextUnarrived为0
                    // 表示正在进入下一个Phase,因为下一个phase的parties
                    // 为0,需要等待parties不为0
                    if (onAdvance(phase, nextUnarrived))
                        // 正在等待下一个phase,设置状态为终止
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        // 下一个phase的parties为0,更新未到达的parties的值
                        n |= EMPTY;
                    else
                        // 更新下一个phase的未到达的parties的值
                        n |= nextUnarrived;
                    // phase值加1
                    n |= (long)((phase + 1) & MAX_PHASE) << PHASE_SHIFT;

                    // 更新state值
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);

                    // 唤醒等待的线程
                    releaseWaiters(phase);
                }
                return phase;
            }
        }
    }

关于arrive还有一个方法:arriveAndAwaitAdvance。这个方法会等到下一个phase开始再返回,相等于doArrive方法添加了awaitAdvance方法的功能。基本逻辑和上面说的doArrive方法类似:

    public int arriveAndAwaitAdvance() {
        final Phaser root = this.root;
        for (;;) {
            long s = (root == this) ? state : reconcileState();
            int phase = (int)(s >>> PHASE_SHIFT);
            int counts = (int)s;
            int unarrived = (counts & UNARRIVED_MASK) - 1;
            if (phase < 0)
                return phase;
            else if (counts == EMPTY || unarrived < 0) {
                // 对于非子Phaser来说,因为可以等待下一个phase,
                // 所以不是非法arrive
                if (reconcileState() == s)
                    throw new IllegalStateException(badArrive(s));
            }
            else if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                               s -= ONE_ARRIVAL)) {
                // 还有其他线程没有达到,就会等待直到下一个phase开始
                if (unarrived != 0)
                    return root.internalAwaitAdvance(phase, null);
                if (root != this)
                    return parent.arriveAndAwaitAdvance();
                long n = s & PARTIES_MASK;  // base of next state
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase << PHASE_SHIFT;
                if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                    return (int)(state >>> PHASE_SHIFT);
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }

这一部分主要讲了Phaser的动态更新parties以及线程的arrive,下一部分将会分析线程等待的实现。

时间: 2024-08-06 14:47:24

《java.util.concurrent 包源码阅读》27 Phaser 第一部分的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe