Java基础】并发 - 多线程
分类: Java2014-05-03 23:56 275人阅读 评论(0) 收藏 举报
介绍
Java多线程
多线程任务执行
大多数并发应用程序时围绕执行任务(task)进行管理的;所谓任务就是抽象的,离散的工作单元。
围绕执行任务来管理应用程序时,第一步是要指明一个清晰的任务边界。大多数应用服务器程序都选择了下面这个自然的任务辩解:单独的客户请求;
任务时逻辑上的单元;
任务
Runnable
表示一个任务单元(java.lang)
[java] view plaincopy
- public interface Runnable
Modifier and Type | Method and Description |
---|---|
void |
run()
When an object implementing interface |
Callable
表示一个任务单元(java.util.concurrent)
[java] view plaincopy
- public interface Callable<V>
Modifier and Type | Method and Description |
---|---|
V |
call()
Computes a result, or throws an exception if unable to do so. |
Runnable V.S. Callable
- Runnable的run方法无返回值也不会抛出异常,就算通过Future也无法获得结果;
- Runnable通常提交给一个Thread执行;Callable通常提交给ExecutorService执行;
Future
表示一个任务单元计算返回的结果(java.util.concurrent);
Future可以用来检查任务是否完成,同时也可以阻塞在get方法上等待任务结果。
[java] view plaincopy
- public interface Future<V>
Modifier and Type | Method and Description |
---|---|
boolean |
cancel(boolean mayInterruptIfRunning)
Attempts to cancel execution of this task. |
V |
get()
Waits if necessary for the computation to complete, and then retrieves its result. |
V |
get(long timeout, TimeUnit unit)
Waits if necessary for at most the given time for the computation to complete, and then retrieves its result, if available. |
boolean |
isCancelled()
Returns true if this task was cancelled before it completed normally. |
boolean |
isDone()
Returns true if this task completed. |
i.e.
[java] view plaincopy
- interface ArchiveSearcher { String search(String target); }
- class App {
- ExecutorService executor = ...
- ArchiveSearcher searcher = ...
- void showSearch(final String target) throws InterruptedException {
- Future<String> future = executor.submit(new Callable<String>() {
- public String call() {
- return searcher.search(target);}});
- displayOtherThings(); // do other things while searching
- try {
- displayText(future.get()); // use future, get方法会阻塞直到相应线程执行完毕为止
- } catch (ExecutionException ex) { cleanup(); return; }
- }
- }
FutureTask
(java.util.concurrent)
[java] view plaincopy
- public class FutureTask<V> extends Object implements RunnableFuture<V>
FutureTask可以包装Runnable或者Callable;其中,RunnableFuture实现了Runnable和Future接口,FutureTask可以看成是一个复合体。
protected void |
done()
Protected method invoked when this task transitions to state |
无限制创建线程的缺点
- 线程生命周期的开销;
- 资源消耗量;
- 稳定性(应该限制可创建线程的数目);
应用线程池
线程池是与工作队列紧密绑定的。所谓工作队列,其作用是持有所有等待执行的任务。
Executor框架可以将任务的提交与执行策略解耦。
当任务都是同类的、独立的时候,线程池才会有最佳的工作表现。
定制线程池的大小
线程池合理的长度取决于未来提交的任务类型和所部署系统的特征。很少会把线程池的长度硬编码;池的长度应该由某种配置机制来提供,或者利用Runtime.availableProcessors的结果,动态地进行计算。
需要做的仅仅是避免“过大”和“过小”这两种极端情况。
Executor框架
Executor接口(java.util.concurrent)
[java] view plaincopy
- public interface Executor
Modifier and Type | Method and Description |
---|---|
void |
execute(Runnable command)
Executes the given command at some time in the future.The command may execute in a new thread, in a pooled thread, or in the calling thread, at the discretion of the Executor implementation. |
可以用于异步任务执行,而且支持很多不同类型的任务执行策略。为任务提交和任务执行之间的解耦提供了标准的方法。
Executor基于生产者-消费者模式。如果在程序中实现一个生产者-消费者的设计,使用Executor通常是最简单的方式。
Executor可以为任务指定执行策略:
- 任务在什么线程中执行;
- 任务以什么顺序执行;
- 可以有多少个任务并发执行;
- 可以有多少个任务进入等待执行队列;
- 如果系统过载,需要放弃一个任务,应该挑选哪一个任务;另外,如何通知应用程序知道;
- 在一个任务执行前与结束后,应该做什么处理;
Executor框架让制定一个执行策略变得简单;不过想要使用Executor,还必须能够让任务描述为Runnable。
ExecutorService
ExecutorService(java.util.concurrent)接口扩展了Executor接口,添加了一些用于生命周期管理的方法(同时还有一些用于任务提交的便利方法):
[java] view plaincopy
- public interface ExecutorService extends Executor
ExecutorService指出Executor生命周期的三种状态:
- 运行(running):ExecutorService最初创建后的初始状态是运行状态;
- 关闭(shutting down):shutdown方法会启动一个平缓的关闭过程:停止接受新的任务,同时等待已经提交的任务完成——包括尚未开始执行的任务;
- 终止(terminated):shutdownNow方法会启动一个强制的关闭过程,尝试取消所有运行的任务和排在队列中尚未开始的任务;
ThreadPoolExecutor
(java.uti.concurrent)
[java] view plaincopy
- public class ThreadPoolExecutor extends AbstractExecutorService
大多数通过构造函数传递给ThreadPoolExecutor的参数,都可以在创建后通过setXXX进行修改。如果Executor是通过Executors的某个工厂方法创建的,那么可以先把其转换为ThreadPoolExecutor类型,然后访问setXXX方法。
线程的创建与销毁
- 核心池大小(core pool size): 核心池大小时目标的大小;线程池的实现试图维护池的大小:即使没有任务执行,池的大小也等于核心池的大小,并且知道工作队列充满之前,池都不会创建更多的线程。
- 最大池大小(maximum pool size):最大池的大小是可同时活动的线程数的上线;
- 存活时间(keep-alive time): 如果一个线程已经闲置的事件超过了存活时间,它将成为一个被回收的候选者,如果当前的池的大小超过了核心池的大小,线程池会终止它。
以上的因素共同管理着线程的创建和销毁。
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
以上的参数可以在ThreadPoolExecutor的构造函数中设置,而且创建之后可以通过setXXX进行修改。
ThreadFactory线程工厂
线程池需要创建一个线程,都要通过一个线程工厂(thread factory)来完成。ThreadFactory只有唯一的方法:newThread,它会在线程池需要创建一个新线程时调用。
(java.uti..concurrent)
[java] view plaincopy
- public interface ThreadFactory
Modifier and Type | Method and Description |
---|---|
Thread |
newThread(Runnable r)
Constructs a new |
有很多原因需要使用定制的线程工厂,比如实例化一个定制Thread类的示例等。
扩展ThreadPoolExecutor / “钩子”函数
方法:
protected void |
beforeExecute(Thread t, Runnable r)
Method invoked prior to executing the given Runnable in the given thread. |
protected void |
afterExecute(Runnable r, Throwable t)
Method invoked upon completion of execution of the given Runnable. |
在每个任务执行之前和之后被调用;可以用它们添加日志,统计信息收集等功能。无论任务是正常地从run方法返回,还是抛出一个一场,afterExecute都会被调用;如果任务完成后抛出一个Error,则afterExecute不会被调用;如果beforeExecute抛出一个RuntimeException,任务则将不被执行,afterExecute也不会被调用。
方法:
protected void |
terminated()
Method invoked when the Executor has terminated. |
terminated钩子会在线程池完成关闭动作后调用,也就是当所有任务都已完成并且所有工作者线程也已经关闭后,会执行terminated。terminated可以用来释放Executor在生命周期里分配的资源,记录日志或者完成统计信息等。
饱和策略
当一个有限队列充满后,饱和策略开始起作用。
ThreadPoolExecutor的饱和策略可以通过调用方法:
void |
setRejectedExecutionHandler(RejectedExecutionHandler handler)
Sets a new handler for unexecutable tasks. |
来修改。
JDK提供了4中不同的饱和策略:
Modifier and Type | Class and Description |
---|---|
static class |
ThreadPoolExecutor.AbortPolicy
A handler for rejected tasks that throws a |
static class |
ThreadPoolExecutor.CallerRunsPolicy
A handler for rejected tasks that runs the rejected task directly in the calling thread of the |
static class |
ThreadPoolExecutor.DiscardOldestPolicy
A handler for rejected tasks that discards the oldest unhandled request and then retries |
static class |
ThreadPoolExecutor.DiscardPolicy
A handler for rejected tasks that silently discards the rejected task. |
取消/关闭 - 任务/线程,中断线程
大多数时候,我们通常允许任务/线程在结束任务后自行停止。但是,有时候我们希望在任务或线程自然结束之间就停止它们,可能是因为用户取消了操作,或者应用程序需要快速关闭。
任务取消
设置一个“cancellation requested”取消标志,任务会定期查看;如果发现标志被设置过,任务就会提前结束;i.e.
[java] view plaincopy
- public class PrimeGenerator implements Runnable{
- private final list<BigInteger> primes = new ArrayList<BigInteger>();
- private volatile boolean cancelled;
- public void run(){
- BigInteger p = BigInteger.ONE;
- while (!cancelled){
- p = p.nextProbablePrime();
- synchronized (this){
- primes.add(p);
- }
- }
- }
- public void cancel() { cancelled = true; }
- public synchronized List<BigInteger> get(){
- return new ArrayList<BigInteger>(primes);
- }
- }
- List<BigInteger> aSecondOfPrimes() throws InterruptedException {
- PrimeGenerator generator = new PrimeGenerator();
- new Thread(generator).start();
- try{
- SECOND.sleep(1);
- }finally {
- generator.cancel();
- }
- return generator.get();
- }
中断
使用上面的取消任务方法,任务并不是立刻被取消的,需要花费一些时间。而且如果一个任务使用这个方案调用一个阻塞方法,可能遇到一个更严重的问题——任务可能永远都不检查取消标志,因此永远不会终结。
特定阻塞类的方法支持中断——线程中断是一个协作机制,一个线程给另一个线程发送信号,通知它在方便或者可能的情况下停止正在做的工作,去做其他事情。
每一个线程都有一个boolean类型的中断状态,在中断的时候,这个中断状态被设置为true。每个线程都应不时检查该标志,以判断线程是否应该被中断。
一个被中断的线程不会被终止;中断一个线程只是为了引起线程的注意(设置中断状态),被中断线程/任务应该自己决定如何应对中断。某些线程很重要,不应理会中断,而是在处理完抛出异常后继续执行。
- 阻塞状态中断——阻塞库函数,比如Thread.sleep和Object.wait,试图监测线程何时被中断,并提前返回。它们对中断的响应表现为:清除中断状态,抛出interruptedException;这表示阻塞操作应为中断的缘故提前结束。
- 非阻塞状态中断——当线程并不处于阻塞状态的请款下发生中断时,会设置线程中的中断状态,然后一直等到被取消的活动获取中断状态,来检查是否发生了中断。通过这样的方法使中断变粘,如果不触发InterruptedException,中断状态会一直保持,知道有人特意去清除中断状态。
Thread类的中断方法
void |
interrupt()
Interrupts this thread. |
static boolean |
interrupted()
Tests whether the current thread has been interrupted. |
boolean |
isInterrupted()
Tests whether this thread has been interrupted.该方法会清除中断状态。 |
调用interrupt方法并不意味着必然停止目标线程正在进行的工作;它仅仅传递了请求中断的消息。
我们对中断本身最好的理解应该是:它并不会真正中断一个正在运行的线程,它仅仅发出中断请求;线程自己会在下一个方便的时刻中断,这些时刻被成为取消点。
线程中断的run/call方法一般如下:
[java] view plaincopy
- public void run(){
- try{
- //...
- while(!Thread.currentThread().isInterrupted()//&& more work to do){
- //do more work
- }
- }catch(InterruptedExcetion e){
- //thread was interrputed during sleep or wait
- }finally{
- cleanup,if required
- }
- //exiting the run method terminates the thread
- }
通过Future取消
ExecutorService.submit方法会返回一个Future来描述任务,Future有一个cancel方法,需要一个boolean类型的参数,它的返回值表示取消尝试是否成功,这仅仅是告诉了你它是否能够接受中断,而不是任务是否检测并处理了中断。当参数为true,并且任务当前正运行于一些线程中,那么这个线程是应该中断的。把这个参数设置成false意味着如果还没启动的话,不要运行这个任务。
什么时候可以采用一个true作为参数调用cancel?任务执行线程是由标准的Executor实现创建的,它实现了一个中断策略,使得任务可以通过中断被取消,所以当它们在标准Executor中运行时,通过它们的Future来取消任务,这时设置true是安全的。当尝试取消一个任务的时候,不应该直接中断线程池,应为你不知道中断请求到达时,什么任务正在运行,只能通过任务的Future来做这件事情。
(在任务的call或run方法中,仍然可以使用Thread.currentThread().isInterrupted()判断是否处于中断状态,如任务使用FutureTask,则可以使用isCancelled;)
Future(true)最后调用的其实还是执行线程的interrupt方法设置线程的中断状态;设置完成后返回true。
中断策略
当调用可中断的阻塞函数时,比如Thread.sleep或者BlockingQueue.put,有两种处理InterruptedException的策略:
- 传递异常——使你的方法也成为可中断的阻塞方法;
- 保存中断状态,上层调用栈中的代码能够对其进行处理;
只有实现了线程中断策略的代码才可以接受中断请求,通用目的的任务和库的代码绝不应该接受中断请求。
线程安全-同步
定义
- 同步:避免多个线程在同一时间访问同意数据。
- 一个对象是否应该是线程安全的取决于它是否会被多个线程访问。
- 线程安全取决于程序中如何使用对象,而不是对象完成了什么。
- 保证对象的线程安全性需要使用同步机制来协调对其可变状态的访问。
- 无论何时,只要有对于一个的线程访问给定的状态变量,而且其中某个线程会写入该变量,此时必须使用同步机制来协调线程对该变量的访问。
- 当多个线程访问一个类时,如果不用考虑这些线程在运行时环境下的调度和交替执行,并且不需要额外的同步及在调用方代码不必做其他的协调,这个类的行为仍然是正确的,那么这个类是线程安全的。
- 线程安全的类封装了任何必要的同步,因此客户不需要自己提供。
- 无状态对象(不包含域也不引用其他类的域)永远是线程安全的。
- 检查再运行(如惰性初始化)和读写改操作必须是原子地执行。
- 状态:用来确定某个线程是否有访问权限;
锁
即使单一对象的操作是原子的,也不能保证多个原子对象之间的协作也是原子的。
显式锁: Lock
显式锁需要指定起始位置和终止位置。
一般使用ReentrantLock类作为显式锁,多个线程中必须要使用一个ReentrantLock类作为对象才能保证锁的生效。且在加锁和解锁处需要通过lock()和unlock()显式指出,一般会在finally块中写unlock()防止死锁。
[java] view plaincopy
- Lock lock = new ReentrantLock();
- ... ...
- lock.lock();
- try{
- //
- //
- }finally{
- lock.unlock();
- }
内部锁: synchronized块
每个Java对象都可以隐式地扮演一个用于同步的锁的角色;这些内置的锁被成为内部锁(intrinsic locks)或监视器锁(monitor locks)。
synchronized块:
- 一个synchronized块有两个部分:锁对象的引用,以及这个锁保护的代码块。
synchronized方法:
- synchronized可以直接作用于对象的方法,synchronized方法method是对跨越了整个方法体的synchronized块的描述。
- synchronized方法的锁,就是该方法所在的对象本身(静态的synchronized方法则从Class对象上获得锁)。
同步过程:
- 执行线程进入synchronized块之前会自动获得锁,而无论通过正常控制途径退出,还是从块中跑抛出一场,线程都在放弃对synchronized块的控制时自动放弃锁。获得内部锁的唯一途径是:进入这个内部锁保护的同步块或方法。
synchronized块 V.S. synchronized方法(包括static方法)
- 把synchronized当作函数的修饰符:
[java] view plaincopy
- public synchronized void method()
- {
- ... ...
- }
它锁定的是该方法所在的对象。也就是说,当一个对象P1在不同的线程中执行这个同步方法时,它们之间会形成互斥,达到同步的效果。但是,这个对象所属的class所产生的另一个对象P2却能够任意调用这个被加了synchronized关键字的方法。
上面的代码等同于如下的代码:
[java] view plaincopy
- public void method()
- {
- synchronized (this)
- {
- ... ...
- }
- }
可见synchronized方法的实质是将synchronized作用于object reference。那个拿到了P1对象锁的线程,才能够调用对象P1的同步方法;而对于对象P2而言,P1这个锁和它毫不相干。
2. synchronized块:
[java] view plaincopy
- class Foo implements Runnable
- {
- private byte[] lock = new byte[0];
- public void method()
- {
- synchronized (lock)
- {
- ... ...
- }
- }
- }
3. 将synchronized作用于static函数
[java] view plaincopy
- class Foo
- {
- public synchronized static void method1() // 同步的static函数
- {
- ... ...
- }
- public void method2()
- {
- synchronized (Foo.class) // class literal 类名称字面变量
- {
- ... ...
- }
- }
- }
上述代码中的效果是相同的,取得的锁是当前这个方法所属的类class level
4. synchronized(this) 与 synchronized(class)比较
- synchronized(class)很特别,它会让另一个线程在任何需要获取class作为monitor的地方等待。class和this作为不同的monitor可以同时使用,不存在一个线程获取了class,另一个线程就不能获取该类的一切实例的情况。
- synchronized(class) v.s. synchronized(this):线程鸽子获取monitor,不会有等待
- synchronized(this) v.s. synchronized(this):如果不同线程监视的是同一个实例对象,就会等待。如果是不同的实例,不会等待
- synchronized(class) v.s. synchronized(class):无论不同线程监视的是同一个实例或者是不同的实例,都会等待
内部锁是可重入的:
- 线程在试图获得它自己占有的锁时,请求会成功。
- 重进入的实现是通过为每个锁关联一个请求技术和一个占有它的线程。
- 内部锁可重入避免了例如递归调用时的死锁,或者一个类中的两个对象都使用了内部锁,在其中一个方法中调用另一个方法将导致死锁。
显式锁 V.S. 内部锁
synchronized是托管给JVM执行的,而Lock是Java写的控制锁的代码。在Java1.5中,synchronized是性功能低效的,因为这是一个重量级操作,需要调用操作接口,导致有可能加锁消耗的系统时间比加锁以外的操作还多。相比之下,使用Java提供的Lock对象,性能更高一些。但是到了Java1.6,发生了变化。synchronized在语意上很清晰,可以进行很多优化,有适应自旋、锁消除、锁粗化、轻量锁、偏向锁等等。导致在Java1.6上synchronized的性能并不比Lock差。官方也表示,它们也更支持synchronized,在未来的版本中还有优化余地。
(synchronized原始采用的是CPU悲观锁机制,即线程获得的是独占锁。独占锁意味着其他线程只能依靠阻塞来等待线程释放锁。而CPU转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起CPU频繁的上下文切换导致效率很低。
而Lock用的是乐观锁方式,每次不加锁二十假设没有冲突而去完成某项操作,如果冲突失败就重试,知道成功位置。乐观锁实现的机制就是CAS操作。)
使用锁的劣势
- 挂起和恢复线程会带来很大的开销;对于基于锁,并且其操作过度细分的类(比如同步容器类,大多数方法只包含很少的操作),当频繁地发生锁的竞争时,调度与真正用于工作的开销的比值会很可观。
- 当一个线程正在等待锁时,它不能做任何其他事情。如果持有锁的线程发生了永久行的阻塞,所有等待该锁的线程都不会前进了。
原子操作类
从Java1.5开始JDK的并发包里提供了一些类来支持原子操作,不如AtomicBoolean,AtomicInteger等。
java.util.concurrent.atomic包中原子变量类:
Class | Description |
---|---|
AtomicBoolean |
A boolean value that may be updated atomically.
|
AtomicInteger |
An int value that may be updated atomically.
|
AtomicIntegerArray |
An int array in which elements may be updated atomically.
|
AtomicIntegerFieldUpdater<T> |
A reflection-based utility that enables atomic updates to designated volatile int fields of designated classes.
|
AtomicLong |
A long value that may be updated atomically.
|
AtomicLongArray |
A long array in which elements may be updated atomically.
|
AtomicLongFieldUpdater<T> |
A reflection-based utility that enables atomic updates to designated volatile long fields of designated classes.
|
AtomicMarkableReference<V> |
An AtomicMarkableReference maintains an object reference along with a mark bit, that can be updated atomically.
|
AtomicReference<V> |
An object reference that may be updated atomically. |
AtomicReferenceArray<E> |
An array of object references in which elements may be updated atomically. |
AtomicReferenceFieldUpdater<T,V> |
A reflection-based utility that enables atomic updates to designated volatile reference fields of designated classes.
|
AtomicStampedReference<V> |
An AtomicStampedReference maintains an object reference along with an integer "stamp", that can be updated atomically.
|
原子变量比锁更精巧,更轻量,并且在多处理系统中,对实现高性能的并发代码非常关键。更新原子变量的快速(非竞争)路径,并不会比获取锁的快速路径差,并且通常会更快;而慢速路径绝对会比锁的慢速路径快,因为它不会引起线程的挂起和重新调度。在使用原子变量取代锁的算法中,线程更不易出现延迟,如果它们遇到竞争,也更容易恢复。
由硬件提供的原子操作指令实现。
原子操作类 V.S. 锁
锁与原子化随着竞争的不同,性能也发生了改变。在中低程度的竞争下,原子化提供更好的可伸缩性;在高强度的竞争下,锁能够更好地帮助我们避免竞争。
使用锁实现的随机数字生成器:
[java] view plaincopy
- public clas ReentrantLockPseudoRandom extends PseudoRandom{
- private final Lock lock = new ReentrantLock(false);
- private int seed;
- ReentrantLockPseudoRandom(int seed){
- this.seed = seed;
- }
- public int nextInt(int n){
- lock.lock();
- try{
- int s = seed;
- seed = calcilateNext(s);
- int remainder = s%n;
- return remainder > 0 ? remainder : remainder + n;
- }finally{
- lock.unlock();
- }
- }
- }
使用原子操作类实现的随机数字生成器(CAS):
[java] view plaincopy
- public class AtomicPseudoRandom extends PseudoRandom{
- private AtomicInteger seed;
- AtomicPseudoRandom(int seed){
- this.seed = new AtomicIntegers(seed);
- }
- public int nextInt(int n){
- while (true){
- int s = seed.get();
- int nextSeed = calculateNext(s);
- if (seed.compareAndSet(s, nextSeed)){
- int remainder = s%n;
- return remainder > 0 ? remainder : remainder + n;
- }
- }
- }
- }
非阻塞算法 - 自旋CAS
一个线程的失败或挂起不应该影响其他线程的失败或挂起,这样的算法成为非阻塞算法;如果算法的每一个步骤中都有线程能够继续执行,这样的算法成为锁自由算法。在线程间使用CAS进行协调,这样的算法如果能构建正确的话,它即是非阻塞的,又是锁自由的。
好的非阻塞算法已经在多重常见的数据结构上现身,包括栈、队列、优先级队列、哈希表等。
- 非阻塞算法通过使用低层级并发原语比如:比较并交换CAS,取代了锁。原子变量类向用户提供了这些低层级原语(比如AtomicInteger.compareAndSet)。
- 自旋CAS实现的基本思路就是循环进行CAS操作直到成功为止。
非阻塞算法在设计和实现中很困难,但是在典型条件下能够提供更好的可伸缩性,并能更好地预防活跃度失败。从JVM的一个版本到下一个版本间并发性能的提升程度很大程度上源于非阻塞算法的使用,包括在JVM内部以及平台类库。
i.e.
[java] view plaincopy
- public class Counter{
- private AtomicInteger atomicI = new AtomicInteger(0);
- private int i = 0;
- public static void main(String[] args){
- final Counter cas = new Counter();
- List<Thread> ts = new ArrayList<Thread>(100);
- for (int j = 0; j < 100; j++){
- Thread t = new Thread(new Runnable(){
- public void run(){
- for (int i = 0; i < 1000; i++){
- cas.count();
- cas.safeCount();
- }
- }
- });
- ts.add(t);
- }
- for (Thread t : ts){
- t.start();
- }
- for (Thread t : ts){
- t.join();
- }
- }
- /* 使用CAS实现线程安全计数器 */
- private void safeCount(){
- while(true){
- int i = atomicI.get();
- boolean suc = atomicI.compareAndSet(i, ++i);
- if (suc){
- break;
- }
- }
- }
- /* 非线程安全计数器 */
- private void count(){
- i++;
- }
- }
CAS存在的问题
- ABA问题
因为CAS需要在操作值的时候检查下值有没有发生变化,如果没有发生变化则更新。但是如果一个值原来是A,变成了B,又变成了A,那么使用CAS进行检查时会发现它的值没有发生变化,但是实际上却变化了。ABA问题的解决思路就是使用版本号。在变量前面追加版本号,每次变量更新的时候把版本号加一,那么A-B-A就会变成1A-2B-3A.
从Java1.5开始JDB的atomic包里提供了一个类AtomicStampedReference来解决ABA问题。这个类的compareAndSet方法作用是先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
- 循环时间开销大
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销。
- 只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是多多个共享变量操作时,循环CAS就无法保证操作的原子行,这个时候就可以使用锁,或者有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如有两个共享变量i = 2, j = a;合并成ij = 2a,然后用CAS来操作ij。从Java1.5开始JDK提供了AtomicReference类来保证引用对象之间的原子性,可以把多个变量放在一个对象里来进行CAS操作。
使用支持CAS(Compare and Swap)的数据结构
在Java并发包中有一些并发框架也使用了自旋CAS的方式来实现原子操作。
同步容器
使用(java.util.Collections)Collections.synchronizedXxx工厂方法创建。
这些类对每一个公共方法进行同步从而实现了线程安全,这样一次只有一个线程能访问容器的状态。
同步容器类在每个操作的执行期间都持有一个锁。
同步容器使用中可能出现的问题:
- 同步容器都是线程安全的。但是对于符合操作,可能需要使用额外的客户端加锁进行保护。只要我们知道应该使用哪个锁,就有可能对其他的容器操作创建新的原子操作。同步容器类通过对它的对象自身进行加锁(使用this作为锁).
i,e.:
[java] view plaincopy
- public static Object getLast(Vector list){
- synchronized(list){
- int lastIndex = list,size() - 1;
- return list.get(lastIndex);
- }
- }
- public static void deleteLast(Vector list){
- synchronized(list){
- int lastIndex = list.size() - 1;
- list.remove(lastIndex);
- }
- }
并发容器
用并发容器替换同步容器,这种方法以有很小风险带来了可扩展性显著的提高。
同步容器通过对容器的所有状态进行串行访问,从而实现了它们的线程安全。这样做的代价是削弱了并发性,当多个线程共同竞争容器级的锁时,吞吐量就会降低。
ConcurrentHaspMap
ConcurrentHaspMap使用一个更加细化的锁机制,叫做分离锁。这个机制允许更深层次的共享访问。任意数量读线程可以并发访问Map,读者和写者也可以并发访问Map,并且有限数量的写线程还可以并发修改Map,结果是,为并发访问带来更高的吞吐量,同时几乎没有损失单个线程访问的性能。
相比于Hashtable和synchronizedMao,ConcurrentHashMap有众多的优势,而且几乎不存在什么劣势,因此在大多数情况下用ConcurrentHashMap取代同步Map实现只会带来更好的可伸缩性。只有当你的程序需要在独占访问中加锁时,ConcurrentHashMap才会无法胜任。
一些常见的复合操作:比如“缺少即加入”,“相等便移除”和“相等便替换”都已被实现为原子操作,并且这些操作已经在ConcurrentMap接口中声明:
Modifier and Type | Method and Description |
---|---|
V |
putIfAbsent(K key, V value)
If the specified key is not already associated with a value, associate it with the given value. |
boolean |
remove(Object key, Object value)
Removes the entry for a key only if currently mapped to a given value. |
V |
replace(K key, V value)
Replaces the entry for a key only if currently mapped to some value. |
boolean |
replace(K key, V oldValue, V newValue)
Replaces the entry for a key only if currently mapped to a given value. |
写入时复制容器:CopyOnWriteArrayList,CopyOnWriteArraySet
通常情况下它提供了更好的并发性,并避免了在迭代期间对容器加锁和复制。
“写入时复制”容器的线程安全性来源于,只要有效的不可变对象被正确发布,那么访问它将不在需要更多的同步。在每次需要修改时,他们会创建并重新发布一个新的容器拷贝,以此来实现可变形。“写入时复制”容器的迭代器保留一个底层基础数组的引用。这个数组作为迭代器的七点,永远不会被修改,因此对它的同步不过是为了确保数组内容的可见性。因此,多个线程可以对这个容器进行迭代,并且不会受到另一个或多个想要修改容器的线程带来的干涉。
在每次容器改变时复制基础数组需要一定的开销。特别是当容器较大的时候;当对容器迭代操作的频率远远高于对容器修改的频率时,使用“写入时复制”容器是个合理的选择。这个准则描述了许多事件通知系统:递交一个通知需要迭代已注册的监听器,并调用之中的一个,在多数情况下,注册和住校一个事件监听器的次数要比收到事件通知的次数少得多。
内存可见性
锁和可见性
锁不仅是关于同步和互斥的,也是关于内存可见的。为了保证所有线程都能够看到共享的,可变变量的最新值,读取和写入线程必须使用公共的锁进行同步。
volatile变量
当一个域声明为volatile类型后,编译器与运行时会监视这个变量:他是共享的,而且对它的操作不会与其他的内存操作一起被重排序。volatile变量不会缓存在寄存器或者缓存在对其他处理器隐藏的地方。所以,读一个volatile类型的变量时,总会返回由某一个线程所写入的最新值。
(不要过度依赖volatile变量所提供的可见性)
锁 V.S. volatile变量
加锁可以保证可见性与原子性;volatile变量只能保证可见性;
线程封闭
访问共享的,可变的数据要求使用同步。一个可以避免同步的方式就是不共享数据。如果数据仅在单线程中被访问,就不需要任何同步。
线程封闭(Thread confinement)技术是实现线程安全的最简单的方式之一。当对象封闭在一个线程中时,这种做法会自动成为线程安全的,即使被封闭的对象本身并不是。
栈限制
仅仅使用局部变量,这样变量的作用于就只作用于当前线程的堆栈中,不会逸出。
[java] view plaincopy
- public int func(parameters)
- {
- // only use local variables
- }
ThreadLocal
线程局部变量。它的功用非常简单,就是为每一个使用该变量的线程都提供一个变量值的副本,每一个线程都可以独立地改变自己的副本,而不会和其他线程的副本冲突。从线程的角度看,就好像是每一个线程都完全拥有该变量。
synchronized采取的是“以时间换空间”的策略;而ThreadLocal采取的是“以空间换时间”的思路。
ThreadLocal通常用于防止在基于可变的Singleton或全局变量的设计中,出现共享:
比如应用程序可能会维护一个全局的数据库连接,这个Connection在启动时就已经被初始化了。利用ThreadLocal存储JDBC连接,每个线程都会拥有自己的Connection:
[java] view plaincopy
- private static ThreadLocal<Connection> connectionHolder = new ThreadLocal<Connection>{
- public Connection initialValue(){
- return DriverManager.getConnection(DB_URL);
- }
- };
- public static Connection getConnection(){
- return connectionHolder.get();
- }
不可变对象
不可变对象永远是线程安全的。
不可变性并不简单地等于将对象中的所有域都声明为final类型,只有满足如下状态,一个对象才是不可变的:
- 它的状态不能在创建后再被修改;
- 所有域都是final类型,并且,
- 它被正确创建
组合多个对象时的同步策略
将线程安全性委托给现有的线程安全类
... ....
... ...
编写同步策略的文档
为类的用户便携类线程安全性担保的文档;为类的维护者编写类的同步策略文档。
阻塞
线程可能会因为集中原因被阻塞活暂停:等待I/O操作结束,等待获得一个锁,等待从Thread.sleep中唤醒,或者是等待另一个线程的计算结果。当一个线程阻塞时,它通常被挂起,并被设置成线程阻塞的某个状态(BLOCKED,WAITING或是TIMED_WAITING)。一个阻塞的操作和一个普通的操作之间的差别仅仅在于,被阻塞的线程必须等待一个事件的发生才能继续进行,并且这个事件时超越它自己控制的,因而需要花费更长的事件———等待I/O操作完成,锁可用,或者是外部计算结束。当外部事件发生后,线程被置回RUNNABLE状态,重新获得调度的机会。
当一个方法能够抛出InterruptedException的时候,是在告诉你这个方法是一个可阻塞的方法,进一步看,如果它被中断,将可以提前结束阻塞状态。
中断
中断是一种协作机制。
Synchronizer - 并发流程控制
Synchronizer是一个对象,它根据本身的状态调节线程的控制流。阻塞队列可以扮演一个Synchronizer的角色;其他类型的Synchronizer包括信号量,关卡以及闭锁。
在平台类库中存在一些Synchronizer类:如果这些不能满足要求,可以创建自己的Synchronizer。
所有的Synchronizer都拥有类似的结构特性:它们封装状态,而这些状态决定着线程执行到某一点时是通过还是被迫等待;它们还提供操控状态的方法,以及高效地等待Synchronizer进入到期望状态的方法。
闭锁(latch)
闭锁latch是一种Synchronizer,它可以延迟线程的进度直到线程到达终止状态(terminal)。一个闭锁工作起来就像一道大门:直到闭锁到达终点状态之前,门一直是关闭的,没有线程能够通过,在终点状态到来的时候,门开了,允许所有线程都通过。一旦闭锁到达了终点状态,它就不能够再改变状态了,所以它会永远保持敞开状态。闭锁可以用来确保特定活动知道其他的活动完成后才发生。
闭锁是一次性使用的对象:一旦进入到最终状态,就不能被重置了。
i.e. : java.util.concurrent.CountDownLatch
[java] view plaincopy
- public class TestHarness {
- public long timeTasks(int nThreads, final Runnable task) throws InterruptedException {
- final CountDownLatch startGate =new CountDownLatch(l);
- final CountDownLatch endGate = new CountDownLatch(nThreads);
- for (int i = O; i < nThreads; i++) {
- Thread t =new Thread() {
- public void run() {
- try {
- startGate.await();
- try {
- task.run();
- } finally {
- endGate.countDown();
- }catch (InterruptedException ignored) {
- }
- );
- t.start();
- }
- long start= System.nanoTime();
- startGate.countDown();
- endGate.await();
- long end= System.nanoTime();
- return end-start;
- }
- }
FutureTask
java.util.concurrent.FutureTask<V> implements Runnable, Future<V>, RunnableFuture<V>
V: FutureTask的get方法返回的值的类型。
get()的行为依赖于任务的状态。如果它已经完成,get可以立刻得到返回的结果,否则会被阻塞知道任务转入完成状态,然后返回结果或者抛出异常。FutureTask把计算的结果从运行计算的线程传送到需要这个结果的线程:FutureTask的规约保证了这种传递建立在结果的安全发布基础上。
i.e. :使用FutureTask预载稍后需要的数据:
[java] view plaincopy
- public class PreLoader{
- private final FutureTask<ProductInfo> future =
- new FutureTask<ProductInfo>(
- new Callable<ProductInfo>(){
- public ProductInfo call() throws DataLoadException{
- return loadProductInfo();
- }
- }
- );
- private final Thread thread = new Thread(future);
- public void start(){
- thread.start();
- }
- public ProductInfo get() throws DataLoadException, InterruptedException {
- try{
- return future.get();
- }catch (ExecutionException e){
- Throwable casue = e.getCause();
- if (cause instanceof DataLoadException){
- throw (DataLoadException)cause;
- }else{
- throw launderThrowable(cause);
- }
- }
- }
- }
信号量 Semaphore
java.util.concurrent.Semaphore
计数的信号量;信号量维护一系列的permit(概念上的permit),acquire()方法会一直阻塞直到有可用的permit。release()方法增加一个permit。
信号量通常用于限制线程的数量。
i.e.:使用信号量控制池中可用资源:
[java] view plaincopy
- class Pool {
- private static final int MAX_AVAILABLE = 100;
- private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
- public Object getItem() throws InterruptedException {
- available.acquire();
- return getNextAvailableItem();
- }
- public void putItem(Object x) {
- if (markAsUnused(x))
- available.release();
- }
- // Not a particularly efficient data structure; just for demo
- protected Object[] items = ... whatever kinds of items being managed
- protected boolean[] used = new boolean[MAX_AVAILABLE];
- protected synchronized Object getNextAvailableItem() {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (!used[i]) {
- used[i] = true;
- return items[i];
- }
- }
- return null; // not reached
- }
- protected synchronized boolean markAsUnused(Object item) {
- for (int i = 0; i < MAX_AVAILABLE; ++i) {
- if (item == items[i]) {
- if (used[i]) {
- used[i] = false;
- return true;
- } else
- return false;
- }
- }
- return false;
- }
- }
关卡Barrier
关卡类似于闭锁,它们都能够阻塞一组线程,知道某些事情发生。其中关卡与闭锁关键的不同在于,所有线程必须同事到达关卡点,才能继续处理。闭锁等待的是事件;关卡等待的是其他线程。
java.util.concurrent.CyclicBarrier
i.e.:
[java] view plaincopy
- class Solver {
- final int N;
- final float[][] data;
- final CyclicBarrier barrier;
- class Worker implements Runnable {
- int myRow;
- Worker(int row) { myRow = row; }
- public void run() {
- while (!done()) {
- processRow(myRow);
- try {
- barrier.await(); // waits until all parties have invoked await on this barrier
- } catch (InterruptedException ex) {
- return;
- } catch (BrokenBarrierException ex) {
- return;
- }
- }
- }
- }
- public Solver(float[][] matrix) {
- data = matrix;
- N = matrix.length;
- <strong>barrier = new CyclicBarrier(N,
- new Runnable() {
- public void run() {
- mergeRows(...);
- }
- });</strong>
- for (int i = 0; i < N; ++i)
- new Thread(new Worker(i)).start();
- waitUntilDone();
- }
- }
关卡 Exchange
java.util.concurrent.Exchanger<V>
构建自定义同步工具
创建状态依赖类最简单的方法通常是将它构建于已有的状态依赖库类(如FutureTask,Semaphore,BlockingQueue等)之上;但是如果类库没有提供需要的功能,也可以使用语言和类库提供的底层机制,包括内部条件队列,显式的Condition对象和AbstractQueueSynchronizer框架,构建属于自己的Synchronizer。
使用(内部)条件队列 - wait, notify, notifyAll
条件队列可以让一组线程——称作等待集——以某种方式等待相关条件变成真,它也由此得名。不同于传统的队列(它们的元素是数据项),条件队列的元素是等待相关条件的线程。
就像每个Java对象都能当作锁一样,每个对象也能当作条件队列。Object中的wait,notify,notifyAll方法构成了内部条件队列的API。
一个对象的内部锁与它的内部条件队列是先关的:为了能够调用对象X中的任何一个条件队列方法,必须持有对象X的锁。
Object.wait会自动释放锁,并请求操作系统挂起当前线程,让其他线程获得该锁进而修改对象的状态。当它被唤醒时,它会在返回前重新获得锁。直观上看,调用wait意味着“我要去休息了,但是发生了需要关注的事情后叫醒我”,调用通知(notity)方法意味着“需要关注的事情发生了”。
显式的Condition对象
正如Lock是广义的内部锁,Condition也是广义的内部条件队列。
[java] view plaincopy
- public interface Condition (java.util.concurrent.locks)
一个Condition和一个单独的Lock相关联,就像条件队列和单独的内部锁相关联一样:调用Condition相关联的Lock的Lock.newCondition方法,可以创建一个Condition。Condition提供了比内部条件队列要丰富的多的特征集,每个锁可以有多个等待集。
不同于内部条件队列,可以让每个Lock都有任意数量的Condition对象。
wait,notify,notifyAll在Condition对象中的对等体是wait,signal和signalAll。
示例:
[java] view plaincopy
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock();
- try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public Object take() throws InterruptedException {
- lock.lock();
- try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }