《java.util.concurrent 包源码阅读》18 Exchanger

Exchanger可以看做双向数据传输的SynchronousQueue,即没有生产者和消费者之分,任意两个线程都可以交换数据。

在JDK5中Exchanger被设计成一个容量为1的容器,存放一个等待线程,直到有另外线程到来就会发生数据交换,然后清空容器,等到下一个到来的线程。

从JDK6开始,Exchanger用了类似ConcurrentMap的分段思想,提供了多个slot,增加了并发执行时的吞吐量。

Exchanger不存在公平不公平的模式,因为没有排队的情况发生,只要有两个线程就可以发生数据交换。

直接看核心方法:

    private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);
        // index是线程ID的hash值映射到0到max之间的一个值
        // 一般情况下max为0,这样线程交换数据只会使用第一个slot,
        // 即index是0,而max不为0情况请看下面的循环
        int index = hashIndex();

        // CAS操作失败的次数
        int fails = 0;

        for (;;) {
            // 当前slot中存储的对象,也就是Node
            Object y;
            Slot slot = arena[index];
            // 延迟加载,即只有当slot为null时才创建一个slot
            // 延迟加载后重新循环一次
            if (slot == null)
                createSlot(index);
            // slot中有数据,也就意味着有线程在等待交换数据
            // 这时可以尝试用CAS重置slot(把slot存储的对象设为null)
            // 用slot中存储的对象和当前线程进行数据交换
            // 如果交换成功就通知原先等待的线程
            else if ((y = slot.get()) != null &&
                     slot.compareAndSet(y, null)) {
                Node you = (Node)y;
                if (you.compareAndSet(null, item)) {
                    LockSupport.unpark(you.waiter);
                    return you.item;
                }
                // 如果slot存储的对象已经被重置为null,但是数据交换失败了
                // 这时就意味着这个等待的线程的交换请求被取消了
                // 在分析wait类型的方法代码时会看到如何处理这种情况
            }
            // 如果slot中没有存储对象,那么首先尝试把当前线程存储到slot中
            // 如果存储失败了,就重新循环
            else if (y == null &&
                     slot.compareAndSet(null, me)) {
                // index为0意味着仅仅有当前线程在等待交换数据,因此直接等待即可
                if (index == 0)
                    return timed ?
                        awaitNanos(me, slot, nanos) :
                        await(me, slot);
                // 所谓的spin wait:就是固定次数循环,每次计数减一
                // 对于单核系统来说,spin wait是不做的,因为单核
                // 做wait时需要占用CPU,其他线程是无法使用CPU,因此这样
                // 的等待毫无意义。而多核系统中spin值为2000,也就是会做
                // 2000次循环。
                // 如果循环完成后依然没有得到交换的数据,那么会返回一个
                // CANCEL对象表示请求依旧被取消,并且把Node从slot中清除
                Object v = spinWait(me, slot);
                if (v != CANCEL)
                    return v;
                // 如果取消了,就新建一个Node取消原先取消的Node用于下次循环
                me = new Node(item);
                int m = max.get();
                // index除2,缩小slot的范围
                // 同时如果m过大,减小m
                if (m > (index >>>= 1))
                    max.compareAndSet(m, m - 1);
            }
            // 允许CAS失败两次,因为两个else if中都有CAS,因此这里
            // 允许两个else if的CAS操作都失败过
            else if (++fails > 1) {
                int m = max.get();
                // 失败超过3次,增大m,并且从m处重新索引
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;
                // 当index小于0,回到m,重新循环
                else if (--index < 0)
                    index = m;
            }
        }
    }

这篇文章关于索引index这块弄得不是很清楚,后续会继续研究,及时更新。

时间: 2024-10-02 23:49:58

《java.util.concurrent 包源码阅读》18 Exchanger的相关文章

《java.util.concurrent 包源码阅读》 结束语

<java.util.concurrent 包源码阅读>系列文章已经全部写完了.开始的几篇文章是根据自己的读书笔记整理出来的(当时只阅读了部分的源代码),后面的大部分都是一边读源代码代码,一边写文章. 由于水平有限,在阅读源代码的时候,分析得也比较浅显,也有很多地方自己也没有研究明白,文章有的地方显得语焉不详,只能请各位多多见谅了. 后面会继续写一些关于Java并发编程的文章,希望各位多多指教. 这里整理了一个简单的目录,包含了本系列所有文章的链接: <java.util.concurr

《java.util.concurrent 包源码阅读》13 线程池系列之ThreadPoolExecutor 第三部分

这一部分来说说线程池如何进行状态控制,即线程池的开启和关闭. 先来说说线程池的开启,这部分来看ThreadPoolExecutor构造方法: public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecut

《java.util.concurrent 包源码阅读》06 ArrayBlockingQueue

对于BlockingQueue的具体实现,主要关注的有两点:线程安全的实现和阻塞操作的实现.所以分析ArrayBlockingQueue也是基于这两点. 对于线程安全来说,所有的添加元素的方法和拿走元素的方法都会涉及到,我们通过分析offer方法和poll()方法就能看出线程安全是如何实现的. 首先来看offer方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lo

《java.util.concurrent 包源码阅读》05 BlockingQueue

想必大家都很熟悉生产者-消费者队列,生产者负责添加元素到队列,如果队列已满则会进入阻塞状态直到有消费者拿走元素.相反,消费者负责从队列中拿走元素,如果队列为空则会进入阻塞状态直到有生产者添加元素到队列.BlockingQueue就是这么一个生产者-消费者队列. BlockingQueue是Queue的子接口 public interface BlockingQueue<E> extends Queue<E> BlockingQueue拿走元素时,如果队列为空,阻塞等待会有两种情况:

《java.util.concurrent 包源码阅读》14 线程池系列之ScheduledThreadPoolExecutor 第一部分

ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,同时实现了ScheduledExecutorService接口. public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService ScheduledThreadPoolExecutor的功能主要有两点:在固定的时间点执行(也可以认为是延迟执行),重复执行.

《java.util.concurrent 包源码阅读》10 线程池系列之AbstractExecutorService

AbstractExecutorService对ExecutorService的执行任务类型的方法提供了一个默认实现.这些方法包括submit,invokeAny和InvokeAll. 注意的是来自Executor接口的execute方法是未被实现,execute方法是整个体系的核心,所有的任务都是在这个方法里被真正执行的,因此该方法的不同实现会带来不同的执行策略.这个在后面分析ThreadPoolExecutor和ScheduledThreadPoolExecutor就能看出来. 首先来看su

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

《java.util.concurrent 包源码阅读》11 线程池系列之ThreadPoolExecutor 第一部分

先来看ThreadPoolExecutor的execute方法,这个方法能体现出一个Task被加入到线程池之后都发生了什么: public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* 如果运行中的worker线程数少于设定的常驻线程数,增加worker线程,把task分配给新建的worker线程 */ int c = ctl.get(); if (worker

《java.util.concurrent 包源码阅读》09 线程池系列之介绍篇

concurrent包中Executor接口的主要类的关系图如下: Executor接口非常单一,就是执行一个Runnable的命令. public interface Executor { void execute(Runnable command); } ExecutorService接口扩展了Executor接口,增加状态控制,执行多个任务返回Future. 关于状态控制的方法: // 发出关闭信号,不会等到现有任务执行完成再返回,但是现有任务还是会继续执行, // 可以调用awaitTe