Java并发编程--5.信号量和障碍器

Semaphore信号量

简介

它本质上是一个共享锁,限制访问公共资源的线程数目,它也被称为计数信号量
acquire()许可一个线程, Semaphore – 1; 没有可用的许可时,Semaphore=0 ,线程阻塞
release()释放一个线程, Semaphore + 1

示例

public class MySemaphore {
    public static void main(String[] args) {
        // 使用线程池
        ExecutorService exec = Executors.newCachedThreadPool();
        // 只允许3个线程同时访问
        final Semaphore semp = new Semaphore(3);

        // 模拟4个客户端访问
        for (int index = 0; index < 4; index++) {

            Runnable run = new Runnable() {
                public void run() {
                    try {
                        // 获取许可
                        semp.acquire();

                        System.out.println("线程"+ Thread.currentThread().getName() + "获得许可:");

                        // 模拟耗时的任务
                        for (int i = 0; i < 999999; i++);

                        // 释放许可
                        semp.release();

                        System.out.println("线程"+ Thread.currentThread().getName() + "释放许可:");
                        System.out.println("当前允许进入的任务个数:"+ semp.availablePermits());

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };

            exec.execute(run);
        }
        // 关闭线程池
        exec.shutdown();
    }
}

控制台输出:

线程pool-1-thread-1获得许可:
线程pool-1-thread-2获得许可:
线程pool-1-thread-2释放许可:
当前允许进入的任务个数:2       //总共允许3个许可, 获取两个许可, 释放一个许可, 剩余2个许可
线程pool-1-thread-1释放许可:
当前允许进入的任务个数:2      //释放一个许可, 应该打印出1, 可以看出, Semaphore并不保证线程安全
线程pool-1-thread-3获得许可:
线程pool-1-thread-3释放许可:
当前允许进入的任务个数:2
线程pool-1-thread-4获得许可:
线程pool-1-thread-4释放许可:
当前允许进入的任务个数:3

CyclicBarrier 障碍器

简介

允许一组线程互相等待,到达一个公共的障碍点, 该组任务完成后, 再去完成另外一个任务
在释放等待线程后可以重用,它是循环的barrier

示例

public class MyCyclicBarrier {
    public static void main(String[] args) {
        //创建CyclicBarrier对象, 并设置执行完一组5个线程的并发任务后,再执行MainTask任务
        CyclicBarrier cb = new CyclicBarrier(5, new MainTask());  

        new SubTask("A", cb).start();
        new SubTask("B", cb).start();
        new SubTask("C", cb).start();
        new SubTask("D", cb).start();
        new SubTask("E", cb).start();
}
}   

/** 最后执行的任务 */
class MainTask implements Runnable {
    public void run() {
        System.out.println("......终于要执行最后的任务了......");
    }
}

/** 一组并发任务 */
class SubTask extends Thread {
    private String name;
    private CyclicBarrier cb;

    SubTask(String name, CyclicBarrier cb) {
        this.name = name;
        this.cb = cb;
    }

    public void run() {
        System.out.println("[并发任务" + name + "]  开始执行");

        for (int i = 0; i < 999999; i++); // 模拟耗时的任务

        System.out.println("[并发任务" + name + "]  执行完毕,通知障碍器");
        try {
            // 每执行完一项任务就通知障碍器
            cb.await();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

控制台输出:

[并发任务A]  开始执行
[并发任务D]  开始执行
[并发任务C]  开始执行
[并发任务B]  开始执行
[并发任务E]  开始执行
[并发任务B]  执行完毕,通知障碍器
[并发任务E]  执行完毕,通知障碍器
[并发任务D]  执行完毕,通知障碍器
[并发任务A]  执行完毕,通知障碍器
[并发任务C]  执行完毕,通知障碍器
......终于要执行最后的任务了......     //可以看出执行一组任务后,在执行这个线程任务

CountDownLatch 障碍器 

简介

允许1或N个线程等待其他线程完成后在执行
调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置

示例

public class MyCountDownLatch {
    public static void main(String[] args) {
        //启动会议室线程,等待与会人员参加会议
        Conference conference = new Conference(3);
        new Thread(conference).start();

        //参会者线程
        for(int i = 0 ; i < 3 ; i++){
            Participater participater = new Participater("" + i , conference);
            Thread thread = new Thread(participater);
            thread.start();
        }
    }
}

/** 会场类 */
class Conference implements Runnable{
    private final CountDownLatch countDown;//障碍器

    public Conference(int count){
        countDown = new CountDownLatch(count);
    }

    /** 与会人员到达 */
    public void arrive(String name){
        System.out.println(name + "到达.....");

        //到达一个,锁计数器 - 1, 在计数到达0之前会一直阻塞
        countDown.countDown();

        System.out.println("还有 " + countDown.getCount() + "位没有到达...");
    }

    @Override
    public void run() {
        System.out.println("准备开会,参加会议人员总数为:" + countDown.getCount());

        //调用await(),等待所有的与会人员到达
        try {
            countDown.await();
        } catch (InterruptedException e) {
        }

        System.out.println("所有人员已经到达,会议开始.....");
    }
}

/** 参会者类*/
class Participater implements Runnable{
    private String name;
    private Conference conference;

    public Participater(String name,Conference conference){
        this.name = name;
        this.conference = conference;
    }

    @Override
    public void run() {
        conference.arrive(name);
    }
}

控制台输出:

准备开会,参加会议人员总数为:3
2到达.....
还有 2位没有到达...
0到达.....
还有 1位没有到达...
1到达.....
所有人员已经到达,会议开始.....
还有 0位没有到达...

Phaser

简介

推荐阅读: http://whitesock.iteye.com/blog/1135457

http://www.2cto.com/kf/201611/560952.html

任务数目是可变的: 可以在任何时间注册新的参与者;并且在抵达屏障点时,可以注销已经注册的参与者

phase和party

phase就是阶段,初值为0:

当所有的线程执行完本轮任务,同时开始下一轮任务时,意味着当前阶段已结束,
进入到下一阶段,phase的值自动加1

party就是线程:  party=4就意味着Phaser对象当前管理着4个线程

boolean onAdvance(int phase, int registeredParties) :

1.当此方法返回true时,意味着Phaser被终止, 若此方法返回值为 phase>=3,其含义为当整个线程执行了4个阶段后,程序终止

2.当每一个阶段执行完毕,此方法会被自动调用 ,此方法内的代码会在每个阶段执行完毕时执行

示例: 可变数目的任务

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

/**
 *可变数目: 动态注册和取消
 *
 *示例:
 *    在旅游过程中,有可能很凑巧遇到几个朋友,
 *    然后他们听说你们在旅游,所以想要加入一起继续接下来的旅游.
 *    也有可能,在旅游过程中,突然其中有某几个人临时有事,想退出这次旅游了
 */
public class MyPhaser_5 {
    public static void main(String[] args) {
        final int num = 3;
        Phaser phaser = new Phaser(num){
            /**
             * 如果该方法返回true,那么Phaser会被终止, 默认实现是在注册任务数为0时返回true
             * phase : 阶段数
             * registeredParties : 注册的线程数
             */
             @Override
             protected boolean onAdvance(int phase, int registeredParties) {
                 System.out.println("" + getArrivedParties() + "个人都到齐了,第" + (phase + 1) + "次集合 \n");
                 return phase >= num;
             }
        };

        new Thread(new TourismRunnable(phaser),"小明").start();
        new Thread(new TourismRunnable(phaser),"小刚").start();
        new Thread(new TourismRunnable(phaser),"小红").start();
    }
}

/** 旅行线程 */
class TourismRunnable implements Runnable{
    Phaser phaser;
    /**
     * 每个线程保存一个朋友计数器,小红第一次遇到一个朋友,取名`小红的朋友0号`,第二次遇到一个朋友,取名为`小红的朋友1号`
     */
    AtomicInteger frientCount = new AtomicInteger();

    public TourismRunnable(Phaser phaser) {
        this.phaser = phaser;
    }

    @Override
    public void run() {
         switch (phaser.getPhase()){
             case 0:if(!goToPoint("出发点")) break;
             case 1:if(!goToPoint("旅游景点")) break;
             case 2:if(!goToPoint("酒店")) break;
         }
    }

    /**
     * @param point 目的地
     * @return 返回true,说明还要继续旅游,否则就临时退出了
     */
    private boolean goToPoint(String point){
        try {
            if(!randomEvent()){
                //取消注册
                phaser.arriveAndDeregister();
                return false;
            }
            System.out.println(Thread.currentThread().getName() + "到了" + point);

            //阻塞
            phaser.arriveAndAwaitAdvance();
            return true;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    /**
     * 随机事件: 遇到新朋友一起旅游 或者 中途退出旅游
     * @return 返回true,说明还要继续旅游,否则就临时退出了
     */
    private boolean randomEvent() {
        int random = new Random().nextInt(100);
        String name = Thread.currentThread().getName();

        if (random < 10){
            int friendNum =  1;
            System.out.println("=====================" + name + ":遇到了"+friendNum+"个朋友,要一起去旅游");

            new Thread(new TourismRunnable(phaser), name + "的朋友" + frientCount.incrementAndGet() + "号").start();
            //注册
            phaser.bulkRegister(friendNum);

        }else if(random > 80){
            System.out.println("=====================" + name + ":突然有事要离开一下,不和他们继续旅游了");
            return false;
        }

        return true;
    }
}
时间: 2024-10-29 19:05:33

Java并发编程--5.信号量和障碍器的相关文章

Java并发编程之信号量

一.概述 技术信号量用来控制能够同时访问某特定资源的活动的数量,或者同时执行某一给定操作的数据.计数信号量可以用来实现资源池或者给一个容器限定边界. 信号量维护了一个许可集,许可的初始量通过构造函数传递给Semaphore.活动能够获取许可,并在使用之后释放许可,如果没有可用的许可,acquire方法会被阻塞,直到有可用的为止.每个release方法添加一个许可,从而可能释放一个正在阻塞的获取者. 计算信号量的一种退化形式是二元信号量:一个计数初始值为1的Semaphore,二元信号量可用作互斥

【Java并发编程】并发编程大合集-值得收藏

http://blog.csdn.net/ns_code/article/details/17539599这个博主的关于java并发编程系列很不错,值得收藏. 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Ja

《Java并发编程实战》要点笔记及java.util.concurrent 的结构介绍

买了<java并发编程实战>这本书,看了好几遍都不是很懂,这个还是要在实战中找取其中的要点的,后面看到一篇文章笔记做的很不错分享给大家!! 原文地址:http://blog.csdn.net/cdl2008sky/article/details/26377433 Subsections  1.线程安全(Thread safety) 2.锁(lock) 3.共享对象 4.对象组合 5.基础构建模块 6.任务执行 7.取消和关闭 8.线程池的使用 9.性能与可伸缩性 10.并发程序的测试 11.显

【Java并发编程】并发编程大合集

转载自:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容按照由浅入深的学习顺序总结如下,点击相应的标题即可跳转到对应的文章    [Java并发编程]实现多线程的两种方法    [Java并发编程]线程的中断    [Java并发编程]正确挂起.恢复.终止线程    [Java并发编程]守护线程和线程阻塞    [Java并发编程]Volatile关键字(上)

《Java并发编程实战》读书笔记

Subsections 线程安全(Thread safety) 锁(lock) 共享对象 对象组合 基础构建模块 任务执行 取消和关闭 线程池的使用 性能与可伸缩性 并发程序的测试 显示锁 原子变量和非阻塞同步机制 一.线程安全(Thread safety) 无论何时,只要多于一个线程访问给定的状态变量.而且其中某个线程会写入该变量,此时必须使用同步来协助线程对该变量的访问. 线程安全是指多个线程在访问一个类时,如果不需要额外的同步,这个类的行为仍然是正确的. 线程安全的实例: (1).一个无状

Java并发编程笔记 并发概览

并发概览 >>同步 如何同步多个线程对共享资源的访问是多线程编程中最基本的问题之一.当多个线程并发访问共享数据时会出现数据处于计算中间状态或者不一致的问题,从而影响到程序的正确运行.我们通常把这种情况叫做竞争条件(race condition),把并发访问共享数据的代码叫做关键区域(critical section).同步就是使得多个线程顺序进入关键区域从而避免竞争条件的发生. >>线程安全性 编写线程安全的代码的核心是要对状态访问操作进行管理,尤其是对共享的和可变的状态访问. 线

Java并发编程深入学习

基本概念 在实践中,为了更好的利用资源提高系统整体的吞吐量,会选择并发编程.但由于上下文切换和死锁等问题,并发编程不一定能提高性能,因此如何合理的进行并发编程时本文的重点,接下来介绍关于锁最基本的一些知识(选学). volatile:轻量,保证共享变量的可见性,使得多个线程对共享变量的变更都能及时获取到.其包括两个子过程,将当前处理器缓存行的数据写回到系统内存,之后会使其他CPU里缓存了该内存地址的数据无效. synchronized:相对重量,其包含3种形式,针对普通同步方法,锁是当前实例对象

《Java并发编程实战》/童云兰译【PDF】下载

<Java并发编程实战>/童云兰译[PDF]下载链接: https://u253469.pipipan.com/fs/253469-230062521 内容简介 本书深入浅出地介绍了Java线程和并发,是一本完美的Java并发参考手册.书中从并发性和线程安全性的基本概念出发,介绍了如何使用类库提供的基本并发构建块,用于避免并发危险.构造线程安全的类及验证线程安全的规则,如何将小的线程安全类组合成更大的线程安全类,如何利用线程来提高并发应用程序的吞吐量,如何识别可并行执行的任务,如何提高单线程子

Java 并发编程之任务取消(九)

Jvm关闭 jvm可正常关闭也可强行关闭,正常关闭有多种触发方式: 当最后一个正常(非守护,下面会讲到什么是守护线程)线程结束时 当调用system.exit时,或者通过其他特定于平台的方法关闭时(例如发送了SIGINT信号或键入Ctrl-c) 通过其他特定平台的方法关闭jvm,调用Runtime.halt或者在操作系统当中杀死JVM进程(例如发送sigkill)来强行关闭jvm. 关闭钩子 在正常关闭中,jvm首先调用所有已注册的关闭钩子,关闭钩子是指通过 Runtime.addShutdow