Exchanger 源码分析

Exchanger

此类提供对外的操作是同步的;
用于成对出现的线程之间交换数据【主场景】;
可以视作双向的同步队列;
可应用于基因算法、流水线设计、数据校对等场景

创建实例

    /**
     * arena 数组中两个已使用的 slot 之间的索引距离,将它们分开以避免错误的共享
     */
    private static final int ASHIFT = 5;

    /**
     * arena 数组的最大索引值,最大 size 值为 0xff+1
     */
    private static final int MMASK = 0xff;

    /**
     * Unit for sequence/version bits of bound field. Each successful
     * change to the bound also adds SEQ.
     */
    private static final int SEQ = MMASK + 1;

    /** JVM 的 CPU 核数,用于自旋和扩容控制 */
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     *  arena 的最大索引值:原则上可以让所有线程不发生竞争
     */
    static final int FULL = NCPU >= MMASK << 1 ? MMASK : NCPU >>> 1;

    /**
     * 当前线程阻塞等待匹配节点前的自旋次数,CPU==1 时不进行自旋
     */
    private static final int SPINS = 1 << 10;

    /**
     * Value representing null arguments/returns from public methods.
     * 旧 API 不支持 null 值所以需要适配。
     */
    private static final Object NULL_ITEM = new Object();

    /**
     *  交换超时的返回值对象
     */
    private static final Object TIMED_OUT = new Object();

    @jdk.internal.vm.annotation.Contended static final class Node {
        // Arena index
        int index;
        // Last recorded value of Exchanger.bound
        int bound;
        // Number of CAS failures at current bound
        int collides;
        // 自旋的伪随机数
        int hash;
        // 线程的当前数据对象
        Object item;
        // 匹配线程的数据对象
        volatile Object match;
        // 驻留阻塞线程
        volatile Thread parked;
    }

    /** 参与者 */
    static final class Participant extends ThreadLocal<Node> {
        @Override
        public Node initialValue() { return new Node(); }
    }

    /**
     *  每个线程的状态
     */
    private final Participant participant;

    /**
     *  消除数组,只在出现竞争时初始化。
     */
    private volatile Node[] arena;

    /**
     *  未发生竞争时使用的 slot
     */
    private volatile Node slot;

    /**
     * The index of the largest valid arena position, OR‘ed with SEQ
     * number in high bits, incremented on each update.  The initial
     * update from 0 to SEQ is used to ensure that the arena array is
     * constructed only once.
     */
    private volatile int bound;

    /**
     * Creates a new Exchanger.
     */
    public Exchanger() {
        participant = new Participant();
    }

