Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)

在JDK的并发包里提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类提供了一种并发流程控制的手段,Exchanger工具类则提供了在线程间交换数据的一种手段。本章会配合一些应用场景来介绍如何使用这些工具类。

CountDownLatch

CountDownLatch允许一个或多个线程等待其他线程完成操作。
假如有这样一个需求:我们需要解析一个Excel里多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成(或者汇总结果)。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法是使用join()方法。

import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

public class JoinCountDownLatchTest {
    private static Random sr=new Random(47);
    private static AtomicInteger result=new AtomicInteger(0);
    private static int threadCount=10;
    private static class Parser implements Runnable{
        String name;
        public Parser(String name){
            this.name=name;
        }
        @Override
        public void run() {
            int sum=0;
            int seed=Math.abs(sr.nextInt()) ;
            Random r=new Random(47);
            for(int i=0;i<100;i++){
                sum+=r.nextInt(seed);
            }
            result.addAndGet(sum);
            System.out.println(name+"线程的解析结果:"+sum);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads=new Thread[threadCount];
        for(int i=0;i<threadCount;i++){
            threads[i]=new Thread(new Parser("Parser-"+i));
        }
        for(int i=0;i<threadCount;i++){
            threads[i].start();
        }
        for(int i=0;i<threadCount;i++){
            threads[i].join();
        }
        System.out.println("所有线程解析结束!");
        System.out.println("所有线程的解析结果:"+result);
    }
}

输出:

Parser-1线程的解析结果:-2013585201
Parser-0线程的解析结果:1336321192
Parser-2线程的解析结果:908136818
Parser-5线程的解析结果:-1675827227
Parser-3线程的解析结果:1638121055
Parser-4线程的解析结果:1513365118
Parser-6线程的解析结果:489607354
Parser-8线程的解析结果:1513365118
Parser-7线程的解析结果:-1191966831
Parser-9线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果:1605138237

join用于让当前执行线程等待join线程执行结束。其实现原理是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。

在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功能更多。

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

public class CountDownLatchTest {
    private static Random sr=new Random(47);
    private static AtomicInteger result=new AtomicInteger(0);
    private static int threadCount=10;//线程数量
    private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch
    private static class Parser implements Runnable{
        String name;
        public Parser(String name){
            this.name=name;
        }
        @Override
        public void run() {
            int sum=0;
            int seed=Math.abs(sr.nextInt()) ;
            Random r=new Random(47);
            for(int i=0;i<100;i++){
                sum+=r.nextInt(seed);
            }
            result.addAndGet(sum);
            System.out.println(name+"线程的解析结果:"+sum);
            countDown.countDown();//注意这里
        }
    }
    public static void main(String[] args) throws InterruptedException {
        Thread[] threads=new Thread[threadCount];
        for(int i=0;i<threadCount;i++){
            threads[i]=new Thread(new Parser("Parser-"+i));
        }
        for(int i=0;i<threadCount;i++){
            threads[i].start();
        }
        /*
        for(int i=0;i<threadCount;i++){
            threads[i].join();
        }*/
        countDown.await();//将join改为使用CountDownLatch
        System.out.println("所有线程解析结束!");
        System.out.println("所有线程的解析结果:"+result);
    }
}

输出:

Parser-0线程的解析结果:1336321192
Parser-1线程的解析结果:-2013585201
Parser-2线程的解析结果:-1675827227
Parser-4线程的解析结果:1638121055
Parser-3线程的解析结果:908136818
Parser-5线程的解析结果:1513365118
Parser-7线程的解析结果:489607354
Parser-6线程的解析结果:1513365118
Parser-8线程的解析结果:-1191966831
Parser-9线程的解析结果:-912399159
所有线程解析结束!
所有线程的解析结果:1605138237

CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。

当我们调用CountDownLatch的countDown方法时,N就会减1,CountDownLatch的await方法会阻塞当前线程,直到N变成零。由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。
如果有某个解析sheet的线程处理得比较慢,我们不可能让主线程一直等待,所以可以使用另外一个带指定时间的await方法——await(long time,TimeUnit unit),这个方法等待特定时间后,就会不再阻塞当前线程。join也有类似的方法。
注意:计数器必须大于等于0,只是等于0时候,计数器就是零,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值。一个线程调用countDown方法happen-before,另外一个线程调用await方法。

CyclicBarrier

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会

开门,所有被屏障拦截的线程才会继续运行。当所有等待线程都被释放以后,CyclicBarrier可以被重用。CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

public CyclicBarrier(int parties, Runnable barrierAction) {
}

public CyclicBarrier(int parties) {
}

参数parties指让多少个线程或者任务等待至barrier状态;参数barrierAction为当这些线程都达到barrier状态时会执行的内容

public class Test {
    public static void main(String[] args) {
        int N = 4;
        CyclicBarrier barrier  = new CyclicBarrier(N);
        for(int i=0;i<N;i++)
            new Writer(barrier).start();
    }
    static class Writer extends Thread{
        private CyclicBarrier cyclicBarrier;
        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
            try {
                Thread.sleep(5000);      //以睡眠来模拟写入数据操作
                System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }catch(BrokenBarrierException e){
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }
}

执行结果:

线程Thread-0正在写入数据...
线程Thread-3正在写入数据...
线程Thread-2正在写入数据...
线程Thread-1正在写入数据...
线程Thread-2写入数据完毕,等待其他线程写入完毕
线程Thread-0写入数据完毕,等待其他线程写入完毕
线程Thread-3写入数据完毕,等待其他线程写入完毕
线程Thread-1写入数据完毕,等待其他线程写入完毕
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...
所有线程写入完毕,继续处理其他任务...

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

Semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore可以控同时访问的线程个数,通过acquire()获取一个许可,如果没有就等待,而 release() 释放一个许可。

假若一个工厂有5台机器,但是有8个工人,一台机器同时只能被一个工人使用,只有使用完了,其他工人才能继续使用。那么我们就可以通过Semaphore来实现:

public class Test {
    public static void main(String[] args) {
        int N = 8;            //工人数
        Semaphore semaphore = new Semaphore(5); //机器数目
        for(int i=0;i<N;i++)
            new Worker(i,semaphore).start();
    }

