Java编程的逻辑 (81) - 并发同步协作工具

我们在67节和68节实现了线程的一些基本协作机制,那是利用基本的wait/notify实现的,我们提到,Java并发包中有一些专门的同步工具类,本节,我们就来探讨它们。

我们要探讨的工具类包括:

  • 读写锁ReentrantReadWriteLock
  • 信号量Semaphore
  • 倒计时门栓CountDownLatch
  • 循环栅栏CyclicBarrier

与71节介绍的显示锁和72节介绍的显示条件类似,它们也都是基于AQS实现的,AQS可参看71节。在一些特定的同步协作场景中,相比使用最基本的wait/notify,显示锁/条件,它们更为方便,效率更高。下面,我们就来探讨它们的基本概念、用法、用途和基本原理。

读写锁ReentrantReadWriteLock

之前章节我们介绍了两种锁,66节介绍了synchronized,71节介绍了显示锁ReentrantLock。对于同一受保护对象的访问,无论是读还是写,它们都要求获得相同的锁。在一些场景中,这是没有必要的,多个线程的读操作完全可以并行,在读多写少的场景中,让读操作并行可以明显提高性能。

怎么让读操作能够并行,又不影响一致性呢?答案是使用读写锁。在Java并发包中,接口ReadWriteLock表示读写锁,主要实现类是可重入读写锁ReentrantReadWriteLock。

ReadWriteLock的定义为:

public interface ReadWriteLock {
    Lock readLock();
    Lock writeLock();
}

通过一个ReadWriteLock产生两个锁,一个读锁,一个写锁。读操作使用读锁,写操作使用写锁。

需要注意的是,只有"读-读"操作是可以并行的,"读-写"和"写-写"都不可以。只有一个线程可以进行写操作,在获取写锁时,只有没有任何线程持有任何锁才可以获取到,在持有写锁时,其他任何线程都获取不到任何锁。在没有其他线程持有写锁的情况下,多个线程可以获取和持有读锁。

ReentrantReadWriteLock是可重入的读写锁,它有两个构造方法,如下所示:

public ReentrantLock()
public ReentrantLock(boolean fair)

fire表示是否公平,不传递的话是false,含义与显式锁一节介绍的类似,就不赘述了。

我们看个简单的例子,使用ReentrantReadWriteLock实现一个缓存类MyCache,代码如下:

public class MyCache {
    private Map<String, Object> map = new HashMap<>();
    private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private Lock readLock = readWriteLock.readLock();
    private Lock writeLock = readWriteLock.writeLock();

    public Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

代码比较简单,就不赘述了。

读写锁是怎么实现的呢?读锁和写锁看上去是两个锁,它们是怎么协调的?具体实现比较复杂,我们简述下其思路。

内部,它们使用同一个整数变量表示锁的状态,16位给读锁用,16位给写锁用,使用一个变量便于进行CAS操作,锁的等待队列其实也只有一个。

写锁的获取,就是确保当前没有其他线程持有任何锁,否则就等待。写锁释放后,也就是将等待队列中的第一个线程唤醒,唤醒的可能是等待读锁的,也可能是等待写锁的。

读锁的获取不太一样,首先,只要写锁没有被持有,就可以获取到读锁,此外,在获取到读锁后,它会检查等待队列,逐个唤醒最前面的等待读锁的线程,直到第一个等待写锁的线程。如果有其他线程持有写锁,获取读锁会等待。读锁释放后,检查读锁和写锁数是否都变为了0,如果是,唤醒等待队列中的下一个线程。

信号量Semaphore

之前介绍的锁都是限制只有一个线程可以同时访问一个资源。现实中,资源往往有多个,但每个同时只能被一个线程访问,比如,饭店的饭桌、火车上的卫生间。有的单个资源即使可以被并发访问,但并发访问数多了可能影响性能,所以希望限制并发访问的线程数。还有的情况,与软件的授权和计费有关,对不同等级的账户,限制不同的最大并发访问数。

信号量类Semaphore就是用来解决这类问题的,它可以限制对资源的并发访问数,它有两个构造方法:

public Semaphore(int permits)
public Semaphore(int permits, boolean fair)

fire表示公平,含义与之前介绍的是类似的,permits表示许可数量。

Semaphore的方法与锁是类似的,主要的方法有两类,获取许可和释放许可,主要方法有:

//阻塞获取许可
public void acquire() throws InterruptedException
//阻塞获取许可,不响应中断
public void acquireUninterruptibly()
//批量获取多个许可
public void acquire(int permits) throws InterruptedException
public void acquireUninterruptibly(int permits)
//尝试获取
public boolean tryAcquire()
//限定等待时间获取
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException
//释放许可
public void release()

我们看个简单的示例,限制并发访问的用户数不超过100,代码如下:

public class AccessControlService {
    public static class ConcurrentLimitException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    private static final int MAX_PERMITS = 100;
    private Semaphore permits = new Semaphore(MAX_PERMITS, true);

