线程基础:JDK1.5+(10)——线程新特性(下)

(接上文《线程基础:JDK1.5+(9)——线程新特性(中)》)

3-4、CountDownLatch:同步器

上文中我们主要讲解了JDK1.5+中提供的一个重要工具:Semaphore信号量,并且用这个工具第一次实现了“100米赛跑”的需求。在第一次的实现中,我们还运用了“线程专栏”中已介绍的多个知识点,包括锁、线程池、队列、Callable接口等。

但实际上第一次实现的“100米赛跑”的需求,离我们真正的需求还有一定的距离:在我们第一次实现时,选手是否可以跑步,完全取决于“是否有空闲的赛道”。但是原始需求中特别要求对所有的选手进行分组。特别是初赛时(因为决赛时就只有一组了):最多5名选手一组,每组选手需要一起进行跑步

也就是说,选手是否可以跑步,不但取决于是否有空闲的赛道,还取决于当前选手所属组的其他选手的准备情况。只有当跑道有空余,并且该组其它选手也准备完毕,才能一起开始跑步(这样才能称为赛跑嘛)。

在JKD1.5+环境中,Doug Lea和他的团队为我们提供了可以很好实现这个要求的工具类:CountDownLatch和CyclicBarrier。我们首先介绍CountDownLatch的基本使用方式:

3-4-1、CountDownLatch基本使用

CountDownLatch是一个同步计数器,能够保证在其他线程完成某一个业务操作前,当前线程一直处于等待/阻塞状态。具体来说,这个计数器将会从给定的某一个数值count开始,通过countDown()方法的调用进行倒数。当执行某一次countDown()操作后,计数器的count数值等于0,所有调用了await()方法的线程,就解除等待/阻塞状态继续执行。我们来看一段简单的示例代码:

package test.thread.countDownLatch;

import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

public class TestCountDownLatch {
    /**
     * 日志
     */
    private static Log LOGGER = LogFactory.getLog(TestCountDownLatch.class);

    static {
        BasicConfigurator.configure();
    }

    public static void main(String[] args) throws Throwable {
        // 同步计数器从5开始计数
        final CountDownLatch countDownLatch = new CountDownLatch(5);

        // 启动子线程,处理“其他”业务
        for(int index = 0 ; index < 5 ; index++) {
            Thread childThread = new Thread() {
                @Override
                public void run() {
                    //等待,以便模型业务处理过程消耗的时间
                    synchronized (this) {
                        try {
                            this.wait(1000);
                        } catch (InterruptedException e) {
                            TestCountDownLatch.LOGGER.error(e.getMessage(), e);
                        }
                    }

                    // 完成业务处理过程,计数器-1
                    long threadid = Thread.currentThread().getId();
                    TestCountDownLatch.LOGGER.info("子线程(" + threadid + ")执行完成!");
                    countDownLatch.countDown();
                }
            };
            childThread.start();
        }

        // 等待所有子线程的业务都处理完成(计数器的count为0时)
        countDownLatch.await();
        TestCountDownLatch.LOGGER.info("所有子线程的处理都完了,主线程继续执行...");
    }
}

以下是可能的执行结果(因为每次执行的效果可能有所区别):

1 [Thread-3] INFO test.thread.countDownLatch.TestCountDownLatch  - 子线程(12)执行完成!
1 [Thread-4] INFO test.thread.countDownLatch.TestCountDownLatch  - 子线程(13)执行完成!
1 [Thread-1] INFO test.thread.countDownLatch.TestCountDownLatch  - 子线程(10)执行完成!
1 [Thread-2] INFO test.thread.countDownLatch.TestCountDownLatch  - 子线程(11)执行完成!
1 [Thread-0] INFO test.thread.countDownLatch.TestCountDownLatch  - 子线程(9)执行完成!
9 [main] INFO test.thread.countDownLatch.TestCountDownLatch  - 所有子线程的处理都完了,主线程继续执行...