    static class Worker extends Thread{
        private int num;
        private Semaphore semaphore;
        public Worker(int num,Semaphore semaphore){
            this.num = num;
            this.semaphore = semaphore;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println("工人"+this.num+"占用一个机器在生产...");
                Thread.sleep(2000);
                System.out.println("工人"+this.num+"释放出机器");
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

执行结果:

工人0占用一个机器在生产...
工人1占用一个机器在生产...
工人2占用一个机器在生产...
工人4占用一个机器在生产...
工人5占用一个机器在生产...
工人0释放出机器
工人2释放出机器
工人3占用一个机器在生产...
工人7占用一个机器在生产...
工人4释放出机器
工人5释放出机器
工人1释放出机器
工人6占用一个机器在生产...
工人3释放出机器
工人7释放出机器
工人6释放出机器

Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
下面来看一下Exchanger的应用场景。
1、Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。

2、Exchanger也可以用于校对工作,比如我们需要将纸制银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗两人进行录入,录入到Excel之后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否录入一致.

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {

    private static final Exchanger<String> exgr = new Exchanger<String>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String A = "银行流水100";// A录入银行流水数据
                    String B=exgr.exchange(A);
                    System.out.println("A的视角:A和B数据是否一致:" + A.equals(B) +
",A录入的是:" + A + ",B录入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String B = "银行流水200";// B录入银行流水数据
                    String A = exgr.exchange(B);
                    System.out.println("B的视角:A和B数据是否一致:" + A.equals(B) +
",A录入的是:" + A + ",B录入是:" + B);
                } catch (InterruptedException e) {
                }
            }
        });
        threadPool.shutdown();
    }
}

输出:

B的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200
A的视角:A和B数据是否一致:false,A录入的是:银行流水100,B录入是:银行流水200

如果两个线程有一个没有执行exchange()方法,则会一直等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)设置最大等待时长。

参考:

Java并发工具类详解

Java并发编程-原子类及并发工具类

原文地址:https://www.cnblogs.com/Jason-Xiang/p/8449648.html

时间: 2024-11-10 01:14:46

Java学习笔记—多线程(并发工具类,java.util.concurrent.atomic包)的相关文章

Java学习笔记七——数组工具类Arrays

数组工具类Arrays Java提供的Arrays类里包含的一些static修饰的方法可以直接操作数组.若将里面的方法用熟的话,那开发效率会大大提高.下面介绍其中的方法. List<T> asList(T... a) 作用:将指定数组或数组元素,转换成固定大小的List. 用法: String[] strArr = { "aaa", "bbb", "vvv" }; //用法1:参数是数组引用 List<String> li

