Java并发编程原理与实战二十九:Exchanger

一、简介

前面三篇博客分别介绍了CyclicBarrier、CountDownLatch、Semaphore,现在介绍并发工具类中的最后一个Exchange。Exchange是最简单的也是最复杂的,简单在于API非常简单,就一个构造方法和两个exchange()方法,最复杂在于它的实现是最复杂的。
在API是这么介绍的:可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给 exchange 方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。Exchanger 可能被视为 SynchronousQueue 的双向形式。Exchanger 可能在应用程序(比如遗传算法和管道设计)中很有用。
Exchanger,它允许在并发任务之间交换数据。具体来说,Exchanger类允许在两个线程之间定义同步点。当两个线程都到达同步点时,他们交换数据结构,因此第一个线程的数据结构进入到第二个线程中,第二个线程的数据结构进入到第一个线程中。

二、例子

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;

/**
 * @Title: ExchangerTest
 * @Description: Test class for Exchanger
 * @Company: CSAIR
 * @Author: lixuanbin
 * @Creation: 2014年12月14日
 * @Version:1.0
 */
public class ExchangerTest {
    protected static final Logger log = Logger.getLogger(ExchangerTest.class);
    private static volatile boolean isDone = false;

    static class ExchangerProducer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 1;
        ExchangerProducer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                for (int i = 1; i <= 3; i++) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                        data = i;
                        System.out.println("producer before: " + data);
                        data = exchanger.exchange(data);
                        System.out.println("producer after: " + data);
                    } catch (InterruptedException e) {
                        log.error(e, e);
                    }
                }
                isDone = true;
            }
        }
    }

    static class ExchangerConsumer implements Runnable {
        private Exchanger<Integer> exchanger;
        private static int data = 0;
        ExchangerConsumer(Exchanger<Integer> exchanger) {
            this.exchanger = exchanger;
        }

        @Override
        public void run() {
            while (!Thread.interrupted() && !isDone) {
                data = 0;
                System.out.println("consumer before : " + data);
                try {
                    TimeUnit.SECONDS.sleep(1);
                    data = exchanger.exchange(data);
                } catch (InterruptedException e) {
                    log.error(e, e);
                }
                System.out.println("consumer after : " + data);
            }
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        ExecutorService exec = Executors.newCachedThreadPool();
        Exchanger<Integer> exchanger = new Exchanger<Integer>();
        ExchangerProducer producer = new ExchangerProducer(exchanger);
        ExchangerConsumer consumer = new ExchangerConsumer(exchanger);
        exec.execute(producer);
        exec.execute(consumer);
        exec.shutdown();
        try {
            exec.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error(e, e);
        }
    }
}

这大致可以看作是一个简易的生产者消费者模型,有两个任务类,一个递增地产生整数,一个产生整数0,然后双方进行交易。每次交易前的生产者和每次交易后的消费者都会sleep 1秒来模拟数据处理的消耗,并在交易前后把整数值打印到控制台以便检测结果。在这个例子里交易循环只执行三次,采用一个volatile boolean来控制交易双方线程的退出。
   我们来看看程序的输出:

consumer before : 0
producer before: 1
consumer after : 1
producer after: 0
consumer before : 0
producer before: 2
producer after: 0
consumer after : 2
consumer before : 0
producer before: 3
producer after: 0
consumer after : 3

输出结果验证了以下两件事情:
exchange方法真的帮一对线程交换了数据;
exchange方法真的会阻塞调用方线程直至另一方线程参与交易。
那么在中断和超时两种情况下程序的运行表现会是怎样呢?作为一个小练习,有兴趣的观众可以设想并编写测试用例覆盖验证之。接下来谈谈最近我在生产场景中对Exchanger的应用。

三、实战场景

1.问题描述
   最近接到外部项目组向我组提出的接口需求,需要查询我们业务办理量的统计情况。我们系统目前的情况是,有一个日增长十多万、总数据量为千万级别的业务办理明细表(xxx_info),每人次的业务办理结果会实时写入其中。以往对外提供的业务统计接口是在每次被调用时候在明细表中执行SQL查询(select、count、where、group by等),响应时间很长,对原生产业务的使用也有很大的影响。于是我决定趁着这次新增接口的上线机会对系统进行优化。
