Java 多线程(2)-廖雪峰
使用wait和notify
在Java程序中,synchronized
解决了多线程竞争的问题。例如,对于一个任务管理器,多个线程同时往队列中添加任务,可以用synchronized
加锁:
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
}
但是synchronized
并没有解决多线程协调的问题。
仍然以上面的TaskQueue
为例,我们再编写一个getTask()
方法取出队列的第一个任务:
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
}
public synchronized String getTask() {
while (queue.isEmpty()) {
}
return queue.remove();
}
}
上述代码看上去没有问题:getTask()
内部先判断队列是否为空,如果为空,就循环等待,直到另一个线程往队列中放入了一个任务,while()
循环退出,就可以返回队列的元素了。
但实际上while()
循环永远不会退出。因为线程在执行while()
循环时,已经在getTask()
入口获取了this
锁,其他线程根本无法调用addTask()
,因为addTask()
执行条件也是获取this
锁。
因此,执行上述代码,线程会在getTask()
中因为死循环而100%占用CPU资源。
如果深入思考一下,我们想要的执行效果是:
- 线程1可以调用
addTask()
不断往队列中添加任务; - 线程2可以调用
getTask()
从队列中获取任务。如果队列为空,则getTask()
应该等待,直到队列中至少有一个任务时再返回。
因此,多线程协调运行的原则就是:当条件不满足时,线程进入等待状态;当条件满足时,线程被唤醒,继续执行任务。
对于上述TaskQueue
,我们先改造getTask()
方法,在条件不满足时,线程进入等待状态:
public synchronized String getTask() {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
当一个线程执行到getTask()
方法内部的while
循环时,它必定已经获取到了this
锁,此时,线程执行while
条件判断,如果条件成立(队列为空),线程将执行this.wait()
,进入等待状态。
这里的关键是:wait()
方法必须在当前获取的锁对象上调用,这里获取的是this
锁,因此调用this.wait()
。
调用wait()
方法后,线程进入等待状态,wait()
方法不会返回,直到将来某个时刻,线程从等待状态被其他线程唤醒后,wait()
方法才会返回,然后,继续执行下一条语句。
有些仔细的童鞋会指出:即使线程在getTask()
内部等待,其他线程如果拿不到this
锁,照样无法执行addTask()
,肿么办?
这个问题的关键就在于wait()
方法的执行机制非常复杂。首先,它不是一个普通的Java方法,而是定义在Object
类的一个native
方法,也就是由JVM的C代码实现的。其次,必须在synchronized
块中才能调用wait()
方法,因为wait()
方法调用时,会释放线程获得的锁,wait()
方法返回后,线程又会重新试图获得锁。
因此,只能在锁对象上调用wait()
方法。因为在getTask()
中,我们获得了this
锁,因此,只能在this
对象上调用wait()
方法:
public synchronized String getTask() {
while (queue.isEmpty()) {
// 释放this锁:
this.wait();
// 重新获取this锁
}
return queue.remove();
}
当一个线程在this.wait()
等待时,它就会释放this
锁,从而使得其他线程能够在addTask()
方法获得this
锁。
现在我们面临第二个问题:如何让等待的线程被重新唤醒,然后从wait()
方法返回?答案是在相同的锁对象上调用notify()
方法。我们修改addTask()
如下:
public synchronized void addTask(String s) {
this.queue.add(s);
this.notify(); // 唤醒在this锁等待的线程
}
注意到在往队列中添加了任务后,线程立刻对this
锁对象调用notify()
方法,这个方法会唤醒一个正在this
锁等待的线程(就是在getTask()
中位于this.wait()
的线程),从而使得等待线程从this.wait()
方法返回。
我们来看一个完整的例子:
public class Main {
public static void main(String[] args) throws InterruptedException {
var q = new TaskQueue();
var ts = new ArrayList<Thread>();
for (int i=0; i<5; i++) {
var t = new Thread() {
public void run() {
// 执行task:
while (true) {
try {
String s = q.getTask();
System.out.println("execute task: " + s);
} catch (InterruptedException e) {
return;
}
}
}
};
t.start();
ts.add(t);
}
var add = new Thread(() -> {
for (int i=0; i<10; i++) {
// 放入task:
String s = "t-" + Math.random();
System.out.println("add task: " + s);
q.addTask(s);
try { Thread.sleep(100); } catch(InterruptedException e) {}
}
});
add.start();
add.join();
Thread.sleep(100);
for (var t : ts) {
t.interrupt();
}
}
}
class TaskQueue {
Queue<String> queue = new LinkedList<>();
public synchronized void addTask(String s) {
this.queue.add(s);
this.notifyAll();
}
public synchronized String getTask() throws InterruptedException {
while (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
}
这个例子中,我们重点关注addTask()
方法,内部调用了this.notifyAll()
而不是this.notify()
,使用notifyAll()
将唤醒所有当前正在this
锁等待的线程,而notify()
只会唤醒其中一个(具体哪个依赖操作系统,有一定的随机性)。这是因为可能有多个线程正在getTask()
方法内部的wait()
中等待,使用notifyAll()
将一次性全部唤醒。通常来说,notifyAll()
更安全。有些时候,如果我们的代码逻辑考虑不周,用notify()
会导致只唤醒了一个线程,而其他线程可能永远等待下去醒不过来了。
但是,注意到wait()
方法返回时需要重新获得this
锁。假设当前有3个线程被唤醒,唤醒后,首先要等待执行addTask()
的线程结束此方法后,才能释放this
锁,随后,这3个线程中只能有一个获取到this
锁,剩下两个将继续等待。
再注意到我们在while()
循环中调用wait()
,而不是if
语句:
public synchronized String getTask() throws InterruptedException {
if (queue.isEmpty()) {
this.wait();
}
return queue.remove();
}
这种写法实际上是错误的,因为线程被唤醒时,需要再次获取this
锁。多个线程被唤醒后,只有一个线程能获取this
锁,此刻,该线程执行queue.remove()
可以获取到队列的元素,然而,剩下的线程如果获取this
锁后执行queue.remove()
,此刻队列可能已经没有任何元素了,所以,要始终在while
循环中wait()
,并且每次被唤醒后拿到this
锁就必须再次判断:
while (queue.isEmpty()) {
this.wait();
}
所以,正确编写多线程代码是非常困难的,需要仔细考虑的条件非常多,任何一个地方考虑不周,都会导致多线程运行时不正常。
简单使用wait和notify的例子
public class TestWait extends Thread {
public static void main(String[] args) throws InterruptedException {
TestWait testWait = new TestWait();
testWait.start();
Thread.sleep(100); // 这里需要保证先调用wait,否则notifyAll可能会先于wait执行
testWait.myNotify();
}
@Override
public void run() {
try {
myWait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private synchronized void myWait() throws InterruptedException {
log.info("myWait..1..");
this.wait();
log.info("myWait..2..");
}
private synchronized void myNotify() {
log.info("myNotify..1..");
this.notifyAll();
log.info("myNotify..2..");
}
}
小结
wait
和notify
用于多线程协调运行:
- 在
synchronized
内部可以调用wait()
使线程进入等待状态; - 必须在已获得的锁对象上调用
wait()
方法; - 在
synchronized
内部可以调用notify()
或notifyAll()
唤醒其他等待线程; - 必须在已获得的锁对象上调用
notify()
或notifyAll()
方法; - 已唤醒的线程还需要重新获得锁后才能继续执行。
使用ReentrantLock
从Java 5开始,引入了一个高级的处理并发的java.util.concurrent
包,它提供了大量更高级的并发功能,能大大简化多线程程序的编写。
我们知道Java语言直接提供了synchronized
关键字用于加锁,但这种锁一是很重,二是获取时必须一直等待,没有额外的尝试机制。
java.util.concurrent.locks
包提供的ReentrantLock
用于替代synchronized
加锁,我们来看一下传统的synchronized
代码:
public class Counter {
private int count;
public void add(int n) {
synchronized(this) {
count += n;
}
}
}
如果用ReentrantLock
替代,可以把代码改造为:
public class Counter {
private final Lock lock = new ReentrantLock();
private int count;
public void add(int n) {
lock.lock();
try {
count += n;
} finally {
lock.unlock();
}
}
}
因为synchronized
是Java语言层面提供的语法,所以我们不需要考虑异常,而ReentrantLock
是Java代码实现的锁,我们就必须先获取锁,然后在finally
中正确释放锁。
顾名思义,ReentrantLock
是可重入锁,它和synchronized
一样,一个线程可以多次获取同一个锁。
和synchronized
不同的是,ReentrantLock
可以尝试获取锁:
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
...
} finally {
lock.unlock();
}
}
上述代码在尝试获取锁的时候,最多等待1秒。如果1秒后仍未获取到锁,tryLock()
返回false
,程序就可以做一些额外处理,而不是无限等待下去。
所以,使用ReentrantLock
比直接使用synchronized
更安全,线程在tryLock()
失败的时候不会导致死锁。
小结
ReentrantLock
可以替代synchronized
进行同步;
ReentrantLock
获取锁更安全;
必须先获取到锁,再进入try {...}
代码块,最后使用finally
保证释放锁;
可以使用tryLock()
尝试获取锁。
使用Condition
使用ReentrantLock
比直接使用synchronized
更安全,可以替代synchronized
进行线程同步。
但是,synchronized
可以配合wait
和notify
实现线程在条件不满足时等待,条件满足时唤醒,用ReentrantLock
我们怎么编写wait
和notify
的功能呢?
答案是使用Condition
对象来实现wait
和notify
的功能。
我们仍然以TaskQueue
为例,把前面用synchronized
实现的功能通过ReentrantLock
和Condition
来实现:
class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
可见,使用Condition
时,引用的Condition
对象必须从Lock
实例的newCondition()
返回,这样才能获得一个绑定了Lock
实例的Condition
实例。
Condition
提供的await()
、signal()
、signalAll()
原理和synchronized
锁对象的wait()
、notify()
、notifyAll()
是一致的,并且其行为也是一样的:
await()
会释放当前锁,进入等待状态;signal()
会唤醒某个等待线程;signalAll()
会唤醒所有等待线程;- 唤醒线程从
await()
返回后需要重新获得锁。
此外,和tryLock()
类似,await()
可以在等待指定时间后,如果还没有被其他线程通过signal()
或signalAll()
唤醒,可以自己醒来:
if (condition.await(1, TimeUnit.SECOND)) {
// 被其他线程唤醒
} else {
// 指定时间内没有被其他线程唤醒
}
可见,使用Condition
配合Lock
,我们可以实现更灵活的线程同步。
小结
Condition
可以替代wait
和notify
;
Condition
对象必须从Lock
对象获取。
使用ReadWriteLock
前面讲到的ReentrantLock
保证了只有一个线程可以执行临界区代码:
public class Counter {
private final Lock lock = new ReentrantLock();
private int[] counts = new int[10];
public void inc(int index) {
lock.lock();
try {
counts[index] += 1;
} finally {
lock.unlock();
}
}
public int[] get() {
lock.lock();
try {
return Arrays.copyOf(counts, counts.length);
} finally {
lock.unlock();
}
}
}
但是有些时候,这种保护有点过头。因为我们发现,任何时刻,只允许一个线程修改,也就是调用inc()
方法是必须获取锁,但是,get()
方法只读取数据,不修改数据,它实际上允许多个线程同时调用。
实际上我们想要的是:允许多个线程同时读,但只要有一个线程在写,其他线程就必须等待:
读 | 写 | |
---|---|---|
读 | 允许 | 不允许 |
写 | 不允许 | 不允许 |
使用ReadWriteLock
可以解决这个问题,它保证:
- 只允许一个线程写入(其他线程既不能写入也不能读取);
- 没有写入时,多个线程允许同时读(提高性能)。
用ReadWriteLock
实现这个功能十分容易。我们需要创建一个ReadWriteLock
实例,然后分别获取读锁和写锁:
public class Counter {
private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
private final Lock rlock = rwlock.readLock();
private final Lock wlock = rwlock.writeLock();
private int[] counts = new int[10];
public void inc(int index) {
wlock.lock(); // 加写锁
try {
counts[index] += 1;
} finally {
wlock.unlock(); // 释放写锁
}
}
public int[] get() {
rlock.lock(); // 加读锁
try {
return Arrays.copyOf(counts, counts.length);
} finally {
rlock.unlock(); // 释放读锁
}
}
}
把读写操作分别用读锁和写锁来加锁,在读取时,多个线程可以同时获得读锁,这样就大大提高了并发读的执行效率。
使用ReadWriteLock
时,适用条件是同一个数据,有大量线程读取,但仅有少数线程修改。
例如,一个论坛的帖子,回复可以看做写入操作,它是不频繁的,但是,浏览可以看做读取操作,是非常频繁的,这种情况就可以使用ReadWriteLock
。
小结
使用ReadWriteLock
可以提高读取效率:
ReadWriteLock
只允许一个线程写入;ReadWriteLock
允许多个线程在没有写入时同时读取;ReadWriteLock
适合读多写少的场景。
使用StampedLock
前面介绍的ReadWriteLock
可以解决多线程同时读,但只有一个线程能写的问题。
如果我们深入分析ReadWriteLock
,会发现它有个潜在的问题:如果有线程正在读,写线程需要等待读线程释放锁后才能获取写锁,即读的过程中不允许写,这是一种悲观的读锁。
要进一步提升并发执行效率,Java 8引入了新的读写锁:StampedLock
。
StampedLock
和ReadWriteLock
相比,改进之处在于:读的过程中也允许获取写锁后写入!这样一来,我们读的数据就可能不一致,所以,需要一点额外的代码来判断读的过程中是否有写入,这种读锁是一种乐观锁。
乐观锁的意思就是乐观地估计读的过程中大概率不会有写入,因此被称为乐观锁。反过来,悲观锁则是读的过程中拒绝有写入,也就是写入必须等待。显然乐观锁的并发效率更高,但一旦有小概率的写入导致读取的数据不一致,需要能检测出来,再读一遍就行。
我们来看例子:
public class Point {
private final StampedLock stampedLock = new StampedLock();
private double x;
private double y;
public void move(double deltaX, double deltaY) {
long stamp = stampedLock.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
stampedLock.unlockWrite(stamp); // 释放写锁
}
}
public double distanceFromOrigin() {
long stamp = stampedLock.tryOptimisticRead(); // 获得一个乐观读锁
// 注意下面两行代码不是原子操作
// 假设x,y = (100,200)
double currentX = x;
// 此处已读取到x=100,但x,y可能被写线程修改为(300,400)
double currentY = y;
// 此处已读取到y,如果没有写入,读取是正确的(100,200)
// 如果有写入,读取是错误的(100,400)
if (!stampedLock.validate(stamp)) { // 检查乐观读锁后是否有其他写锁发生
stamp = stampedLock.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
stampedLock.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}
}
和ReadWriteLock
相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()
获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()
去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据。
可见,StampedLock
把读锁细分为乐观读和悲观读,能进一步提升并发效率。但这也是有代价的:一是代码更加复杂,二是StampedLock
是不可重入锁,不能在一个线程中反复获取同一个锁。
StampedLock
还提供了更复杂的将悲观读锁升级为写锁的功能,它主要使用在if-then-update的场景:即先读,如果读的数据满足条件,就返回,如果读的数据不满足条件,再尝试写。
小结
StampedLock
提供了乐观读锁,可取代ReadWriteLock
以进一步提升并发性能;
StampedLock
是不可重入锁。
使用Concurrent集合
我们在前面已经通过ReentrantLock
和Condition
实现了一个BlockingQueue
:
public class TaskQueue {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private Queue<String> queue = new LinkedList<>();
public void addTask(String s) {
lock.lock();
try {
queue.add(s);
condition.signalAll();
} finally {
lock.unlock();
}
}
public String getTask() {
lock.lock();
try {
while (queue.isEmpty()) {
condition.await();
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
BlockingQueue
的意思就是说,当一个线程调用这个TaskQueue
的getTask()
方法时,该方法内部可能会让线程变成等待状态,直到队列条件满足不为空,线程被唤醒后,getTask()
方法才会返回。
因为BlockingQueue
非常有用,所以我们不必自己编写,可以直接使用Java标准库的java.util.concurrent
包提供的线程安全的集合:ArrayBlockingQueue
。
除了BlockingQueue
外,针对List
、Map
、Set
、Deque
等,java.util.concurrent
包也提供了对应的并发集合类。我们归纳一下:
interface | non-thread-safe | thread-safe |
---|---|---|
List | ArrayList | CopyOnWriteArrayList |
Map | HashMap | ConcurrentHashMap |
Set | HashSet / TreeSet | CopyOnWriteArraySet |
Queue | ArrayDeque / LinkedList | ArrayBlockingQueue / LinkedBlockingQueue |
Deque | ArrayDeque / LinkedList | LinkedBlockingDeque |
使用这些并发集合与使用非线程安全的集合类完全相同。我们以ConcurrentHashMap
为例:
Map<String, String> map = ConcurrentHashMap<>();
// 在不同的线程读写:
map.put("A", "1");
map.put("B", "2");
map.get("A", "1");
因为所有的同步和加锁的逻辑都在集合内部实现,对外部调用者来说,只需要正常按接口引用,其他代码和原来的非线程安全代码完全一样。即当我们需要多线程访问时,把:
Map<String, String> map = HashMap<>();
改为:
Map<String, String> map = ConcurrentHashMap<>();
就可以了。
java.util.Collections
工具类还提供了一个旧的线程安全集合转换器,可以这么用:
Map unsafeMap = new HashMap();
Map threadSafeMap = Collections.synchronizedMap(unsafeMap);
但是它实际上是用一个包装类包装了非线程安全的Map
,然后对所有读写方法都用synchronized
加锁,这样获得的线程安全集合的性能比java.util.concurrent
集合要低很多,所以不推荐使用。
小结
使用java.util.concurrent
包提供的线程安全的并发集合可以大大简化多线程编程:
多线程同时读写并发集合是安全的;
尽量使用Java标准库提供的并发集合,避免自己编写同步代码。
使用Atomic
Java的java.util.concurrent
包除了提供底层锁、并发集合外,还提供了一组原子操作的封装类,它们位于java.util.concurrent.atomic
包。
我们以AtomicInteger
为例,它提供的主要操作有:
- 增加值并返回新值:
int addAndGet(int delta)
- 加1后返回新值:
int incrementAndGet()
- 获取当前值:
int get()
- 用CAS方式设置:
int compareAndSet(int expect, int update)
Atomic类是通过无锁(lock-free)的方式实现的线程安全(thread-safe)访问。它的主要原理是利用了CAS:Compare and Set。
如果我们自己通过CAS编写incrementAndGet()
,它大概长这样:
public int incrementAndGet(AtomicInteger var) {
int prev, next;
do {
prev = var.get();
next = prev + 1;
} while ( ! var.compareAndSet(prev, next));
return prev;
}
CAS是指,在这个操作中,如果AtomicInteger
的当前值是prev
,那么就更新为next
,返回true
。如果AtomicInteger
的当前值不是prev
,就什么也不干,返回false
。通过CAS操作并配合do ... while
循环,即使其他线程修改了AtomicInteger
的值,最终的结果也是正确的。
我们利用AtomicLong
可以编写一个多线程安全的全局唯一ID生成器:
class IdGenerator {
AtomicLong var = new AtomicLong(0);
public long getNextId() {
return var.incrementAndGet();
}
}
通常情况下,我们并不需要直接用do ... while
循环调用compareAndSet
实现复杂的并发操作,而是用incrementAndGet()
这样的封装好的方法,因此,使用起来非常简单。
在高度竞争的情况下,还可以使用Java 8提供的LongAdder
和LongAccumulator
。
小结
使用java.util.concurrent.atomic
提供的原子操作可以简化多线程编程:
- 原子操作实现了无锁的线程安全;
- 适用于计数器,累加器等。
使用线程池
Java语言虽然内置了多线程支持,启动一个新线程非常方便,但是,创建线程需要操作系统资源(线程资源,栈空间等),频繁创建和销毁大量线程需要消耗大量时间。
如果可以复用一组线程:
┌─────┐ execute ┌──────────────────┐
│Task1│─────────>│ThreadPool │
├─────┤ │┌───────┐┌───────┐│
│Task2│ ││Thread1││Thread2││
├─────┤ │└───────┘└───────┘│
│Task3│ │┌───────┐┌───────┐│
├─────┤ ││Thread3││Thread4││
│Task4│ │└───────┘└───────┘│
├─────┤ └──────────────────┘
│Task5│
├─────┤
│Task6│
└─────┘
...
那么我们就可以把很多小任务让一组线程来执行,而不是一个任务对应一个新线程。这种能接收大量小任务并进行分发处理的就是线程池。
简单地说,线程池内部维护了若干个线程,没有任务的时候,这些线程都处于等待状态。如果有新任务,就分配一个空闲线程执行。如果所有线程都处于忙碌状态,新任务要么放入队列等待,要么增加一个新线程进行处理。
Java标准库提供了ExecutorService
接口表示线程池,它的典型用法如下:
// 创建固定大小的线程池:
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交任务:
executor.submit(task1);
executor.submit(task2);
executor.submit(task3);
executor.submit(task4);
executor.submit(task5);
因为ExecutorService
只是接口,Java标准库提供的几个常用实现类有:
- FixedThreadPool:线程数固定的线程池;
- CachedThreadPool:线程数根据任务动态调整的线程池;
- SingleThreadExecutor:仅单线程执行的线程池。
创建这些线程池的方法都被封装到Executors
这个类中。我们以FixedThreadPool
为例,看看线程池的执行逻辑:
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建一个固定大小的线程池:
ExecutorService es = Executors.newFixedThreadPool(4);
for (int i = 0; i < 6; i++) {
es.submit(new Task("" + i));
}
// 关闭线程池:
es.shutdown();
}
}
@Slf4j
class Task implements Runnable {
private final String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
log.info("start task " + name);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
log.info("end task " + name);
}
}
我们观察执行结果,一次性放入6个任务,由于线程池只有固定的4个线程,因此,前4个任务会同时执行,等到有线程空闲后,才会执行后面的两个任务。
线程池在程序结束的时候要关闭。使用shutdown()
方法关闭线程池的时候,它会等待正在执行的任务先完成,然后再关闭。shutdownNow()
会立刻停止正在执行的任务,awaitTermination()
则会等待指定的时间让线程池关闭。
如果我们把线程池改为CachedThreadPool
,由于这个线程池的实现会根据任务数量动态调整线程池的大小,所以6个任务可一次性全部同时执行。
如果我们想把线程池的大小限制在4~10个之间动态调整怎么办?我们查看Executors.newCachedThreadPool()
方法的源码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
因此,想创建指定动态范围的线程池,可以这么写:
int min = 4;
int max = 10;
ExecutorService es = new ThreadPoolExecutor(min, max,
60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
ScheduledThreadPool
还有一种任务,需要定期反复执行,例如,每秒刷新证券价格。这种任务本身固定,需要反复执行的,可以使用ScheduledThreadPool
。放入ScheduledThreadPool
的任务可以定期反复执行。
创建一个ScheduledThreadPool
仍然是通过Executors
类:
ScheduledExecutorService ses = Executors.newScheduledThreadPool(4);
我们可以提交一次性任务,它会在指定延迟后只执行一次:
// 1秒后执行一次性任务:
ses.schedule(new Task("one-time"), 1, TimeUnit.SECONDS);
如果任务以固定的每3秒执行,我们可以这样写:
// 2秒后开始执行定时任务,每3秒执行:
ses.scheduleAtFixedRate(new Task("fixed-rate"), 2, 3, TimeUnit.SECONDS);
如果任务以固定的3秒为间隔执行,我们可以这样写:
// 3秒后开始执行定时任务,以3秒为间隔执行:
ses.scheduleWithFixedDelay(new Task("fixed-delay"), 2, 3, TimeUnit.SECONDS);
注意FixedRate和FixedDelay的区别。FixedRate是指任务总是以固定时间间隔触发,不管任务执行多长时间:
│???? │?????? │??? │????? │???
├───────┼───────┼───────┼───────┼────>
│<─────>│<─────>│<─────>│<─────>│
而FixedDelay是指,上一次任务执行完毕后,等待固定的时间间隔,再执行下一次任务:
│???│ │?????│ │??│ │?
└───┼───────┼─────┼───────┼──┼───────┼──>
│<─────>│ │<─────>│ │<─────>│
因此,使用ScheduledThreadPool
时,我们要根据需要选择执行一次、FixedRate执行还是FixedDelay执行。
细心的童鞋还可以思考下面的问题:
- 在FixedRate模式下,假设每秒触发,如果某次任务执行时间超过1秒,后续任务会不会并发执行?
- 如果任务抛出了异常,后续任务是否继续执行?
Java标准库还提供了一个java.util.Timer
类,这个类也可以定期执行任务,但是,一个Timer
会对应一个Thread
,所以,一个Timer
只能定期执行一个任务,多个定时任务必须启动多个Timer
,而一个ScheduledThreadPool
就可以调度多个定时任务,所以,我们完全可以用ScheduledThreadPool
取代旧的Timer
。
小结
JDK提供了ExecutorService
实现了线程池功能:
- 线程池内部维护一组线程,可以高效执行大量小任务;
Executors
提供了静态方法创建不同类型的ExecutorService
;- 必须调用
shutdown()
关闭ExecutorService
; ScheduledThreadPool
可以定期调度多个任务。
使用Future
在执行多个任务的时候,使用Java标准库提供的线程池是非常方便的。我们提交的任务只需要实现Runnable
接口,就可以让线程池去执行:
class Task implements Runnable {
public String result;
public void run() {
this.result = longTimeCalculation();
}
}
Runnable
接口有个问题,它的方法没有返回值。如果任务需要一个返回结果,那么只能保存到变量,还要提供额外的方法读取,非常不便。所以,Java标准库还提供了一个Callable
接口,和Runnable
接口比,它多了一个返回值:
class Task implements Callable<String> {
public String call() throws Exception {
return longTimeCalculation();
}
}
并且Callable
接口是一个泛型接口,可以返回指定类型的结果。
现在的问题是,如何获得异步执行的结果?
如果仔细看ExecutorService.submit()
方法,可以看到,它返回了一个Future
类型,一个Future
类型的实例代表一个未来能获取结果的对象:
ExecutorService executor = Executors.newFixedThreadPool(4);
// 定义任务:
Callable<String> task = new Task();
// 提交任务并获得Future:
Future<String> future = executor.submit(task);
// 从Future获取异步执行返回的结果:
String result = future.get(); // 可能阻塞
当我们提交一个Callable
任务后,我们会同时获得一个Future
对象,然后,我们在主线程某个时刻调用Future
对象的get()
方法,就可以获得异步执行的结果。在调用get()
时,如果异步任务已经完成,我们就直接获得结果。如果异步任务还没有完成,那么get()
会阻塞,直到任务完成后才返回结果。
一个Future
接口表示一个未来可能会返回的结果,它定义的方法有:
get()
:获取结果(可能会等待)get(long timeout, TimeUnit unit)
:获取结果,但只等待指定的时间;cancel(boolean mayInterruptIfRunning)
:取消当前任务;isDone()
:判断任务是否已完成。
小结
对线程池提交一个Callable
任务,可以获得一个Future
对象;
可以用Future
在将来某个时刻获取结果。
使用CompletableFuture
使用Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会被迫等待。
从Java 8开始引入了CompletableFuture
,它针对Future
做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
我们以获取股票价格为例,看看如何使用CompletableFuture
:
public class Main {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 5; i++) {
// 创建异步执行任务:
CompletableFuture<Double> cf = CompletableFuture.supplyAsync(Main::fetchPrice);
// 如果执行成功:
cf.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 如果执行异常:
cf.exceptionally((e) -> {
e.printStackTrace();
return null;
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}
}
static Double fetchPrice() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
if (Math.random() < 0.3) {
throw new RuntimeException("fetch price failed!");
}
return 5 + Math.random() * 20;
}
}
创建一个CompletableFuture
是通过CompletableFuture.supplyAsync()
实现的,它需要一个实现了Supplier
接口的对象:
public interface Supplier<T> {
T get();
}
这里我们用lambda语法简化了一下,直接传入Main::fetchPrice
,因为Main.fetchPrice()
静态方法的签名符合Supplier
接口的定义(除了方法名外)。
紧接着,CompletableFuture
已经被提交给默认的线程池执行了,我们需要定义的是CompletableFuture
完成时和异常时需要回调的实例。完成时,CompletableFuture
会调用Consumer
对象:
public interface Consumer<T> {
void accept(T t);
}
异常时,CompletableFuture
会调用Function
对象:
public interface Function<T, R> {
R apply(T t);
}
这里我们都用lambda语法简化了代码。
可见CompletableFuture
的优点是:
- 异步任务结束时,会自动回调某个对象的方法;
- 异步任务出错时,会自动回调某个对象的方法;
- 主线程设置好回调后,不再关心异步任务的执行。
如果只是实现了异步回调机制,我们还看不出CompletableFuture
相比Future
的优势。CompletableFuture
更强大的功能是,多个CompletableFuture
可以串行执行,例如,定义两个CompletableFuture
,第一个CompletableFuture
根据证券名称查询证券代码,第二个CompletableFuture
根据证券代码查询证券价格,这两个CompletableFuture
实现串行操作如下:
public class Main {
public static void main(String[] args) throws Exception {
// 第一个任务:
CompletableFuture<String> cfQuery = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油");
});
// cfQuery成功后继续执行下一个任务:
CompletableFuture<Double> cfFetch = cfQuery.thenApplyAsync((code) -> {
return fetchPrice(code);
});
// cfFetch成功后打印结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}
static String queryCode(String name) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
除了串行执行外,多个CompletableFuture
还可以并行执行。例如,我们考虑这样的场景:
同时从新浪和网易查询证券代码,只要任意一个返回结果,就进行下一步查询价格,查询价格也同时从新浪和网易查询,只要任意一个返回结果,就完成操作:
public class Main {
public static void main(String[] args) throws Exception {
// 两个CompletableFuture执行异步查询:
CompletableFuture<String> cfQueryFromSina = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://finance.sina.com.cn/code/");
});
CompletableFuture<String> cfQueryFrom163 = CompletableFuture.supplyAsync(() -> {
return queryCode("中国石油", "https://money.163.com/code/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfQuery = CompletableFuture.anyOf(cfQueryFromSina, cfQueryFrom163);
// 两个CompletableFuture执行异步查询:
CompletableFuture<Double> cfFetchFromSina = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://finance.sina.com.cn/price/");
});
CompletableFuture<Double> cfFetchFrom163 = cfQuery.thenApplyAsync((code) -> {
return fetchPrice((String) code, "https://money.163.com/price/");
});
// 用anyOf合并为一个新的CompletableFuture:
CompletableFuture<Object> cfFetch = CompletableFuture.anyOf(cfFetchFromSina, cfFetchFrom163);
// 最终结果:
cfFetch.thenAccept((result) -> {
System.out.println("price: " + result);
});
// 主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:
Thread.sleep(2000);
}
static String queryCode(String name, String url) {
System.out.println("query code from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
}
return "601857";
}
static Double fetchPrice(String code, String url) {
System.out.println("query price from " + url + "...");
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {
}
return 5 + Math.random() * 20;
}
}
上述逻辑实现的异步查询规则实际上是:
┌─────────────┐ ┌─────────────┐
│ Query Code │ │ Query Code │
│ from sina │ │ from 163 │
└─────────────┘ └─────────────┘
│ │
└───────┬───────┘
▼
┌─────────────┐
│ anyOf │
└─────────────┘
│
┌───────┴────────┐
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Query Price │ │ Query Price │
│ from sina │ │ from 163 │
└─────────────┘ └─────────────┘
│ │
└────────┬───────┘
▼
┌─────────────┐
│ anyOf │
└─────────────┘
│
▼
┌─────────────┐
│Display Price│
└─────────────┘
除了anyOf()
可以实现“任意个CompletableFuture
只要一个成功”,allOf()
可以实现“所有CompletableFuture
都必须成功”,这些组合操作可以实现非常复杂的异步流程控制。
最后我们注意CompletableFuture
的命名规则:
xxx()
:表示该方法将继续在已有的线程中执行;xxxAsync()
:表示将异步在线程池中执行。
小结
CompletableFuture
可以指定异步处理流程:
thenAccept()
处理正常结果;exceptional()
处理异常结果;thenApplyAsync()
用于串行化另一个CompletableFuture
;anyOf()
和allOf()
用于并行化多个CompletableFuture
。
使用ForkJoin
Java 7开始引入了一种新的Fork/Join线程池,它可以执行一种特殊的任务:把一个大任务拆成多个小任务并行执行。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。
我们来看如何使用Fork/Join对大数据进行并行求和:
public class Main {
public static void main(String[] args) throws Exception {
// 创建2000个随机数组成的数组:
long[] array = new long[2000];
long expectedSum = 0;
for (int i = 0; i < array.length; i++) {
array[i] = random();
expectedSum += array[i];
}
System.out.println("Expected sum: " + expectedSum);
// fork/join:
ForkJoinTask<Long> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Long result = ForkJoinPool.commonPool().invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("Fork/join sum: " + result + " in " + (endTime - startTime) + " ms.");
}
static Random random = new Random(0);
static long random() {
return random.nextInt(10000);
}
}
class SumTask extends RecursiveTask<Long> {
static final int THRESHOLD = 500;
long[] array;
int start;
int end;
SumTask(long[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
if (end - start <= THRESHOLD) {
// 如果任务足够小,直接计算:
long sum = 0;
for (int i = start; i < end; i++) {
sum += this.array[i];
// 故意放慢计算速度:
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
}
return sum;
}
// 任务太大,一分为二:
int middle = (end + start) / 2;
System.out.println(String.format("split %d~%d ==> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
System.out.println("result = " + subresult1 + " + " + subresult2 + " ==> " + result);
return result;
}
}
观察上述代码的执行过程,一个大的计算任务0~2000首先分裂为两个小任务0~1000和1000~2000,这两个小任务仍然太大,继续分裂为更小的0~500,500~1000,1000~1500,1500~2000,最后,计算结果被依次合并,得到最终结果。
因此,核心代码SumTask
继承自RecursiveTask
,在compute()
方法中,关键是如何“分裂”出子任务并且提交子任务:
class SumTask extends RecursiveTask<Long> {
protected Long compute() {
// “分裂”子任务:
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// invokeAll会并行运行两个子任务:
invokeAll(subtask1, subtask2);
// 获得子任务的结果:
Long result1 = fork1.join();
Long result2 = fork2.join();
// 汇总结果:
return result1 + result2;
}
}
Fork/Join线程池在Java标准库中就有应用。Java标准库提供的java.util.Arrays.parallelSort(array)
可以进行并行排序,它的原理就是内部通过Fork/Join对大数组分拆进行并行排序,在多核CPU上就可以大大提高排序的速度。
小结
Fork/Join是一种基于“分治”的算法:通过分解任务,并行执行,最后合并结果得到最终结果。
ForkJoinPool
线程池可以把一个大任务分拆成小任务并行执行,任务类必须继承自RecursiveTask
或RecursiveAction
。
使用Fork/Join模式可以进行并行计算以提高效率。
使用ThreadLocal
对于多任务,Java标准库提供的线程池可以方便地执行这些任务,同时复用线程。Web应用程序就是典型的多任务应用,每个用户请求页面时,我们都会创建一个任务,类似:
public void process(User user) {
checkPermission();
doWork();
saveStatus();
sendResponse();
}
然后,通过线程池去执行这些任务。
观察process()
方法,它内部需要调用若干其他方法,同时,我们遇到一个问题:如何在一个线程内传递状态?
process()
方法需要传递的状态就是User
实例。有的童鞋会想,简单地传入User
就可以了:
public void process(User user) {
checkPermission(user);
doWork(user);
saveStatus(user);
sendResponse(user);
}
但是往往一个方法又会调用其他很多方法,这样会导致User
传递到所有地方:
void doWork(User user) {
queryStatus(user);
checkStatus();
setNewStatus(user);
log();
}
这种在一个线程中,横跨若干方法调用,需要传递的对象,我们通常称之为上下文(Context),它是一种状态,可以是用户身份、任务信息等。
给每个方法增加一个context参数非常麻烦,而且有些时候,如果调用链有无法修改源码的第三方库,User
对象就传不进去了。
Java标准库提供了一个特殊的ThreadLocal
,它可以在一个线程中传递同一个对象。
ThreadLocal
实例通常总是以静态字段初始化如下:
static ThreadLocal<String> threadLocalUser = new ThreadLocal<>();
它的典型使用方式如下:
void processUser(user) {
try {
threadLocalUser.set(user);
step1();
step2();
} finally {
threadLocalUser.remove();
}
}
通过设置一个User
实例关联到ThreadLocal
中,在移除之前,所有方法都可以随时获取到该User
实例:
void step1() {
User u = threadLocalUser.get();
log();
printUser();
}
void log() {
User u = threadLocalUser.get();
println(u.name);
}
void step2() {
User u = threadLocalUser.get();
checkUser(u.id);
}
注意到普通的方法调用一定是同一个线程执行的,所以,step1()
、step2()
以及log()
方法内,threadLocalUser.get()
获取的User
对象是同一个实例。
实际上,可以把ThreadLocal
看成一个全局Map
:每个线程获取ThreadLocal
变量时,总是使用Thread
自身作为key:
Object threadLocalValue = threadLocalMap.get(Thread.currentThread());
因此,ThreadLocal
相当于给每个线程都开辟了一个独立的存储空间,各个线程的ThreadLocal
关联的实例互不干扰。
最后,特别注意ThreadLocal
一定要在finally
中清除:
try {
threadLocalUser.set(user);
...
} finally {
threadLocalUser.remove();
}
这是因为当前线程执行完相关代码后,很可能会被重新放入线程池中,如果ThreadLocal
没有被清除,该线程执行其他代码时,会把上一次的状态带进去。
为了保证能释放ThreadLocal
关联的实例,我们可以通过AutoCloseable
接口配合try (resource) {...}
结构,让编译器自动为我们关闭。例如,一个保存了当前用户名的ThreadLocal
可以封装为一个UserContext
对象:
public class UserContext implements AutoCloseable {
static final ThreadLocal<String> ctx = new ThreadLocal<>();
public UserContext(String user) {
ctx.set(user);
}
public static String currentUser() {
return ctx.get();
}
@Override
public void close() {
ctx.remove();
}
}
使用的时候,我们借助try (resource) {...}
结构,可以这么写:
try (var ctx = new UserContext("Bob")) {
// 可任意调用UserContext.currentUser():
String currentUser = UserContext.currentUser();
} // 在此自动调用UserContext.close()方法释放ThreadLocal关联对象
这样就在UserContext
中完全封装了ThreadLocal
,外部代码在try (resource) {...}
内部可以随时调用UserContext.currentUser()
获取当前线程绑定的用户名。
原文地址:https://www.cnblogs.com/huangwenjie/p/12361481.html