以上代码片段和执行结果说明了CountDownLatch的最简单使用,CountDownLatch同步计数器从5开始计数,分别对应5个子线程的业务完成情况。每当一个子线程业务完成后,CountDownLatch同步计数器就countDown一次。直到count等于0时,这时主线程上面的await()方法解除等待/阻塞状态,继续执行。这里要注意一下:

  • 不是说只能有一次await方法的调用,而是同一时间可以有多个线程调用了await方法。只要在count还不等于0时,某个线程调用了await方法,它都会进入等待/阻塞状态。
  • 在调用await时,如果CountDownLatch同步计数器的count已经等于0了,则await方法不会进入等待/阻塞状态。
  • await调用和countDown调用不是说必须处于不同线程。同一线程中,您可以先调用countDown然后再调用await进入等待/阻塞。CountDownLatch同步计数器会始终遵循上两条工作原则。
  • 在使用CountDownLatch同步计数器时,您无需考虑脏数据的问题。CountDownLatch同步计数器是线程安全的。

3-4-2、CountDownLatch在“100米赛跑”中的应用

很明显CountDownLatch在“100米赛跑”中的使用目标是:“等待这组所有的选手全部上跑道”,然后一起开始跑步。所以,CountDownLatch的计数器,需要在选手获得“跑道”资源后,马上countDown一次。之后,获得“跑道”资源的选手要立刻调用await进入等待状态,等待其他选手也获得跑道资源。我们给这整个处理逻辑去一个名字叫做:“发令枪”,代码片段如下:

......
/**
 * 选手所关注的发令枪
 */
private CountDownLatch startingGun;
......

public Result call() throws Exception {
    ......
    try {
        // 申请上跑道(这个没有变化)
        this.runway.acquire();
        // 等待可能的发令枪
        if(this.startingGun != null) {
            // 执行到这里,说明这个选手已经拿到了跑到资源;
            // 向发令枪表达“我已准备好”,即计数器-1
            this.startingGun.countDown();
            System.out.println("选手" + name + "[" + number + "],已登上跑道,等待发令!");
            // 接下来进入“等待”状态
            // 以便等这个发令枪所管理的所有选手上跑道了,再一起跑步
            this.startingGun.await();
            System.out.println("选手" + name + "[" + number + "],跑!");
        }

        // 开始正式跑步
        return this.result = this.doRun();
    } catch(Exception e) {
        e.printStackTrace(System.out);
    } finally {
        // 都要进入初赛结果排序(中途退赛的成绩就为0)
        this.runway.release();
        System.out.println("选手" + name + "[" + number + "],比赛正常完成!");
    }
    ......
}
......

那么如何进行分组呢?我们可以让一组选手,关注同一把“发令枪”(有多少把发令枪,就有多少个组、就有多少个CountDownLatch对象)。如下图所示:

另外,分组时一定要考虑一个问题:由于报名人数不一定是5的整数倍,所以最后一组不一定有5个人。考虑到实现的代码片段如下:

......
// 这是发令枪
CountDownLatch startingGun = null;
......
// signupPlayers 是报名队列
// runwayCount 是跑道数量
for (int index = 0 ; index < this.signupPlayers.size() ; index++) {
    /*
     * 这是发令枪,发令枪的使用规则是:
     * 1、最多5位选手听从一把发令枪的命令(因为跑道最多就是5条)
     * 2、如果剩余的没有比赛的选手不足5人,则这些人听从一把发令枪的命令
     * */
    if(index % runwayCount == 0) {
        startingGun =  this.signupPlayers.size() - index > runwayCount?
            new CountDownLatch(runwayCount):
            new CountDownLatch(this.signupPlayers.size() - index);
    }

    // 获取这个选手(signupPlayers是报名队列)
    Player player = this.signupPlayers.get(index);
    // 设置选手关注的发令枪
    player.setStartingGun(startingGun);
    // 提交给裁判组协调执行
    Future<Result> future = refereeService.submit(player);

    // 开始一个选手的跑步动作状态监控
    new FutureThread(future, player, this.preliminaries).start();
}
......

这样我们就完成了对第一次“100米赛跑”实现代码的优化:增加了选手分组控制功能。以下给出相对完整的代码。注意,由于Result类没有做任何更改,所以就不需要赘述了。

  • Player选手类的的更改,主要是加入了“关注的发令枪”的关注
