【Java并发编程】22、Exchanger源码解析

Exchanger是双向的数据传输,2个线程在一个同步点,交换数据。先到的线程会等待第二个线程执行exchange
SynchronousQueue,是2个线程之间单向的数据传输,一个put,一个take。

先举个例子说明一下如何使用

public class ExchangerDemo {
    public static void main(String[] args) {
        Exchanger<List<Integer>> exchanger = new Exchanger<>();
        new Consumer(exchanger).start();
        //方便调试,让consumer先执行exchange
        try {
            Thread.sleep(1000 * 5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Producer(exchanger).start();
    }

    static class Consumer extends Thread {
        List<Integer> list = new ArrayList<>();
        Exchanger<List<Integer>> exchanger = null;

        public Consumer(Exchanger<List<Integer>> exchanger) {
            super();
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            for (int i = 0; i < 1; i++) {
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.print(list.get(0) + ", ");
                System.out.print(list.get(1) + ", ");
                System.out.print(list.get(2) + ", ");
                System.out.print(list.get(3) + ", ");
                System.out.println(list.get(4) + ", ");
            }
        }
    }

    static class Producer extends Thread {
        List<Integer> list = new ArrayList<>();
        Exchanger<List<Integer>> exchanger = null;

        public Producer(Exchanger<List<Integer>> exchanger) {
            super();
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            Random rand = new Random();
            for (int i = 0; i < 1; i++) {
                list.clear();
                list.add(rand.nextInt(10000));
                list.add(rand.nextInt(10000));
                list.add(rand.nextInt(10000));
                list.add(rand.nextInt(10000));
                list.add(rand.nextInt(10000));
                try {
                    list = exchanger.exchange(list);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

再看一下内部结构

private static final class Node extends AtomicReference<Object> {
    /** 创建这个节点的线程提供的用于交换的数据。 */
    public final Object item;
    /** 等待唤醒的线程 */
    public volatile Thread waiter;
    /**
     * Creates node with given item and empty hole.
     * @param item the item
     */
    public Node(Object item) {
        this.item = item;
    }
}  

/**
 * 一个Slot就是一对线程交换数据的地方。
 * 这里对Slot做了缓存行填充,能够避免伪共享问题。
 * 虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
 */
private static final class Slot extends AtomicReference<Object> {
    // Improve likelihood of isolation on <= 64 byte cache lines
    long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}  

/**
 * Slot数组,在需要时才进行初始化。
 * 用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
 */
private volatile Slot[] arena = new Slot[CAPACITY];
/**
 * arena(Slot数组)的容量。设置这个值用来避免竞争。
 */
private static final int CAPACITY = 32;
/**
 * 正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,
 * 这个值会递增;当一个线程自旋等待超时后,这个值会递减。
 */
private final AtomicInteger max = new AtomicInteger();  

关键技术点1:CacheLine填充

交换数据的场所就是Slot,每个要进行数据交换的线程在内部会用一个Node来表示。Slot其实是一个AtomicReference

Slot其实就是一个AtomicReference,其里面的q0, q1,..qd那些变量,都是多余的,不用的,起到了cache line填充的作用,避免了伪共享问题;

伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。

关键技术点2:锁分离

同ConcurrentHashMap类型,Exchange没有只定义一个slot,而是定义了一个slot的数组。这样在多线程调用exchange的时候,可以各自在不同的slot里面进行匹配。

exchange的基本思路如下: 
(1)根据每个线程的thread id, hash计算出自己所在的slot index; 
(2)如果运气好,这个slot被人占着(slot里面有node),并且有人正在等待交换,那就和它进行交换; 
(3)slot为空的(slot里面没有node),自己占着,等人交换。没人交换,向前挪个位置,把当前slot里面内容取消,index减半,再看有没有交换; 
(4)挪到0这个位置,还没有人交互,那就阻塞,一直等着。别的线程,也会一直挪动,直到0这个位置。

所以0这个位置,是一个交易的“终结点”位置!别的位置上找不到人交易,最后都会到0这个位置。

/**
 * 等待其他线程到达交换点,然后与其进行数据交换。
 *
 * 如果其他线程到来,那么交换数据,返回。
 *
 * 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
 *   1.有其他线程来进行数据交换。
 *   2.当前线程被中断。
 */
public V exchange(V x) throws InterruptedException {
    if (!Thread.interrupted()) {//检测当前线程是否被中断。
        //进行数据交换。
        Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
        if (v == NULL_ITEM)
            return null; //检测结果是否为null。
        if (v != CANCEL) //检测是否被取消。
            return (V)v;
        Thread.interrupted(); // 清除中断标记。
    }
    throw new InterruptedException();
}
/**
 * 等待其他线程到达交换点,然后与其进行数据交换。
 *
 * 如果其他线程到来,那么交换数据,返回。
 *
 * 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
 *   1.有其他线程来进行数据交换。
 *   2.当前线程被中断。
 *   3.超时。
 */
public V exchange(V x, long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    if (!Thread.interrupted()) {
        Object v = doExchange(x == null? NULL_ITEM : x,
                              true, unit.toNanos(timeout));
        if (v == NULL_ITEM)
            return null;
        if (v != CANCEL)
            return (V)v;
        if (!Thread.interrupted())
            throw new TimeoutException();
    }
    throw new InterruptedException();
}  

上面的方法都调用了doExchange方法,主要逻辑在这个方法里,分析下这个方法:

private Object doExchange(Object item, boolean timed, long nanos) {
        Node me = new Node(item);
        int index = hashIndex();                  //根据thread id计算出自己要去的那个交易位置(slot)
        int fails = 0;                            

        for (;;) {
            Object y;
            Slot slot = arena[index];
            if (slot == null)
                createSlot(index);     //slot = null,创建一个slot,然后会回到for循环,再次开始
            else if ((y = slot.get()) != null &&  //slot里面有人等着(有Node),则尝试和其交换
                     slot.compareAndSet(y, null)) { //关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
                Node you = (Node)y;
                if (you.compareAndSet(null, item)) {//把Node里面的东西,换成自己的
                    LockSupport.unpark(you.waiter); //唤醒对方
                    return you.item; //自己把对方的东西拿走
                } //关键点2:如果你运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始
            }
            else if (y == null &&                 //slot里面为空(没有Node),则自己把位置占住
                     slot.compareAndSet(null, me)) {
                if (index == 0)                   //如果是0这个位置,自己阻塞,等待别人来交换
                    return timed? awaitNanos(me, slot, nanos): await(me, slot);
                Object v = spinWait(me, slot);    //不是0这个位置,自旋等待
                if (v != CANCEL)  //自旋等待的时候,运气好,有人来交换了,返回
                    return v;
                me = new Node(item);     //自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环
                int m = max.get();
                if (m > (index >>>= 1))
                    max.compareAndSet(m, m - 1);
            }
            else if (++fails > 1) { //失败 case1: slot有人,要交互,但被人家抢了  case2: slot没人,自己要占位置,又被人家抢了
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
                    index = m + 1;   //3次匹配失败,把index扩大,再次开始for循环
                else if (--index < 0)
                    index = m;
            }
        }
    }

这里形象的理解一下:

其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。

2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。

3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上...),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。

4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。

5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

看一下awaitNanos方法:

/**
 * 在下标为0的Slot上等待获取其他线程填充的值。
 * 如果在Slot被填充之前超时或者被中断,那么操作失败。
 */
private Object awaitNanos(Node node, Slot slot, long nanos) {
    int spins = TIMED_SPINS;
    long lastTime = 0;
    Thread w = null;
    for (;;) {
        Object v = node.get();
        if (v != null)
            //如果已经被其他线程填充了值,那么返回这个值。
            return v;
        long now = System.nanoTime();
        if (w == null)
            w = Thread.currentThread();
        else
            nanos -= now - lastTime;
        lastTime = now;
        if (nanos > 0) {
            if (spins > 0)
                --spins; //先自旋几次。
            else if (node.waiter == null)
                node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
            else if (w.isInterrupted())
                tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
            else
                LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
        }
        else if (tryCancel(node, slot) && !w.isInterrupted())
            //超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
            return scanOnTimeout(node);
    }
}  

awaitNanos中的自旋次数为TIMED_SPINS,这里说明一下自旋次数:

/**
 * 单核处理器下这个自旋次数为0
 * 多核情况下,这个值设置为大多数系统中上下文切换时间的平均值。
 */
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
/**
 * 在有超时情况下阻塞等待之前自旋的次数。.
 * 超时等待的自旋次数之所以更少,是因为检测时间也需要耗费时间。
 * 这里的值是一个经验值。
 */
private static final int TIMED_SPINS = SPINS / 20; 

最后看一下arena(Slot数组),默认的容量和实际使用的下标最大值:

private static final int CAPACITY = 32;
/**
 * The value of "max" that will hold all threads without
 * contention.  When this value is less than CAPACITY, some
 * otherwise wasted expansion can be avoided.
 */
private static final int FULL =
    Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);  

前面说过arena容量默认为32,目的是为了减少线程的竞争,但实际上对arena的使用不会超过FULL这个值(避免一些空间浪费)。这个值取的是32(默认CAPACITY)和CPU核心数量的一半,这两个数的较小值在减1的数和0的较大值.... 也就是说,如果CPU核很多的情况下,这个值最大也就是31,;如果是单核或者双核CPU,这个值就是0,也就是说只能用arena[0]。这也是为什么前面的hashIndex方法里面会做的(近似)取模操作比较复杂,因为实际的能使用的Slot数组范围可能不是2的幂。

出处:

http://blog.csdn.net/chunlongyu/article/details/52504895
http://brokendreams.iteye.com/blog/2253956

原文地址:https://www.cnblogs.com/wangzhongqiu/p/8568689.html

时间: 2024-08-01 07:47:36

【Java并发编程】22、Exchanger源码解析的相关文章

Java并发编程之CountDownLatch源码解析

一.导语 最近在学习并发编程原理,所以准备整理一下自己学到的知识,先写一篇CountDownLatch的源码分析,之后希望可以慢慢写完整个并发编程. 二.什么是CountDownLatch CountDownLatch是java的JUC并发包里的一个工具类,可以理解为一个倒计时器,主要是用来控制多个线程之间的通信.比如有一个主线程A,它要等待其他4个子线程执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了. 三.简单使用 public static void mai

Java并发编程:Concurrent锁机制解析

.title { text-align: center } .todo { font-family: monospace; color: red } .done { color: green } .tag { background-color: #eee; font-family: monospace; padding: 2px; font-size: 80%; font-weight: normal } .timestamp { color: #bebebe } .timestamp-kwd

Java并发系列[2]----AbstractQueuedSynchronizer源码分析之独占模式

在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模

Java并发系列[5]----ReentrantLock源码分析

在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

死磕 java同步系列之ReentrantReadWriteLock源码解析

问题 (1)读写锁是什么? (2)读写锁具有哪些特性? (3)ReentrantReadWriteLock是怎么实现读写锁的? (4)如何使用ReentrantReadWriteLock实现高效安全的TreeMap? 简介 读写锁是一种特殊的锁,它把对共享资源的访问分为读访问和写访问,多个线程可以同时对共享资源进行读访问,但是同一时间只能有一个线程对共享资源进行写访问,使用读写锁可以极大地提高并发量. 特性 读写锁具有以下特性: 是否互斥 读 写 读 否 是 写 是 是 可以看到,读写锁除了读读

死磕 java同步系列之Phaser源码解析

问题 (1)Phaser是什么? (2)Phaser具有哪些特性? (3)Phaser相对于CyclicBarrier和CountDownLatch的优势? 简介 Phaser,翻译为阶段,它适用于这样一种场景,一个大任务可以分为多个阶段完成,且每个阶段的任务可以多个线程并发执行,但是必须上一个阶段的任务都完成了才可以执行下一个阶段的任务. 这种场景虽然使用CyclicBarrier或者CountryDownLatch也可以实现,但是要复杂的多.首先,具体需要多少个阶段是可能会变的,其次,每个阶

死磕 java同步系列之Semaphore源码解析

问题 (1)Semaphore是什么? (2)Semaphore具有哪些特性? (3)Semaphore通常使用在什么场景中? (4)Semaphore的许可次数是否可以动态增减? (5)Semaphore如何实现限流? 简介 Semaphore,信号量,它保存了一系列的许可(permits),每次调用acquire()都将消耗一个许可,每次调用release()都将归还一个许可. 特性 Semaphore通常用于限制同一时间对共享资源的访问次数上,也就是常说的限流. 下面我们一起来学习Java

死磕 java同步系列之CountDownLatch源码解析

??欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)CountDownLatch是什么? (2)CountDownLatch具有哪些特性? (3)CountDownLatch通常运用在什么场景中? (4)CountDownLatch的初始次数是否可以调整? 简介 CountDownLatch,可以翻译为倒计时器,但是似乎不太准确,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作. Cou

死磕 java同步系列之StampedLock源码解析

问题 (1)StampedLock是什么? (2)StampedLock具有什么特性? (3)StampedLock是否支持可重入? (4)StampedLock与ReentrantReadWriteLock的对比? 简介 StampedLock是java8中新增的类,它是一个更加高效的读写锁的实现,而且它不是基于AQS来实现的,它的内部自成一片逻辑,让我们一起来学习吧. StampedLock具有三种模式:写模式.读模式.乐观读模式. ReentrantReadWriteLock中的读和写都是

死磕 java同步系列之CyclicBarrier源码解析——有图有真相

问题 (1)CyclicBarrier是什么? (2)CyclicBarrier具有什么特性? (3)CyclicBarrier与CountDownLatch的对比? 简介 CyclicBarrier,回环栅栏,它会阻塞一组线程直到这些线程同时达到某个条件才继续执行.它与CountDownLatch很类似,但又不同,CountDownLatch需要调用countDown()方法触发事件,而CyclicBarrier不需要,它就像一个栅栏一样,当一组线程都到达了栅栏处才继续往下走. 使用方法 pu