2.优化思路
   首先是在明细表之外再建立一个数据统计(xxx_statistics)表,考虑到目前数据库的压力以及公司内部质管流控等因素,暂没有分库存放,仍旧与原明细表放在同一个库。再设置一个定时任务于每日凌晨对明细表进行查询、过滤、统计、排序等操作,把统计结果插入到统计表中。然后对外暴露统计接口查询统计报表。现在的设计与原来的实现相比,虽然牺牲了统计表所占用的少量额外的存储空间(每日新增的十来万条业务办理明细记录经过处理最终会变成几百条统计表的记录),但是却能把select、count这样耗时的数据统计操作放到凌晨时段执行以避开白天的业务办理高峰,分表处理能够大幅降低对生产业务明细表的性能影响,而对外提供的统计接口的查询速度也将得到几个数量级的提升。当然,还有一个缺点是,不能实时提供当天的统计数据,不过这也是双方可以接受的。
3.设计实现
   设计一个定时任务,每日凌晨执行。在定时任务中启动两个线程,一个线程负责对业务明细表(xxx_info)进行查询统计,把统计的结果放置在内存缓冲区,另一个线程负责读取缓冲区中的统计结果并插入到业务统计表(xxx_statistics)中。
   亲,这样的场景是不是听起来很有感觉?没错!两个线程在内存中批量交换数据,这个事情我们可以使用Exchanger去做!我们马上来看看代码如何实现。

生产者线程:

class ExchangerProducer implements Runnable {
    private Exchanger<Set<XXXStatistics>> exchanger;
    private Set<XXXStatistics> holder;
    private Date fltDate;
    private int threshold;

    ExchangerProducer(Exchanger<Set<XXXStatistics>> exchanger,
            Set<XXXStatistics> holder, Date fltDate, int threshold) {
        this.exchanger = exchanger;
        this.holder = holder;
        this.fltDate = fltDate;
        this.threshold = threshold;
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted() && !isDone) {
                List<XXXStatistics> temp1 = null;
                List<XXXStatistics> temp11 = null;
                for (int i = 0; i < allCities.size(); i++) {
                    try {
                        temp1 = xxxDao
                                .findStatistics1(
                                        fltDate, allCities.get(i));
                        temp11 = xxxDao
                                .findStatistics2(
                                        fltDate, allCities.get(i),
                                        internationalList);
                        if (temp1 != null && !temp1.isEmpty()) {
                            calculationCounter.addAndGet(temp1.size());
                            if (temp11 != null && !temp11.isEmpty()) {
                                // merge two lists into temp1
                                mergeLists(temp1, temp11);
                                temp11.clear();
                                temp11 = null;
                            }
                            // merge temp1 into holder set
                            mergeListToSet(holder, temp1);
                            temp1.clear();
                            temp1 = null;
                        }
                    } catch (Exception e) {
                        log.error(e, e);
                    }
                    // Insert every ${threshold} or the last into database.
                    if (holder.size() >= threshold
                            || i == (allCities.size() - 1)) {
                        log.info("data collected: \n" + holder);
                        holder = exchanger.exchange(holder);
                        log.info("data submitted");
                    }
                }
                // all cities are calculated
                isDone = true;
            }
            log.info("calculation job done, calculated: "
                    + calculationCounter.get());
        } catch (InterruptedException e) {
            log.error(e, e);
        }
        exchanger = null;
        holder.clear();
        holder = null;
        fltDate = null;
    }
}

代码说明:
threshold:缓冲区的容量阀值;
allCities:城市列表,迭代这个列表作为入参来执行查询统计;
XXXStatistics:统计数据封装实体类,实现了Serializable和Comparable接口,覆写equals和compareTo方法,以利用TreeSet提供的去重和排序处理;
isDone:volatile boolean,标识统计任务是否完成;
holder:TreeSet<XXXStatistics>,存放统计结果的内存缓冲区,容量达到阀值后提交给Exchanger执行exchange操作;
dao.findStatistics1,dao.findStatistics2:简化的数据库查询统计操作,此处仅供示意;
calculationCounter:AtomicInteger,标记生产端所提交的记录总数;
mergeLists,mergeListToSet:内部私有工具方法,把dao查询返回的列表合并到holder中;

消费者线程:

class ExchangerConsumer implements Runnable {
    private Exchanger<Set<XXXStatistics>> exchanger;
    private Set<XXXStatistics> holder;

    ExchangerConsumer(Exchanger<Set<XXXStatistics>> exchanger,
            Set<XXXStatistics> holder) {
        this.exchanger = exchanger;
        this.holder = holder;
    }

    @Override
    public void run() {
        try {
            List<XXXStatistics> tempList;
            while (!Thread.interrupted() && !isDone) {
                holder = exchanger.exchange(holder);
                log.info("got data: \n" + holder);
                if (holder != null && !holder.isEmpty()) {
                    try {
                        // insert data into database
                        tempList = convertSetToList(holder);
                        insertionCounter.addAndGet(xxxDao
                                .batchInsertXXXStatistics(tempList));
                        tempList.clear();
                        tempList = null;
                    } catch (Exception e) {
                        log.error(e, e);
                    }
                    // clear the set
                    holder.clear();
                } else {
                    log.info("wtf, got an empty list");
                }
                log.info("data processed");
            }
            log.info("insert job done, inserted: " + insertionCounter.get());
        } catch (InterruptedException e) {
            log.error(e, e);
        }
        exchanger = null;
        holder.clear();
        holder = null;
    }
}

代码说明:

  • convertSetToList:由于dao接口的限制,需把交换得到的Set转换为List;
  • batchInsertXXXStatistics:使用jdbc4的batch update而实现的批量插入dao接口;
  • insertionCounter:AtomicInteger,标记消费端插入成功的记录总数;

调度器代码:

public boolean calculateStatistics(Date fltDate) {
    // initialization
    calculationCounter.set(0);
    insertionCounter.set(0);
    isDone = false;
    exec = Executors.newCachedThreadPool();
    Set<XXXStatistics> producerSet = new TreeSet<XXXStatistics>();
    Set<XXXStatistics> consumerSet = new TreeSet<XXXStatistics>();
    Exchanger<Set<XXXStatistics>> xc = new Exchanger<Set<XXXStatistics>>();
    ExchangerProducer producer = new ExchangerProducer(xc, producerSet,
            fltDate, threshold);
    ExchangerConsumer consumer = new ExchangerConsumer(xc, consumerSet);

    // execution
    exec.execute(producer);
    exec.execute(consumer);
    exec.shutdown();
    boolean isJobDone = false;
    try {
        // wait for termination
        isJobDone = exec.awaitTermination(calculationTimeoutMinutes,
                TimeUnit.MINUTES);
    } catch (InterruptedException e) {
        log.error(e, e);
    }
    if (!isJobDone) {
        // force shutdown
        exec.shutdownNow();
        log.error("time elapsed for "
                + calculationTimeoutMinutes
                + " minutes, but still not finished yet, shut it down anyway.");
    }

    // clean up
    exec = null;
    producerSet.clear();
    producerSet = null;
    consumerSet.clear();
    consumerSet = null;
    xc = null;
    producer = null;
    consumer = null;
    System.gc();

    // return the result
    if (isJobDone && calculationCounter.get() > 0
            && calculationCounter.get() == insertionCounter.get()) {
        return true;
    }
    return false;
}

代码说明:
   调度器的代码就四个步骤:初始化、提交任务并等候处理结果、清理、返回。初始化阶段使用了jdk提供的线程池提交生产者和消费者任务,设置了最长等候时间calculationTimeoutMinutes,如果调度器线程被中断或者任务执行超时,awaitTermination会返回false,此时就强行关闭线程池并记录到日志。统计操作每日凌晨执行一次,所以在任务退出前的清理阶段建议jvm执行gc以尽早释放计算时所产生的垃圾对象。在结果返回阶段,如果查询统计出来的记录条数和插入成功的条数相等则返回true,否则返回false。

4.小结
   在这个案例中,使用Exchanger进行批量的双向数据交换可谓恰如其分:生产者在执行新的查询统计任务填入数据到缓冲区的同时,消费者正在批量插入生产者换入的上一次产生的数据,系统的吞吐量得到平滑的提升;计算复杂度、内存消耗、系统性能也能通过相关的参数设置而得到有效的控制(在消费端也可以对holder进行再次分割以控制每次批插入的大小,建议参阅数据库厂商以及数据库驱动包的说明文档以确定jdbc的最优batch update size);代码的实现也很简洁易懂。这些优点,是采用有界阻塞队列所难以达到的。
   程序的输出结果与业务紧密相关,就不打印出来了。可以肯定的是,经过了一段时间的摸索调优,内存消耗、执行速度和处理结果还是比较满意的。

四、源码分析

可以参考:http://brokendreams.iteye.com/blog/2253956

其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

    1. 我先到一个叫做Slot的交易场所交易,发现你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我就进入第5步。
    2. 我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
    3. 我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
    4. 你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
    5. 我跑去喊管理员,尼玛,就一个坑交易个毛啊,然后管理在一个更加开阔的地方开辟了好多个单间,然后我就挨个来看每个单间是否有人。如果有人我就问他是否可以交易,如果回应了我,那我就进入第2步。如果我没有人,那我就占着这个单间等其他人来交易,进入第4步。 
      6.如果我尝试了几次都没有成功,我就会认为,是不是我TM选的这个单间风水不好?不行,得换个地儿继续(从头开始);如果我尝试了多次发现还没有成功,怒了,把管理员喊来:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

原文地址:https://www.cnblogs.com/pony1223/p/9485124.html

时间: 2024-07-29 18:13:28

Java并发编程原理与实战二十九:Exchanger的相关文章

Java并发编程原理与实战二十:线程安全性问题简单总结

一.出现线程安全性问题的条件 •在多线程的环境下 •必须有共享资源 •对共享资源进行非原子性操作 二.解决线程安全性问题的途径 •synchronized (偏向锁,轻量级锁,重量级锁) •volatile •JDK提供的原子类 •使用Lock(共享锁,排它锁) 三.认识的“*锁” •偏向锁 Java偏向锁(Biased Locking)是Java6引入的一项多线程优化. 偏向锁,顾名思义,它会偏向于第一个访问锁的线程,如果在运行过程中,同步锁只有一个线程访问,不存在多线程争用的情况,则线程是不

Java并发编程原理与实战二十六:闭锁 CountDownLatch

关于闭锁 CountDownLatch 之前在网上看到过一篇举例非常形象的例子,但不记得是出自哪里了,所以这里就当自己再重新写一篇吧: 例子如下: 我们每天起早贪黑的上班,父母每天也要上班,有一天定了一个饭店,一家人一起吃个饭,通知大家下班去饭店集合. 假设:3个人在不同的地方上班,必须等到3个人到场才能吃饭,用程序如何实现呢? 方式一: public class Test1 { /** * 模拟爸爸去饭店 */ public static void fatherToRes() { System

Java并发编程原理与实战四十五:问题定位总结

背景   “线下没问题的”. “代码不可能有问题 是系统原因”.“能在线上远程debug么”    线上问题不同于开发期间的bug,与运行时环境.压力.并发情况.具体的业务相关.对于线上的问题利用线上环境可用的工具,收集必要信息 对定位问题十分重要.    对于导致问题的bug.资源瓶颈很难直观取得数据,需要根据资源使用数据.日志等信息推测问题根源.并且疑难问题的定位通常需要使用不同的方法追根溯源.    这篇wiki我对自己使用过的工具做了整理,并分享一些案例. 1.  常见问题1.1 可用性

Java并发编程原理与实战四十四:final域的内存语义

一.final域的重排序规则 对于final域,编译器和处理器要遵循两个重拍序规则: 1.在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序. 2.初次读一个包含final域的对象的应用,与随后初次读这个final域,这两个操作之间不能重排序 下面通过一个示例来分别说明这两个规则: public class FinalTest { int i;//普通变量 final int j; static FinalTest obj; publi

Java并发编程原理与实战视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是

Java并发编程原理与实战

Java并发编程原理与实战网盘地址:https://pan.baidu.com/s/1c3mpC7A 密码: pe62备用地址(腾讯微云):https://share.weiyun.com/11ea938c7ad43783a934ed1d492eed8d 密码:ogHukS 原文地址:http://blog.51cto.com/13406637/2071116

Java并发编程原理与实战八:产生线程安全性问题原因(javap字节码分析)

前面我们说到多线程带来的风险,其中一个很重要的就是安全性,因为其重要性因此,放到本章来进行讲解,那么线程安全性问题产生的原因,我们这节将从底层字节码来进行分析. 一.问题引出 先看一段代码 package com.roocon.thread.t3; public class Sequence { private int value; public int getNext(){ return value++; } public static void main(String[] args) { S

Java并发编程原理与实战十九:AQS 剖析

一.引言在JDK1.5之前,一般是靠synchronized关键字来实现线程对共享变量的互斥访问.synchronized是在字节码上加指令,依赖于底层操作系统的Mutex Lock实现.而从JDK1.5以后java界的一位大神—— Doug Lea 开发了AbstractQueuedSynchronizer(AQS)组件,使用原生java代码实现了synchronized语义.换句话说,Doug Lea没有使用更“高级”的机器指令,也不依靠JDK编译时的特殊处理,仅用一个普普通通的类就完成了代

Java并发编程原理与实战三十七:线程池的原理与使用

一.简介 线程池在我们的高并发环境下,实际应用是非常多的!!适用频率非常高! 有过使用过Executors框架的朋友,可能不太知道底层的实现,这里就是讲Executors是由ThreadPoolExecutor实现的.好的,让我们来看看ThreadPollExcutor是怎样实现的呢? 如果你想了解ThreadPoolExecutor的话.可以先从它的构造方法看起. ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,