public class Player implements Callable<Result> , Comparable<Player>{
    ......
    /**
     * 跑道
     */
    private Semaphore runway;

    /**
     * 选手所关注的发令枪
     */
    private CountDownLatch startingGun;
    ......

    /**
     * @param startingGun the startingGun to set
     */
    public void setStartingGun(CountDownLatch startingGun) {
        this.startingGun = startingGun;
    }

    /**
     * @return the startingGun
     */
    public CountDownLatch getStartingGun() {
        return startingGun;
    }

    ......
    /* (non-Javadoc)
     * @see java.util.concurrent.Callable#call()
     */
    @Override
    public Result call() throws Exception {
        this.result = null;
        try {
            // 申请上跑道
            this.runway.acquire();
            // 等待可能的发令枪
            if(this.startingGun != null) {
                // 执行到这里,说明这个选手已经拿到了跑到资源;
                // 首先向发令枪表达“我已准备好”,即计数器-1
                this.startingGun.countDown();
                System.out.println("选手" + name + "[" + number + "],已登上跑道,等待发令枪!");
                // 接下来进入“等待”状态
                // 以便这个发令枪所管理的所有选手登上跑道了,再一起跑步
                this.startingGun.await();
                System.out.println("选手" + name + "[" + number + "],跑!");
            }

            // 开始正式跑步
            return this.result = this.doRun();
        } catch(Exception e) {
            e.printStackTrace(System.out);
        } finally {
            // 都要进入初赛结果排序(中途退赛的成绩就为0)
            this.runway.release();
            System.out.println("选手" + name + "[" + number + "],比赛正常完成!");
        }

        // 如果执行到这里,说明异常发生了
        this.result = new Result(Float.MAX_VALUE);
        return this.result;
    }

    /**
     * 开始跑步(跑步的处理过程没有变化)
     * @return
     * @throws Exception
     */
    private Result doRun()  throws Exception {
        /*
         * 为了表现一个选手每一次跑步都有不同的状态(但是都不会低于其最低状态),
         * 所以每一次跑步,系统都会为这个选手分配一个即时速度。
         *
         * 这个即时速度不会低于其最小速度,但是也不会高于 14米/秒(否则就是‘超人’咯)
         * */
        // 生成即时速度
        float presentSpeed = 0f;
        presentSpeed = this.minSpeed * (1.0f + new Random().nextFloat());
        if(presentSpeed > 14f) {
            presentSpeed = 14f;
        }

        // 计算跑步结果(BigDecimal的使用可自行查阅资料)
        BigDecimal calculation =  new BigDecimal(100).divide(new BigDecimal(presentSpeed) , 3, RoundingMode.HALF_UP);
        float presentTime = calculation.floatValue();

        // 让线程等待presentSpeed的时间,模拟该选手跑步的过程
        synchronized (this) {
            this.wait((long)(presentTime * 1000f));
        }

        // 返回跑步结果
        this.result = new Result(presentTime);
        return result;
    }

    ......
}
  • TwoTrack比赛主控制类,主要加入了对“选手分组”的支持
/**
 * 这是第二个比赛程序。
 * @author yinwenjie
 *
 */
