- 显示锁
Lock接口是Java 5.0新增的接口,该接口的定义如下:
12
3
4
5
6
7
8
public
interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}
与内置加锁机制不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显示的。ReentrantLock实现了Lock接口,与内置锁相比,ReentrantLock有以下优势:可以中断获取锁操作,获取锁时候可以设置超时时间。以下代码给出了Lock接口的标准使用形式:
12
3
4
5
6
7
Lock lock = new ReentrantLock();
...
lock.lock();
try{
...
} finally {
lock.unlock();
1.1、轮询锁与定时锁
可定时的与可轮询的锁获取方式是由tryLock方法实现的,与无条件的锁获取方式相比,它具有跟完善的错误回复机制。tryLock方法的说明如下:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
boolean tryLock():仅在调用时锁为空闲状态才获取该锁。如果锁可用,则获取锁,并立即返回值
true
。如果锁不可用,则此方法将立即返回值
false
。
boolean tryLock(long
time
, TimeUnit unit) throws InterruptedException:
如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
如果锁可用,则此方法将立即返回值
true
。如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下三种情况之一前,该线程将一直处于休眠状态:
锁由当前线程获得;或者
其他某个线程中断当前线程,并且支持对锁获取的中断;或者
已超过指定的等待时间
如果获得了锁,则返回值
true
。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在获取锁时被中断,并且支持对锁获取的中断,
则将抛出 InterruptedException,并会清除当前线程的已中断状态。
如果超过了指定的等待时间,则将返回值
false
。如果
time
小于等于 0,该方法将完全不等待。
在内置锁中,死锁是一个严重的问题,恢复程序的唯一方法是重新启动程序,而防止死锁的唯一方法就是在构造程序时避免出现不一致的锁顺序,可定时的与可轮询的锁提供了另一种选择:先用tryLock()尝试获取所有的锁,如果不能获取所有需要的锁,那么释放已经获取的锁,然后重新尝试获取所有的锁,以下例子演示了使用tryLock避免死锁的方法:先用tryLock来获取两个锁,如果不能同时获取,那么就回退并重新尝试。
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public
boolean transferMoney(Account fromAcct, Account toAcct, DollarAmount amount, long timeout, TimeUnit unit) throws InsufficientFundsException, InterruptedException {
long fixedDelay = 1;
long randMod = 2;
long stopTime = System.nanoTime() + unit.toNanos(timeout);
while (
true
) {
if (fromAcct.lock.tryLock()) {
try {
if (toAcct.lock.tryLock()) {
try {
if (fromAcct.getBalance().compareTo(amount) < 0)
throw new InsufficientFundsException();
else
{
fromAcct.debit(amount);
toAcct.credit(amount);
return
true
;
}
} finally {
toAcct.lock.unlock();
}
}
} finally {
fromAcct.lock.unlock();
}
}
if (System.nanoTime() < stopTime)
return
false
;
NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
}
}
1.2、可中断的锁获取操作
lockInterruptibly方法能够在获得锁的同时保持对中断的响应,该方法说明如下:
12
3
4
5
6
7
8
9
10
11
12
13
14
void lockInterruptibly() throws InterruptedException:
如果当前线程未被中断,则获取锁。
如果锁可用,则获取锁,并立即返回。
如果锁不可用,出于线程调度目的,将禁用当前线程,并且在发生以下两种情况之一以前,该线程将一直处于休眠状态:
锁由当前线程获得;或者
其他某个线程中断当前线程,并且支持对锁获取的中断。
如果当前线程:
在进入此方法时已经设置了该线程的中断状态;或者
在获取锁时被中断,并且支持对锁获取的中断,
则将抛出 InterruptedException,并清除当前线程的已中断状态。
1.3、读-写锁
Java 5除了增加了Lock接口,还增加了ReadWriteLock接口,即读写锁,该接口定义如下:
12
3
4
public
interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
读写锁允许多个读线程并发执行,但是不允许写线程与读线程并发执行,也不允许写线程与写线程并发执行。下面的例子使用了ReentrantReadWriteLock包装Map,从而使他能够在多个线程之间安全的共享:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public
class
ReadWriteMap <K,V> {
private
final
Map<K, V> map;
private
final
ReadWriteLock lock =
new
ReentrantReadWriteLock();
private
final
Lock r = lock.readLock();
private
final
Lock w = lock.writeLock();
public
ReadWriteMap(Map<K, V> map) {
this
.map = map;
}
public
V put(K key, V value) {
w.lock();
try
{
return
map.put(key, value);
}
finally
{
w.unlock();
}
}
public
V remove(Object key) {
w.lock();
try
{
return
map.remove(key);
}
finally
{
w.unlock();
}
}
public
void
putAll(Map<?
extends
K, ?
extends
V> m) {
w.lock();
try
{
map.putAll(m);
}
finally
{
w.unlock();
}
}
public
void
clear() {
w.lock();
try
{
map.clear();
}
finally
{
w.unlock();
}
}
public
V get(Object key) {
r.lock();
try
{
return
map.get(key);
}
finally
{
r.unlock();
}
}
public
int
size() {
r.lock();
try
{
return
map.size();
}
finally
{
r.unlock();
}
}
public
boolean
isEmpty() {
r.lock();
try
{
return
map.isEmpty();
}
finally
{
r.unlock();
}
}
public
boolean
containsKey(Object key) {
r.lock();
try
{
return
map.containsKey(key);
}
finally
{
r.unlock();
}
}
public
boolean
containsValue(Object value) {
r.lock();
try
{
return
map.containsValue(value);
}
finally
{
r.unlock();
}
}
}
同步工具类
2.1、闭锁
闭锁是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await 方法会一直受阻塞。之后,会释放所有等待的线程,await 的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
下例给出了闭锁的常见用法,TestHarness创建一定数量的线程,利用它们并发的执行指定的任务,它使用两个闭锁,分别表示"起始门"和"结束门"。每个线程首先要做的就是在启动门上等待,从而确保所有线程都就绪后才开始执行,而每个线程要做的最后一件事是将调用结束门的countDown方法减1,这能使主线程高效地等待直到所有工作线程都执行完毕,因此可以统计所消耗的时间:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public
class
TestHarness {
public
long
timeTasks(
int
nThreads,
final
Runnable task)
throws
InterruptedException {
final
CountDownLatch startGate =
new
CountDownLatch(
1
);
final
CountDownLatch endGate =
new
CountDownLatch(nThreads);
for
(
int
i =
0
; i < nThreads; i++) {
Thread t =
new
Thread() {
public
void
run() {
try
{
startGate.await();
try
{
task.run();
}
finally
{
endGate.countDown();
}
}
catch
(InterruptedException ignored) {
}
}
};
t.start();
}
long
start = System.nanoTime();
startGate.countDown();
endGate.await();
long
end = System.nanoTime();
return
end - start;
}
}
2.2、FutureTask
FutureTask表示可取消的异步计算。利用开始和取消计算的方法、查询计算是否完成的方法和获取计算结果的方法,此类提供了对 Future 的基本实现。仅在计算完成时才能获取结果;如果计算尚未完成,则阻塞 get 方法。一旦计算完成,就不能再重新开始或取消计算。FutureTask的方法摘要如下:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
boolean
cancel(
boolean
mayInterruptIfRunning)
试图取消对此任务的执行。
protected
void
done()
当此任务转换到状态 isDone(不管是正常地还是通过取消)时,调用受保护的方法。
V get()
throws
InterruptedException, ExecutionException
如有必要,等待计算完成,然后获取其结果。
V get(
long
timeout, TimeUnit unit)
throws
InterruptedException, ExecutionException, TimeoutException
如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
boolean
isCancelled()
如果在任务正常完成前将其取消,则返回
true
。
boolean
isDone()
如果任务已完成,则返回
true
。
void
run()
除非已将此 Future 取消,否则将其设置为其计算的结果。
protected
boolean
runAndReset()
执行计算而不设置其结果,然后将此 Future 重置为初始状态,如果计算遇到异常或已取消,则该操作失败。
protected
void
set(V v)
除非已经设置了此 Future 或已将其取消,否则将其结果设置为给定的值。
protected
void
setException(Throwable t)
除非已经设置了此 Future 或已将其取消,否则它将报告一个 ExecutionException,并将给定的 throwable 作为其原因。
FutureTask可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动,以下代码就是模拟一个高开销的计算,我们可以先调用start()方法开始计算,然后在需要结果时,再调用get得到结果:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public
class
Preloader {
ProductInfo loadProductInfo()
throws
DataLoadException {
return
null
;
}
private
final
FutureTask<ProductInfo> future =
new
FutureTask<ProductInfo>(
new
Callable<ProductInfo>() {
public
ProductInfo call()
throws
DataLoadException {
return
loadProductInfo();
}
});
private
final
Thread thread =
new
Thread(future);
public
void
start() {
thread.start();
}
public
ProductInfo get()
throws
DataLoadException, InterruptedException {
try
{
return
future.get();
}
catch
(ExecutionException e) {
Throwable cause = e.getCause();
if
(cause
instanceof
DataLoadException)
throw
(DataLoadException) cause;
else
throw
new
RuntimeException(e);
}
}
interface
ProductInfo {
}
}
class
DataLoadException
extends
Exception {
}
2.3、信号量
从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会阻塞每一个 acquire(),然后等待获取许可。每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者。但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目。例如,下面的类使用信号量控制对内容池的访问:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class
Pool {
private
static
final
int
MAX_AVAILABLE =
100
;
private
final
Semaphore available =
new
Semaphore(MAX_AVAILABLE,
true
);
public
Object getItem()
throws
InterruptedException {
available.acquire();
return
getNextAvailableItem();
}
public
void
putItem(Object x) {
if
(markAsUnused(x))
available.release();
}
// Not a particularly efficient data structure; just for demo
protected
Object[] items = ... whatever kinds of items being managed
protected
boolean
[] used =
new
boolean
[MAX_AVAILABLE];
protected
synchronized
Object getNextAvailableItem() {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(!used[i]) {
used[i] =
true
;
return
items[i];
}
}
return
null
;
// not reached
}
protected
synchronized
boolean
markAsUnused(Object item) {
for
(
int
i =
0
; i < MAX_AVAILABLE; ++i) {
if
(item == items[i]) {
if
(used[i]) {
used[i] =
false
;
return
true
;
}
else
return
false
;
}
}
return
false
;
}
}
获得一项前,每个线程必须从信号量获取许可,从而保证可以使用该项。该线程结束后,将项返回到池中并将许可返回到该信号量,从而允许其他线程获取该项。注意,调用 acquire() 时无法保持同步锁,因为这会阻止将项返回到池中。信号量封装所需的同步,以限制对池的访问,这同维持该池本身一致性所需的同步是分开的。
将信号量初始化为 1,使得它在使用时最多只有一个可用的许可,从而可用作一个相互排斥的锁。这通常也称为二进制信号量,因为它只能有两种状态:一个可用的许可,或零个可用的许可。按此方式使用时,二进制信号量具有某种属性(与很多 Lock 实现不同),即可以由线程释放“锁”,而不是由所有者(因为信号量没有所有权的概念)。在某些专门的上下文(如死锁恢复)中这会很有用。
Semaphore的构造方法可选地接受一个公平 参数。当设置为 false 时,此类不对线程获取许可的顺序做任何保证。特别地,闯入 是允许的,也就是说可以在已经等待的线程前为调用 acquire() 的线程分配一个许可,从逻辑上说,就是新线程将自己置于等待线程队列的头部。当公平设置为 true时,信号量保证对于任何调用获取方法的线程而言,都按照处理它们调用这些方法的顺序(即先进先出;FIFO)来选择线程、获得许可。注意,FIFO 排序必然应用到这些方法内的指定内部执行点。所以,可能某个线程先于另一个线程调用了acquire,但是却在该线程之后到达排序点,并且从方法返回时也类似。还要注意,非同步的tryAcquire 方法不使用公平设置,而是使用任意可用的许可。
通常,应该将用于控制资源访问的信号量初始化为公平的,以确保所有线程都可访问资源。为其他的种类的同步控制使用信号量时,非公平排序的吞吐量优势通常要比公平考虑更为重要。
Semaphore还提供便捷的方法来同时 acquire 和释放多个许可。小心,在未将公平设置为 true 时使用这些方法会增加不确定延期的风险。
内存一致性效果:线程中调用“释放”方法(比如 release())之前的操作 happen-before 另一线程中紧跟在成功的“获取”方法(比如 acquire())之后的操作。
2.4、栅栏
CyclicBarrier是一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier很有用。因为该 barrier在释放等待线程后可以重用,所以称它为循环的barrier。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
示例用法:下面是一个在并行分解设计中使用barrier的例子:
12
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class
Solver {
final
int
N;
final
float
[][] data;
final
CyclicBarrier barrier;
class
Worker
implements
Runnable {
int
myRow;
Worker(
int
row) {
myRow = row;
}
public
void
run() {
while
(!done()) {
processRow(myRow);
try
{
barrier.await();
}
catch
(InterruptedException ex) {
return
;
}
catch
(BrokenBarrierException ex) {
return
;
}
}
}
}
public
Solver(
float
[][] matrix) {
data = matrix;
N = matrix.length;
barrier =
new
CyclicBarrier(N,
new
Runnable() {
public
void
run() {
//mergeRows(...);
}
});
for
(
int
i =
0
; i < N; ++i)
new
Thread(
new
Worker(i)).start();
waitUntilDone();
}
}
在这个例子中,每个 worker 线程处理矩阵的一行,在处理完所有的行之前,该线程将一直在屏障处等待。处理完所有的行之后,将执行所提供的 Runnable 屏障操作,并合并这些行。如果合并者确定已经找到了一个解决方案,那么 done() 将返回 true,所有的 worker 线程都将终止。
如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作.
对于失败的同步尝试,CyclicBarrier 使用了一种要么全部要么全不 (all-or-none) 的破坏模式:如果因为中断、失败或者超时等原因,导致线程过早地离开了屏障点,那么在该屏障点等待的其他所有线程也将通过 BrokenBarrierException以反常的方式离开。
内存一致性效果:线程中调用 await() 之前的操作 happen-before 那些是屏障操作的一部份的操作,后者依次 happen-before 紧跟在从另一个线程中对应 await() 成功返回的操作。
JAVA多线程编中的轮询锁与定时锁
时间: 2024-10-24 08:46:26
JAVA多线程编中的轮询锁与定时锁的相关文章
java用while循环设计轮询线程的性能问题
java用while循环设计轮询线程的性能问题 轮询线程在开发过程中的应用是比较广泛的,在这我模拟一个场景,有一个队列和轮询线程,主线程往队列中入队消息,轮询线程循环从队列中读取消息并打印消息内容.有点类似Android中Handler发送消息. 首先定义一个Message类. public class Message { private String content; public Message(String content) { this.content=content; } public
java多线程编程中实现Runnable接口方法相对于继承Thread方法的优势
java多线程创建方法http://blog.csdn.net/cjc211322/article/details/24999163 java创建多线程方法之间的区别http://blog.csdn.net/cjc211322/article/details/25000449 java多线程编程中实现Runnable接口方法相对于继承Thread方法的优势
Java多线程编程中Future模式的详解<;转>;
Java多线程编程中,常用的多线程设计模式包括:Future模式.Master-Worker模式.Guarded Suspeionsion模式.不变模式和生产者-消费者模式等.这篇文章主要讲述Future模式,关于其他多线程设计模式的地址如下:关于其他多线程设计模式的地址如下:关于Master-Worker模式的详解: Java多线程编程中Master-Worker模式的详解关于Guarded Suspeionsion模式的详解: Java多线程编程中Guarded Suspeionsion模式
Java多线程编程中的lock使用源码详解
将做工程过程重要的代码段做个记录,如下的代码内容是关于Java多线程编程中的lock使用详解的代码,应该是对码农有帮助. import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.locks.Lock; import java.util.concurrent.l
java多线程编程之Future/FutureTask和Callable
有这样一种场景,用多线程发送数据到某个服务器,需要知道各个线程是否都发送成功,等所有线程都发送完成才能继续下一轮计算和发送.如果用传统的多线程方式,就需要启动多个线程,然后在每个线程中分别发送数据,外部通过某种方式等待各个线程全部都发送完成,再进行后面的计算等流程.这种实现方式的代码会比较臃肿,在java中提供了一种Callable+Future的方法,可以将异步的多线程调用变为同步方式. Callable 在java的多线程编程中,有Thread和Runnable两种方式来新建线程,其中Run
Java多线程并发中 CAS 的使用与理解
一.CAS (Compare And Swap): CAS(Compare And Swap),即比较并交换 CAS(V,E,N).是解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——要更新的变量(V).预期原值(E)和新值(N).核心算法是如果V 值等于E 值,则将V 的值设为N .若V 值和E 值不同,则说明已经有其他线程做了更新,则当前线程不做更新,直到V.E两个值相等,才更新V的值. 1.代码演示: /* * 原子变量类: * AtomicBoolean *
【转】Java多线程编程中易混淆的3个关键字( volatile、ThreadLocal、synchronized)总结
概述 最近在看<ThinKing In Java>,看到多线程章节时觉得有一些概念比较容易混淆有必要总结一下,虽然都不是新的东西,不过还是蛮重要,很基本的,在开发或阅读源码中经常会遇到,在这里就简单的做个总结. 1.volatile volatile主要是用来在多线程中同步变量. 在一般情况下,为了提升性能,每个线程在运行时都会将主内存中的变量保存一份在自己的内存中作为变量副本,但是这样就很容易出现多个线程中保存的副本变量不一致,或与主内存的中的变量值不一致的情况.而当一个变量被volatil
Java 多线程编程之:notify 和 wait 用法
wait 和 notify 简介 wait 和 notify 均为 Object 的方法: Object.wait() —— 暂停一个线程 Object.notify() —— 唤醒一个线程 从以上的定义中,我们可以了解到以下事实: 想要使用这两个方法,我们需要先有一个对象 Object. 在多个线程之间,我们可以通过调用同一个对象的wait()和notify()来实现不同的线程间的可见. 对象控制权(monitor) 在使用 wait 和 notify 之前,我们需要先了解对象的控制权(mon
Java多线程学习中遇到的一个有趣的问题
今天随便写了一个线程之间相互调度的程序,代码如下: class First extends Thread { public First() { start(); } synchronized public void run() { try { wait(); } catch(InterruptedException e) { e.printStackTrace(); } try { sleep(2000); } catch(InterruptedException e) { e.printSta