    public boolean login(String name, String password) {
        if (!permits.tryAcquire()) {
            // 同时登录用户数超过限制
            throw new ConcurrentLimitException();
        }
        // ..其他验证
        return true;
    }

    public void logout(String name) {
        permits.release();
    }
}

代码比较简单,就不赘述了。

需要说明的是,如果我们将permits的值设为1,你可能会认为它就变成了一般的锁,不过,它与一般的锁是不同的。一般锁只能由持有锁的线程释放,而Semaphore表示的只是一个许可数,任意线程都可以调用其release方法。主要的锁实现类ReentrantLock是可重入的,而Semaphore不是,每一次的acquire调用都会消耗一个许可,比如,看下面代码段:

Semaphore permits = new Semaphore(1);
permits.acquire();
permits.acquire();
System.out.println("acquired");

程序会阻塞在第二个acquire调用,永远都不会输出"acquired"。

信号量的基本原理比较简单,也是基于AQS实现的,permits表示共享的锁个数,acquire方法就是检查锁个数是否大于0,大于则减一,获取成功,否则就等待,release就是将锁个数加一,唤醒第一个等待的线程。

倒计时门栓CountDownLatch

我们在68节使用wait/notify实现了一个简单的门栓MyLatch,我们提到,Java并发包中已经提供了类似工具,就是CountDownLatch。它的大概含义是指,它相当于是一个门栓,一开始是关闭的,所有希望通过该门的线程都需要等待,然后开始倒计时,倒计时变为0后,门栓打开,等待的所有线程都可以通过,它是一次性的,打开后就不能再关上了。

CountDownLatch里有一个计数,这个计数通过构造方法进行传递:

public CountDownLatch(int count)

多个线程可以基于这个计数进行协作,它的主要方法有:

public void await() throws InterruptedException
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
public void countDown() 

await()检查计数是否为0,如果大于0,就等待,await()可以被中断,也可以设置最长等待时间。countDown检查计数,如果已经为0,直接返回,否则减少计数,如果新的计数变为0,则唤醒所有等待的线程。

在68节,我们介绍了门栓的两种应用场景,一种是同时开始,另一种是主从协作。它们都有两类线程,互相需要同步,我们使用CountDownLatch重新演示下。

在同时开始场景中,运行员线程等待主裁判线程发出开始指令的信号,一旦发出后,所有运动员线程同时开始,计数初始为1,运动员线程调用await,主线程调用countDown,示例代码如下:

public class RacerWithCountDownLatch {
    static class Racer extends Thread {
        CountDownLatch latch;

        public Racer(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                this.latch.await();
                System.out.println(getName()
                        + " start run "+System.currentTimeMillis());
            } catch (InterruptedException e) {
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int num = 10;
        CountDownLatch latch = new CountDownLatch(1);
        Thread[] racers = new Thread[num];
        for (int i = 0; i < num; i++) {
            racers[i] = new Racer(latch);
            racers[i].start();
        }
        Thread.sleep(1000);
        latch.countDown();
    }
}

代码比较简单,就不赘述了。在主从协作模式中,主线程依赖工作线程的结果,需要等待工作线程结束,这时,计数初始值为工作线程的个数,工作线程结束后调用countDown,主线程调用await进行等待,示例代码如下:

public class MasterWorkerDemo {
    static class Worker extends Thread {
        CountDownLatch latch;

        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void run() {
            try {
                // simulate working on task
                Thread.sleep((int) (Math.random() * 1000));

                // simulate exception
                if (Math.random() < 0.02) {
                    throw new RuntimeException("bad luck");
                }
            } catch (InterruptedException e) {
            } finally {
                this.latch.countDown();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int workerNum = 100;
        CountDownLatch latch = new CountDownLatch(workerNum);
        Worker[] workers = new Worker[workerNum];
        for (int i = 0; i < workerNum; i++) {
            workers[i] = new Worker(latch);
            workers[i].start();
        }
        latch.await();
        System.out.println("collect worker results");
    }
}

需要强调的是,在这里,countDown的调用应该放到finally语句中,确保在工作线程发生异常的情况下也会被调用,使主线程能够从await调用中返回。

循环栅栏CyclicBarrier

我们在68节使用wait/notify实现了一个简单的集合点AssemblePoint,我们提到,Java并发包中已经提供了类似工具,就是CyclicBarrier。它的大概含义是指,它相当于是一个栅栏,所有线程在到达该栅栏后都需要等待其他线程,等所有线程都到达后再一起通过,它是循环的,可以用作重复的同步。

CyclicBarrier特别适用于并行迭代计算,每个线程负责一部分计算,然后在栅栏处等待其他线程完成,所有线程到齐后,交换数据和计算结果,再进行下一次迭代。

与CountDownLatch类似,它也有一个数字,但表示的是参与的线程个数,这个数字通过构造方法进行传递:

public CyclicBarrier(int parties)

它还有一个构造方法,接受一个Runnable参数,如下所示:

public CyclicBarrier(int parties, Runnable barrierAction)

这个参数表示栅栏动作,当所有线程到达栅栏后,在所有线程执行下一步动作前,运行参数中的动作,这个动作由最后一个到达栅栏的线程执行。

CyclicBarrier的主要方法就是await:

public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

await在等待其他线程到达栅栏,调用await后,表示自己已经到达,如果自己是最后一个到达的,就执行可选的命令,执行后,唤醒所有等待的线程,然后重置内部的同步计数,以循环使用。

await可以被中断,可以限定最长等待时间,中断或超时后会抛出异常。需要说明的是异常BrokenBarrierException,它表示栅栏被破坏了,什么意思呢?在CyclicBarrier中,参与的线程是互相影响的,只要其中一个线程在调用await时被中断了,或者超时了,栅栏就会被破坏,此外,如果栅栏动作抛出了异常,栅栏也会被破坏,被破坏后,所有在调用await的线程就会退出,抛出BrokenBarrierException。

我们看一个简单的例子,多个游客线程分别在集合点A和B同步:

public class CyclicBarrierDemo {
    static class Tourist extends Thread {
        CyclicBarrier barrier;

        public Tourist(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public void run() {
            try {
                // 模拟先各自独立运行
                Thread.sleep((int) (Math.random() * 1000));

                // 集合点A
                barrier.await();

                System.out.println(this.getName() + " arrived A "
                        + System.currentTimeMillis());

                // 集合后模拟再各自独立运行
                Thread.sleep((int) (Math.random() * 1000));

                // 集合点B
                barrier.await();
                System.out.println(this.getName() + " arrived B "
                        + System.currentTimeMillis());
            } catch (InterruptedException e) {
            } catch (BrokenBarrierException e) {
            }
        }
    }

    public static void main(String[] args) {
        int num = 3;
        Tourist[] threads = new Tourist[num];
        CyclicBarrier barrier = new CyclicBarrier(num, new Runnable() {

            @Override
            public void run() {
                System.out.println("all arrived " + System.currentTimeMillis()
                        + " executed by " + Thread.currentThread().getName());
            }
        });
        for (int i = 0; i < num; i++) {
            threads[i] = new Tourist(barrier);
            threads[i].start();
        }
    }
}

在我的电脑上的一次输出为:

all arrived 1490053578552 executed by Thread-1
Thread-1 arrived A 1490053578555
Thread-2 arrived A 1490053578555
Thread-0 arrived A 1490053578555
all arrived 1490053578889 executed by Thread-0
Thread-0 arrived B 1490053578890
Thread-2 arrived B 1490053578890
Thread-1 arrived B 1490053578890

多个线程到达A和B的时间是一样的,使用CyclicBarrier,达到了重复同步的目的。

CyclicBarrier与CountDownLatch可能容易混淆,我们强调下其区别:

  • CountDownLatch的参与线程是有不同角色的,有的负责倒计时,有的在等待倒计时变为0,负责倒计时和等待倒计时的线程都可以有多个,它用于不同角色线程间的同步。
  • CyclicBarrier的参与线程角色是一样的,用于同一角色线程间的协调一致。
  • CountDownLatch是一次性的,而CyclicBarrier是可以重复利用的。

小结

本节介绍了Java并发包中的一些同步协作工具:

  • 在读多写少的场景中使用ReentrantReadWriteLock替代ReentrantLock,以提高性能
  • 使用Semaphore限制对资源的并发访问数
  • 使用CountDownLatch实现不同角色线程间的同步
  • 使用CyclicBarrier实现同一角色线程间的协调一致

实际中,应该优先使用这些工具,而不是手工用wait/notify或者显示锁/条件同步。

下一节,我们来探讨一个特殊的概念,线程局部变量ThreadLocal,它是什么呢?

原文地址:https://www.cnblogs.com/ivy-xu/p/12375276.html

时间: 2024-10-06 14:26:07

Java编程的逻辑 (81) - 并发同步协作工具的相关文章

Java编程的逻辑 (38) - 剖析ArrayList

从本节开始,我们探讨Java中的容器类,所谓容器,顾名思义就是容纳其他数据的,计算机课程中有一门课叫数据结构,可以粗略对应于Java中的容器类,我们不会介绍所有数据结构的内容,但会介绍Java中的主要实现,并分析其基本原理和主要实现代码. 前几节在介绍泛型的时候,我们自己实现了一个简单的动态数组容器类DynaArray,本节,我们介绍Java中真正的动态数组容器类ArrayList. 我们先来看它的基本用法. 基本用法 新建ArrayList ArrayList是一个泛型容器,新建ArrayLi

Java编程的逻辑 (94) - 组合式异步编程

前面两节讨论了Java 8中的函数式数据处理,那是对38节到55节介绍的容器类的增强,它可以将对集合数据的多个操作以流水线的方式组合在一起.本节继续讨论Java 8的新功能,主要是一个新的类CompletableFuture,它是对65节到82节介绍的并发编程的增强,它可以方便地将多个有一定依赖关系的异步任务以流水线的方式组合在一起,大大简化多异步任务的开发. 之前介绍了那么多并发编程的内容,还有什么问题不能解决?CompletableFuture到底能解决什么问题?与之前介绍的内容有什么关系?

Java编程的逻辑 (58) - 文本文件和字符流

上节我们介绍了如何以字节流的方式处理文件,我们提到,对于文本文件,字节流没有编码的概念,不能按行处理,使用不太方便,更适合的是使用字符流,本节就来介绍字符流. 我们首先简要介绍下文本文件的基本概念.与二进制文件的区别.编码.以及字符流和字节流的区别,然后我们介绍Java中的主要字符流,它们有: Reader/Writer:字符流的基类,它们是抽象类. InputStreamReader/OutputStreamWriter:适配器类,输入是InputStream,输出是OutputStream,

Java编程的逻辑 (79) - 方便的CompletionService

上节,我们提到,在异步任务程序中,一种常见的场景是,主线程提交多个异步任务,然后希望有任务完成就处理结果,并且按任务完成顺序逐个处理,对于这种场景,Java并发包提供了一个方便的方法,使用CompletionService,这是一个接口,它的实现类是ExecutorCompletionService,本节我们就来探讨它们. 基本用法 接口和类定义 与77节介绍的ExecutorService一样,CompletionService也可以提交异步任务,它的不同是,它可以按任务完成顺序获取结果,其具

Java编程的逻辑 (87) - 类加载机制

上节,我们探讨了动态代理,在前几节中,我们多次提到了类加载器ClassLoader,本节就来详细讨论Java中的类加载机制与ClassLoader. 类加载器ClassLoader就是加载其他类的类,它负责将字节码文件加载到内存,创建Class对象.与之前介绍的反射.注解.和动态代理一样,在大部分的应用编程中,我们不太需要自己实现ClassLoader. 不过,理解类加载的机制和过程,有助于我们更好的理解之前介绍的内容,更好的理解Java.在反射一节,我们介绍过Class的静态方法Class.f

Java编程的逻辑 (91) - Lambda表达式

在之前的章节中,我们的讨论基本都是基于Java 7的,从本节开始,我们探讨Java 8的一些特性,主要内容包括: 传递行为代码 - Lambda表达式 函数式数据处理 - 流 组合式异步编程 - CompletableFuture 新的日期和时间API 本节,我们先讨论Lambda表达式,它是什么?有什么用呢? Lambda表达式是Java 8新引入的一种语法,是一种紧凑的传递代码的方式,它的名字来源于学术界的λ演算,具体我们就不探讨了. 理解Lambda表达式,我们先回顾一下接口.匿名内部类和

Java编程的逻辑 (26) - 剖析包装类 (上)

包装类 Java有八种基本类型,每种基本类型都有一个对应的包装类. 包装类是什么呢?它是一个类,内部有一个实例变量,保存对应的基本类型的值,这个类一般还有一些静态方法.静态变量和实例方法,以方便对数据进行操作. Java中,基本类型和对应的包装类如下表所示: 基本类型 包装类 boolean Boolean byte Byte short Short int Integer long Long float Float double Double char Character 包装类也都很好记,除

Java编程的逻辑 (29) - 剖析String

上节介绍了单个字符的封装类Character,本节介绍字符串类.字符串操作大概是计算机程序中最常见的操作了,Java中表示字符串的类是String,本节就来详细介绍String. 字符串的基本使用是比较简单直接的,我们来看下. 基本用法 可以通过常量定义String变量 String name = "老马说编程"; 也可以通过new创建String String name = new String("老马说编程"); String可以直接使用+和+=运算符,如: S

Java编程的逻辑 (18) - 为什么说继承是把双刃剑

继承是把双刃剑 通过前面几节,我们应该对继承有了一个比较好的理解,但之前我们说继承其实是把双刃剑,为什么这么说呢?一方面是因为继承是非常强大的,另一方面是因为继承的破坏力也是很强的. 继承的强大是比较容易理解的,具体体现在: 子类可以复用父类代码,不写任何代码即可具备父类的属性和功能,而只需要增加特有的属性和行为. 子类可以重写父类行为,还可以通过多态实现统一处理. 给父类增加属性和行为,就可以自动给所有子类增加属性和行为. 继承被广泛应用于各种Java API.框架和类库之中,一方面它们内部大