public class TwoTrack {
    ......
    public void track() {
        /*
         * 赛跑分为以下几个阶段进行;
         *
         * 1、报名
         * 2、初赛,11名选手,分成两组,每组最多5名选手。
         * 因为场地只有5条赛道,只有拿到进场许可的才能使用赛道,进行比赛
         *
         * 3、决赛:初赛结果将被写入到一个队列中进行排序,只有成绩最好的前五名选手,可以参加决赛。
         *
         * 4、决赛结果的前三名将分别作为冠亚季军被公布出来
         * */

        //1、================报名
        // 这就是跑道,需求上说了只有5条跑道,所以只有5个permits。
        Semaphore runway = new Semaphore(5);
        this.signupPlayers.clear();
        for(int index = 0 ; index < TwoTrack.PLAYERNAMES.length ; ) {
            Player player = new Player(TwoTrack.PLAYERNAMES[index], ++index , runway);
            this.signupPlayers.add(player);
        }

        //2、================进行初赛
        // 这是赛道的数量
        int runwayCount = 5;
        // 这是裁判
        ExecutorService refereeService = Executors.newFixedThreadPool(5);
        // 这是发令枪
        CountDownLatch startingGun = null;
        for (int index = 0 ; index < this.signupPlayers.size() ; index++) {
            /*
             * 这是发令枪,发令枪的使用规则是:
             * 1、最多5位选手听从一把发令枪的命令(因为跑道最多就是5条)
             * 2、如果剩余的没有比赛的选手不足5人,则这些人听从一把发令枪的命令
             * */
            if(index % runwayCount == 0) {
                startingGun =  this.signupPlayers.size() - index > runwayCount?
                    new CountDownLatch(runwayCount):
                    new CountDownLatch(this.signupPlayers.size() - index);
            }

            // 获取这个选手
            Player player = this.signupPlayers.get(index);
            // 设置选手关注的发令枪
            player.setStartingGun(startingGun);
            // 提交给裁判组准备执行
            Future<Result> future = refereeService.submit(player);

            // 开始一个选手的跑步动作状态监控
            new FutureThread(future, player, this.preliminaries).start();
        }
        //! 只有当PLAYERNAMES.length位选手的成绩都产生了,才能进入决赛,这很重要
        while(this.preliminaries.size() < TwoTrack.PLAYERNAMES.length) {
            try {
                synchronized (this.preliminaries) {
                    this.preliminaries.wait();
                }
            } catch(InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }

        // 3、============决赛(只有初赛结果的前5名可以参见)
        // 决赛的发令枪
        startingGun = new CountDownLatch(5);
        for(int index = 0 ; index < 5 ; index++) {
            Player player = this.preliminaries.poll();
            // 重新设置选手关注的发令枪
            player.setStartingGun(startingGun);
            // 提交给裁判组准备执行
            Future<Result> future = refereeService.submit(player);

            // 开始一个选手的跑步动作状态监控
            new FutureThread(future, player, this.finals).start();
        }
        //! 只有当5位选手的决赛成绩都产生了,才能到下一步:公布成绩
        while(this.finals.size() < 5) {
            try {
                synchronized (this.finals) {
                    this.finals.wait();
                }
            } catch(InterruptedException e) {
                e.printStackTrace(System.out);
            }
        }

        // 4、============公布决赛成绩(前三名)
        for(int index = 0 ; index < 3 ; index++) {
            Player player = this.finals.poll();
            switch (index) {
            case 0:
                System.out.println("第一名:"  + player.getName() + "[" + player.getNumber() + "],成绩:" + player.getResult().getTime() + "秒");
                break;
            case 1:
                System.out.println("第二名:"  + player.getName() + "[" + player.getNumber() + "],成绩:" + player.getResult().getTime() + "秒");
                break;
            case 2:
                System.out.println("第三名:"  + player.getName() + "[" + player.getNumber() + "],成绩:" + player.getResult().getTime() + "秒");
                break;
            default:
                break;
            }
        }
    }

    ......
    //其他诸如FutureThread、main函数的代码都没有变化
}

3-4-3、相似工具类CyclicBarrier

在JDK1.5+中还有一个和CountDownLatch类似的同步计数工具:CyclicBarrier。不同的是CyclicBarrier的计数是循环进行的,而且也不需要向CountDownLatch那样显示的调用countDown进行减一操作。

如何理解CyclicBarrier计数器的循环工作方式呢?我们先来看看一个比较简单的示例代码:

public class TestCyclicBarrier {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日志
     */
    private static Log LOGGER = LogFactory.getLog(TestCyclicBarrier.class);