线程间交换数据

    /**
     *  阻塞等待其他线程到达交换点后执行数据交换,支持中断
     */
    @SuppressWarnings("unchecked")
    public V exchange(V x) throws InterruptedException {
        Object v;
        // 将目标对象 v 进行编码
        final Object item = x == null ? NULL_ITEM : x; // translate null args
        /**
         * 1)arena==null,表示未出现线程竞争,则使用 slot 进行数据交换
         * 2)线程已经中断,则抛出 InterruptedException
         * 3)arena!=null,则使用 arena 中的 slot 进行数据交换
         */
        if ((arena != null ||
                (v = slotExchange(item, false, 0L)) == null) &&
                (Thread.interrupted() || // disambiguates null return
                        (v = arenaExchange(item, false, 0L)) == null)) {
            throw new InterruptedException();
        }
        // 解码目标对象
        return v == NULL_ITEM ? null : (V)v;
    }

    /**
     *  未出现竞争时的数据交换方式
     * @param item  需要交换的目标对象
     * @param timed 是否是超时模式
     * @param ns    超时的纳秒数
     * @return
     *  1)目标线程的数据对象
     *  2)null slot 交换出现竞争、线程被中断
     *  3)TIMED_OUT 交换超时
     */
    private final Object slotExchange(Object item, boolean timed, long ns) {
        // 读取参与者节点
        final Node p = participant.get();
        // 读取当前线程
        final Thread t = Thread.currentThread();
        // 线程被设置了中断标识,则返回 null
        if (t.isInterrupted()) {
            return null;
        }

        for (Node q;;) {
            // 1)已经有线程在阻塞等待交换数据
            if ((q = slot) != null) {
                // 将 slot 置为 null
                if (SLOT.compareAndSet(this, q, null)) {
                    // 读取目标对象
                    final Object v = q.item;
                    // 写入交换对象
                    q.match = item;
                    // 如果线程在阻塞等待
                    final Thread w = q.parked;
                    if (w != null) {
                        // 则唤醒交换线程
                        LockSupport.unpark(w);
                    }
                    // 返回交换到的对象
                    return v;
                }
                /**
                 * NCPU > 1 多核 CPU 才会启用竞技场 &&
                 * 设置最大有效的 arena 索引值
                 */
                if (NCPU > 1 && bound == 0 &&
                        BOUND.compareAndSet(this, 0, SEQ)) {
                    // 创建竞技场
                    arena = new Node[FULL + 2 << ASHIFT];
                }
            }
            // 2)启用了 arena
            else if (arena != null) {
                return null; // caller must reroute to arenaExchange
            // 3)slot 为空 && 未启用 arena
            } else {
                // 写入交换数据
                p.item = item;
                // 将 Node 写入 slot,成功则退出循环
                if (SLOT.compareAndSet(this, null, p)) {
                    break;
                }
                // 出现竞争,则重试
                p.item = null;
            }
        }

        // 等待释放
        int h = p.hash;
        // 计算截止时间
        final long end = timed ? System.nanoTime() + ns : 0L;
        // 计算自旋次数,多核 CPU 为 1024
        int spins = NCPU > 1 ? SPINS : 1;
        Object v;
        // 只要没有匹配的交换数据
        while ((v = p.match) == null) {
            // 1)自旋还未完成
            if (spins > 0) {
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0) {
                    h = SPINS | (int)t.getId();
                } else if (h < 0 && (--spins & (SPINS >>> 1) - 1) == 0) {
                    Thread.yield();
                }
            }
            // 2)slot 已经更新
            else if (slot != p) {
                spins = SPINS;
            /**
             * 3)线程未中断 && 未启用竞技场 && 不是超时模式;
             *  如果是超时模式,则计算剩余时间,当前还未超时
             */
            } else if (!t.isInterrupted() && arena == null &&
                    (!timed || (ns = end - System.nanoTime()) > 0L)) {
                // 写入驻留线程
                p.parked = t;
                // 如果 slot 未更新,没有线程来进行数据交换
                if (slot == p) {
                    // 1)阻塞等待
                    if (ns == 0L) {
                        LockSupport.park(this);
                    // 2)超时阻塞等待
                    } else {
                        LockSupport.parkNanos(this, ns);
                    }
                }
                // 线程释放后,清空 parked
                p.parked = null;
            }
            // 如果线程被中断或已经超时,则将 slot 清空
            else if (SLOT.compareAndSet(this, p, null)) {
                // 如果是超时,则返回 TIMED_OUT;线程中断,则返回 null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        // 清空 match
        MATCH.setRelease(p, null);
        // 清空 item
        p.item = null;
        p.hash = h;
        // 返回交换到的数据对象
        return v;
    }

    /**
     * Exchange function when arenas enabled. See above for explanation.
     *
     * @param item  需要交换的目标对象
     * @param timed 是否是超时模式
     * @param ns    超时的纳秒数
     * @return
     *  1)目标线程的数据对象
     *  2)null 线程被中断
     *  3)TIMED_OUT 交换超时
     */
    private final Object arenaExchange(Object item, boolean timed, long ns) {
        // 读取 arena
        final Node[] a = arena;
        // 读取数组长度
        final int alen = a.length;
        // 读取当前线程的参与者,初始值为 0
        final Node p = participant.get();
        for (int i = p.index;;) { // access slot at i
            int b, m, c;
            // 一般为 31
            int j = (i << ASHIFT) + (1 << ASHIFT) - 1;
            if (j < 0 || j >= alen) {
                j = alen - 1;
            }
            // 读取指定 slot 的 Node
            final Node q = (Node) AA.getAcquire(a, j);
            // 1)目标 slot 已经有线程在等待交换数据,则尝试清空 slot
            if (q != null && AA.compareAndSet(a, j, q, null)) {
                // 读取目标对象
                final Object v = q.item; // release
                // 写入交换对象
                q.match = item;
                final Thread w = q.parked;
                if (w != null) {
                    // 唤醒驻留线程
                    LockSupport.unpark(w);
                }
                // 返回交换到的值
                return v;
            // 2)目标索引 i 在有效索引范围内 && slot 为 null
            } else if (i <= (m = (b = bound) & MMASK) && q == null) {
                // 写入 item
                p.item = item; // offer
                // 写入节点
                if (AA.compareAndSet(a, j, null, p)) {
                    // 计算截止时间
                    final long end = timed && m == 0 ? System.nanoTime() + ns : 0L;
                    // 读取当前线程
                    final Thread t = Thread.currentThread(); // wait
                    // 读取自旋次数 1024
                    for (int h = p.hash, spins = SPINS;;) {
                        // 读取匹配数据
                        final Object v = p.match;
                        // 1)已经有线程将交换数据写入
                        if (v != null) {
                            MATCH.setRelease(p, null);
                            p.item = null; // clear for next use
                            p.hash = h;
                            return v;
                        // 2)自旋还未结束
                        } else if (spins > 0) {
                            h ^= h << 1;
                            h ^= h >>> 3;
                            h ^= h << 10; // xorshift
                            if (h == 0) {
                                h = SPINS | (int) t.getId();
                            } else if (h < 0 && // approx 50% true
                                    (--spins & (SPINS >>> 1) - 1) == 0) {
                                Thread.yield(); // two yields per wait
                            }
                        // 3)slot 已经更新
                        } else if (AA.getAcquire(a, j) != p) {
                            spins = SPINS; // releaser hasn‘t set match yet
                        // 4) 线程未中断、未超时
                        } else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {
                            // 写入驻留线程
                            p.parked = t; // minimize window
                            // 如果 slot 未更新,则线程被阻塞
                            if (AA.getAcquire(a, j) == p) {
                                if (ns == 0L) {
                                    LockSupport.park(this);
                                } else {
                                    LockSupport.parkNanos(this, ns);
                                }
                            }
                            p.parked = null;
                        // 5)slot 未更新 && 线程超时或中断,则清空 slot
                        } else if (AA.getAcquire(a, j) == p && AA.compareAndSet(a, j, p, null)) {
                            if (m != 0) {
                                BOUND.compareAndSet(this, b, b + SEQ - 1);
                            }
                            p.item = null;
                            p.hash = h;
                            i = p.index >>>= 1; // descend
                            // 线程被中断
                            if (Thread.interrupted()) {
                                return null;
                            }
                            // 线程超时
                            if (timed && m == 0 && ns <= 0L) {
                                return TIMED_OUT;
                            }
                            break; // expired; restart
                        }
                    }
                // 2)写入 slot 出现竞争
                } else {
                    p.item = null; // clear offer
                }
            } else {
                if (p.bound != b) { // stale; reset
                    p.bound = b;
                    p.collides = 0;
                    i = i != m || m == 0 ? m : m - 1;
                } else if ((c = p.collides) < m || m == FULL || !BOUND.compareAndSet(this, b, b + SEQ + 1)) {
                    p.collides = c + 1;
                    i = i == 0 ? m : i - 1; // cyclically traverse
                } else {
                    i = m + 1; // grow
                }
                p.index = i;
            }
        }
    }

原文地址:https://www.cnblogs.com/zhuxudong/p/10124099.html

时间: 2024-10-16 23:00:55

Exchanger 源码分析的相关文章

JDK源码分析之concurrent包(一) -- Executor架构

Java5新出的concurrent包中的API,是一些并发编程中实用的的工具类.在高并发场景下的使用非常广泛.笔者在这做了一个针对concurrent包中部分常用类的源码分析系列.本系列针对的读者是已经对并发包中的Executor框架和工具类有所了解并懂得如何使用的人群,如果对并发包还不了解的朋友,请先做些了解.网上对这方面的讲述有丰富的资源. 本篇博文是第一期,首先对Executor架构做一个概述.这里只简单介绍接口和类的继承.使用关系. 盗用一张类图来描述结构: 解析: Executor是

Dubbo 源码分析 - 服务导出全过程解析

1.服务导出过程 本篇文章,我们来研究一下 Dubbo 导出服务的过程.Dubbo 服务导出过程始于 Spring 容器发布刷新事件,Dubbo 在接收到事件后,会立即执行服务导出逻辑.整个逻辑大致可分为三个部分,第一是前置工作,主要用于检查参数,组装 URL.第二是导出服务,包含导出服务到本地 (JVM),和导出服务到远程两个过程.第三是向注册中心注册服务,用于服务发现.本篇文章将会对这三个部分代码进行详细的分析,在分析之前,我们先来了解一下服务的导出过程. Dubbo 支持两种服务导出方式,

TeamTalk源码分析之login_server

login_server是TeamTalk的登录服务器,负责分配一个负载较小的MsgServer给客户端使用,按照新版TeamTalk完整部署教程来配置的话,login_server的服务端口就是8080,客户端登录服务器地址配置如下(这里是win版本客户端): 1.login_server启动流程 login_server的启动是从login_server.cpp中的main函数开始的,login_server.cpp所在工程路径为server\src\login_server.下表是logi

Android触摸屏事件派发机制详解与源码分析二(ViewGroup篇)

1 背景 还记得前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>中关于透过源码继续进阶实例验证模块中存在的点击Button却触发了LinearLayout的事件疑惑吗?当时说了,在那一篇咱们只讨论View的触摸事件派发机制,这个疑惑留在了这一篇解释,也就是ViewGroup的事件派发机制. PS:阅读本篇前建议先查看前一篇<Android触摸屏事件派发机制详解与源码分析一(View篇)>,这一篇承接上一篇. 关于View与ViewGroup的区别在前一篇的A

HashMap与TreeMap源码分析

1. 引言     在红黑树--算法导论(15)中学习了红黑树的原理.本来打算自己来试着实现一下,然而在看了JDK(1.8.0)TreeMap的源码后恍然发现原来它就是利用红黑树实现的(很惭愧学了Java这么久,也写过一些小项目,也使用过TreeMap无数次,但到现在才明白它的实现原理).因此本着"不要重复造轮子"的思想,就用这篇博客来记录分析TreeMap源码的过程,也顺便瞅一瞅HashMap. 2. 继承结构 (1) 继承结构 下面是HashMap与TreeMap的继承结构: pu

Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7)【转】