java学习笔记之DBUtils工具类

DBUtils工具类 一.介绍 DBUtils是Apache组织开源的数据库工具类. 二.使用步骤 ①.创建QueryRunner对象 ②.调用update()方法或者query()方法执行sql语句 三.构造方法及静态方法 QueryRunner类 1.构造方法 ①.无参构造 QueryRunner qr =new  QueryRunner(); 使用无参构造的时候,调用update方法和query方法时就需要使用带Connection 类型参数的重载形式 ②.有参构造 QueryRunner

Java学习笔记—多线程(java.util.concurrent并发包概括,转载)

一.描述线程的类:Runable和Thread都属于java.lang包 二.内置锁synchronized属于jvm关键字,内置条件队列操作接口Object.wait()/notify()/notifyAll()属于java.lang包 二.提供内存可见性和防止指令重排的volatile属于jvm关键字 四.而java.util.concurrent包(J.U.C)中包含的是java并发编程中有用的一些工具类,包括几个部分: 1.locks部分:包含在java.util.concurrent.

java学习笔记07--日期操作类

java学习笔记07--日期操作类 一.Date类 在java.util包中定义了Date类,Date类本身使用非常简单,直接输出其实例化对象即可. [java] view plaincopy public class T { public static void main(String[] args) { Date date  = new Date(); System.out.println("当前日期:"+date); //当前日期:Thu May 16 23:00:57 CST 

Java学习笔记_23_List接口实现类

23.List接口实现类: List接口继承了Collection接口,它是一个允许存在重复项的有序集合. 1>实现类ArrayList: ArrayList类支持可随需要而增长的动态数组.数组列表以一个原大小被创建,当超过了它的大小, 类集自动增大,当对象被删除后,数组就可以缩小. 优点:ArrayList类对于使用索引取出元素用较高的效率,他可以用索引快速定位对象. 缺点:ArrayList类对于元素的删除或插入速度较慢. 构造方法: · ArrayList(): 构造一个初始容量为10的空

JAVA学习笔记 -- 多线程之共享资源

在多线程程序运行过程中,可能会涉及到两个或者多个线程试图同时访问同一个资源.为了防止这种情况的发生,必须在线程使用共享资源时给资源"上锁",以阻挡其它线程的访问.而这种机制也常常被称为互斥量,本文主要介绍它的两种方式synchronized和Lock . 1.synchronized 当任务要执行被synchronized关键字保护的代码片段的时候,它会检查锁是否可用,然后获取锁,执行代码,释放锁.synchronized也有两种用法: A.synchronized方法 import

Java学习笔记—多线程(原子类,java.util.concurrent.atomic包,转载)

原子类 Java从JDK 1.5开始提供了java.util.concurrent.atomic包(以下简称Atomic包),这个包中 的原子操作类提供了一种用法简单.性能高效.线程安全地更新一个变量的方式. 因为变量的类型有很多种,所以在Atomic包里一共提供了13个类,属于4种类型的原子更 新方式,分别是原子更新基本类型.原子更新数组.原子更新引用和原子更新属性(字段). Atomic包里的类基本都是使用Unsafe实现的包装类 java.util.concurrent.atomic中的类

《java.util.concurrent 包源码阅读》02 关于java.util.concurrent.atomic包

Aomic数据类型有四种类型:AomicBoolean, AomicInteger, AomicLong, 和AomicReferrence(针对Object的)以及它们的数组类型, 还有一个特殊的AomicStampedReferrence,它不是AomicReferrence的子类,而是利用AomicReferrence实现的一个储存引用和Integer组的扩展类 首先,所有原子操作都是依赖于sun.misc.Unsafe这个类,这个类底层是由C++实现的,利用指针来实现数据操作 关于CAS

关于java.util.concurrent.atomic.*包下面的线程问题

众所周知,jdk的java.util.concurrent.atomic包下得原子类都是线程安全的,这是正确的,但是如果如果这样设置变量,会出现什么问题呢: AtomicReference<Integer[]> ar = new AtomicReference<Integer[]>(); 可能结果不是你所想的是同步了,为什么会出现这种情况呢? 其实有一个很好的例子可以解释这个问题,大家都知道i++,这个自增的步骤就是读取旧值-->给这个值加1-->更新这个值散步组成,因