    public static void main(String[] args) throws Throwable {
        // 同步计数器的技术周期为3
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

        // 启动子线程,处理“其他”业务
        for(int index = 0 ; index < 5 ; index++) {
            Thread childThread = new Thread() {
                @Override
                public void run() {
                    // 可获得设置的屏障数值
                    // int parties = cyclicBarrier.getParties();
                    // 可获取当前已经进入等待状态的任务数量
                    // int numberWaiting = cyclicBarrier.getNumberWaiting();
                    TestCyclicBarrier.LOGGER.info("本线程已准备好处理业务......");
                    try {
                        cyclicBarrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        TestCyclicBarrier.LOGGER.error(e.getMessage() , e);
                    }
                    TestCyclicBarrier.LOGGER.info("开始处理业务......");
                }
            };
            childThread.start();
        }
    }
}

我们可以用下图表示以上代码的工作过程:

在上图中,CyclicBarrier的parties屏障设置为3,其意义是只要有通过CyclicBarrier的await方法进入阻塞等待的线程数量达到了3,则CyclicBarrier就解除这些线程的阻塞状态让他们可以继续执行。所以可以理解为CyclicBarrier的计数功能是可重复使用的,当等待的线程数量达到了设置的屏障值就放行这些线程

3-4-4、使用CyclicBarrier改写“比赛”

下面我们将“比赛”中使用CountDownLatch实现的发令枪改写成使用CyclicBarrier来实现。改写发令枪不会使发令枪的工作职责发生任何变化,所以改写量是比较小的。另外由于这个小节中我们已经给出了很多代码了,为了节约篇幅这里只给出最小化的代码片段。