原文地址:Linux内核源码分析--内核启动之(5)Image内核启动(rest_init函数)(Linux-3.0 ARMv7) 作者:tekkamanninja 转自:http://blog.chinaunix.net/uid-25909619-id-4938395.html 前面粗略分析start_kernel函数,此函数中基本上是对内存管理和各子系统的数据结构初始化.在内核初始化函数start_kernel执行到最后,就是调用rest_init函数,这个函数的主要使命就是创建并启动内核线

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三)

Solr4.8.0源码分析(22)之 SolrCloud的Recovery策略(三) 本文是SolrCloud的Recovery策略系列的第三篇文章,前面两篇主要介绍了Recovery的总体流程,以及PeerSync策略.本文以及后续的文章将重点介绍Replication策略.Replication策略不但可以在SolrCloud中起到leader到replica的数据同步,也可以在用多个单独的Solr来实现主从同步.本文先介绍在SolrCloud的leader到replica的数据同步,下一篇

zg手册 之 python2.7.7源码分析(4)-- pyc字节码文件

什么是字节码 python解释器在执行python脚本文件时,对文件中的python源代码进行编译,编译的结果就是byte code(字节码) python虚拟机执行编译好的字节码,完成程序的运行 python会为导入的模块创建字节码文件 字节码文件的创建过程 当a.py依赖b.py时,如在a.py中import b python先检查是否有b.pyc文件(字节码文件),如果有,并且修改时间比b.py晚,就直接调用b.pyc 否则编译b.py生成b.pyc,然后加载新生成的字节码文件 字节码对象