  • Player选手类中,关于发令枪的定义要修改:
......
/**
 * 选手所关注的发令枪
 */
private CyclicBarrier startingGun;

/**
 * @param startingGun the startingGun to set
 */
public void setStartingGun(CyclicBarrier startingGun) {
    this.startingGun = startingGun;
}

/**
 * @return the startingGun
 */
public CyclicBarrier getStartingGun() {
    return startingGun;
}
......
  • Player选手类中的发令枪使用部分需要改写。使用CyclicBarrier后就不需要显示调用countDown()方法了:
......
// 申请上跑道
this.runway.acquire();
// 等待可能的发令枪
if(this.startingGun != null) {
    // 执行到这里,说明这个选手已经拿到了跑到资源;
    System.out.println("选手" + name + "[" + number + "],已登上跑道,等待发令枪!");
    // 接下来进入“等待”状态
    // 以便这个发令枪所管理的所有选手登上跑道了,再一起跑步
    this.startingGun.await();
    System.out.println("选手" + name + "[" + number + "],跑!");
}

// 开始正式跑步
return this.result = this.doRun();
......
  • TwoTrack主操作类中,关于发令枪的定义要进行变更:从CountDownLatch变成CyclicBarrier:
......
// 这是发令枪
CyclicBarrier startingGun = null;
......
  • TwoTrack主操作类中,根据条件决定CyclicBarrier中parties屏障值的代码业务要进行调整。从之前确定CountDownLatch计数初值变化而来:
......
if(index % runwayCount == 0) {
    startingGun =  this.signupPlayers.size() - index > runwayCount?
        new CyclicBarrier(runwayCount):
        new CyclicBarrier(this.signupPlayers.size() - index);
}
......

这里我们就不再赘述代码的工作效果了,因为工作效果不会有任何变化。

4、后续内容

下一篇文章开始,我们开始讲解线程间通讯。

时间: 2024-10-10 13:49:22

线程基础:JDK1.5+(10)——线程新特性(下)的相关文章

Java基础:JDK1.5后的新特性:自动拆装箱,以及注意事项

首先来看一段代码: 1 Integer x = new Integer(4); 2 Integer y = 4; 在JDK1.5版本后,以上两行代码都能编译通过,那是因为JDK1.5后加入新特性,自动装箱. 第一句代码是正常的创建对象方法,创建了一个Integer包装类对象. 而第二句中,当左边的Interger类型变量指向右边的int基本类型数据时,右边的基本数据类型会自动装箱成Integer对象,即隐式执行了new Integer(4). 再来一段代码: 1 Integer x = new

Java 9和Java 10的新特性

http://www.infoq.com/cn/news/2014/09/java9 Java 9新特性汇总 继2014年3月份Java 8发布之后,Open JDK加快了开发速度, Java 9的发布已经提上日程.预计在2016年发布Java 9,同时公布了JEP(JDK改进提议)中的前期列表.任职于Takipi 的Alex Zhitnitsky整理了Java 9中一些纳入JSR(Java规范提案)的新特性和大家一直期待但未确定的一些特性.这些特性有Jigsaw项目.新的智能编译工具.期待已久

ArcGIS 10.2新特性介绍:影像

1.新增栅格类型 新增支持三种新的栅格类型:DMCii.Pleiades 和 SPOT6. 同时,新增提供中国卫星 Raster Type 扩展下载,支持中国卫星影像数据在ArcGIS 中的管理和使用.扩展支持的卫星产品包括: HJ 1A/1B CCD raster type ZY02C HRC raster type ZY02C PMS raster type ZY3 – CRESDA ZY3 – SASMAC 对于支持的国产卫星,正射.融合.镶嵌和匀色等常用处理,通过 ArcGIS 的On-

Spark整合kafka0.10.0新特性(二)

接着Spark整合kafka0.10.0新特性(一)开始 import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.streaming.kafka010._ import org.apache.spark.streaming.kafka010.LocationStrat

jdk1.5出现的新特性----&gt;增强for循环

package cn.itcast.jdk15; import java.util.HashMap;import java.util.HashSet;import java.util.Iterator;import java.util.Map;import java.util.Set; /*jdk1.5出现的新特性---->增强for循环 增强for循环的作用: 简化迭代器的书写格式.(注意:增强for循环的底层还是使用了迭代器遍历.) 增强for循环的适用范围: 如果是实现了Iterable接

Java 8中你可能没听过的10个新特性

lambda表达式,lambda表达式,还是lambda表达式.一提到Java 8就只能听到这个,但这不过是其中的一个新功能而已,Java 8还有许多新的特性——有一些功能强大的新类或者新的用法,还有一些功能则是早就应该加到Java里了. 这里我准备介绍它的10个我个人认为非常值得了解的新特性.总会有一款适合你的,开始来看下吧. default方法 这是Java语言的一个新特性,现在接口类里可以包含方法体(这就是default方法)了.这些方法会隐式的添加到实现这个接口的每个子类中. 这使得你可

开发者所需要知道的 iOS 10 SDK 新特性

转自:https://onevcat.com/2016/06/ios-10-sdk/ 写的很好啊.哈哈哈 总览 距离 iPhone 横空出世已经过去了 9 个年头,iOS 的版本号也跨入了两位数.在我们回顾过去四五年 iOS 系统的发展的时候,不免感叹变化速度之快,迭代周期之短.iOS 7 翻天覆地的全新设计,iOS 8 中 Size Classes 的出现,应用扩展,以及 Cloud Kit 的加入,iOS 9 里的分屏多任务特性等等.Apple 近年都是在 WWDC 发布新的系统和软件,然后

前端面试基础-html篇之H5新特性

h5的新特性(目前个人所了解)如下 语义化标签 表单新特性 视频(video)和音频(audio) canvas画布 svg绘图 地理定位 为鼠标提供的拖放API webworker (重点)Storage (重点)Websocket HTML语义化是什么? 语义化是指根据内容的结构化(内容语义化),选择合适的标签(代码语义化),便于开发者阅读和写出更优雅的代码的同时,让浏览器的爬虫和机器很好的解析. 为什么要语义化? 有利于SEO,有助于爬虫抓取更多的有效信息,爬虫是依赖于标签来确定上下文和各

Red Hat OpenStack 10的新特性

这是Red Hat有史以来最好的版本,同时也是第一个长生命周期版本(最长五年支持),这篇文章会介绍为什么这是你私有云最好的礼物. 由于要使用命令行,以前安装OpenStack是很繁重的工作.这个版本提供了一个新的图形界面给Director.这是我们简化OpenStack部署目标的第一步,根据前几个月的内部测试反馈,在将来的版本将会有更多优化. 新的Director图形界面 另一个通过所有测试的新特性是分布虚拟路由(DVR),这个特性虽然功能上还有一些限制,但是已经是生产级别的,同时可以通过Dir

Java基础之JDK1.8的接口新特性

在Java的JDK1.8中: 1.接口中使用default来修饰普通方法与使用static来修饰普通方法的意义用于避免子类重复实现同样的代码 1 package test; 2 /** 3 * 泛型的使用 4 * @author Administrator 5 * 6 */ 7 public class TestType { 8 9 public static void main(String[] args) { 10 Msg msg = new MsgImpl(); 11 msg.show()