并发是个系统的知识体系,有理论上的,有语言上的,有概念上的,这份总结力求简单,看的懂,而不是用一大堆源码和概念去描述
java的内存模型
java的内存模型被称为JMM,从上图可以看出,java内存模型主要是针对多线程。
为什么要先说java的内存模型,事实上,涉及到线程之间通信的两种模型;
第一种是消息传递,这种通信方式对程序员是不透明的,即程序员必须显示的用一个线程发消息,用另一个线程接收消息(例如RabbitMQ)。这种方式在线程上显示的规定了前后关系,发消息必须在接收消息之前。
第二种是共享内存,这种方式对程序员透明,通信过程是通过隐式进行。具体来说是,前一个线程将要传递的值存放在共享内存里面,而后一个线程会去取。这种方式如果是单线程模式,则不会出现问题,虽然JVM也会进行重排序优化,但不会影响运行结果,但对于多线程来说,即使不用重排序技术,线程之间通过共享内存通信也会出现很严重的同步问题。
其原因在于:
1.多线程之间并发进行,抢占CPU的资源,而不会同步进行(并发与同步相生相克,如果同步了,何必要并发),本来要发消息的线程在接收消息的线程之后执行,导致接收到的消息不准确。
2.java虚拟机会对程序进行重排序,导致线程运行的程序语句没有按照顺序执行,在多线程中,出现了很大的错误。
3.发消息的线程只把要发的数据存在本地内存中(逻辑概念,物理上不存在),而当收消息的线程执行时,还没有来得及将其刷新到共享内存中去(主内存)。
以上三个原因导致多线程使用共享内存模式出现了一系列的问题。
解决方法:同步。(使发消息的线程永远执行在收消息的线程之前)
同步问题需要解决很多问题,第一个就是可见性,一个线程对主内存数据如何对另一个线程可见,其中涉及到的就是顺序问题,JMM中提供了happens-before关系。向程序员保证,只要你编写的两个线程满足happens-before关系,那么可见性问题就会解决,即一个线程对主内存的修改对另一线程就是可见的。
具体的关系遵守的规则如下:
1.程序顺序规则:一个线程中的每个操作,happens-before 于该线程中的任意后续操作。
解读:程序顺序规则是作用在一个线程之内,一个线程之内运行的多行程序,需要按照顺序来执行,A操作对内存的修改要想被B操作获取,必须A操作在B操作之前,即使重排序了,对于执行结果而言,A操作的结果也在B操作的结果之前。
2.监视器锁规则:对一个监视器的解锁,happens-before 于随后对这个监视器的加锁。
解读:这个很好理解,监视器就是一个管程,也就是一个锁,这个不管单线程和多线程,一个线程对一段代码加锁之后,能看到前一个线程解锁之前的操作结果。也就是解锁的过程就是将本地内存的值刷新到主内存的过程。
3.volatile变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读。
解读:volatile规则就是要一个线程先去斜volatile,然后一个线程去读这个volatile变量,那么才能保证是可见的。
接下来看一道面试题
public class ThreadSafeCache { int result; public int getResult() { return result; } public synchronized void setResult(int result) { this.result = result; } public static void main(String[] args) { ThreadSafeCache threadSafeCache = new ThreadSafeCache(); for (int i = 0; i < 8; i++) { new Thread(() -> { int x = 0; while (threadSafeCache.getResult() < 100) { x++; } System.out.println(x); }).start(); } try { //主线程等待1s,等带其他线程执行 Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } threadSafeCache.setResult(200); } }
问,当get方法不加锁,而set方法加锁,那么一个线程的set方法是否会对另一个线程的get方法可见呢,假设:线程执行的顺序没有问题。
事实上,如果是一个是只有两个线程,如果一个线程先set了,然后另一个线程get,除了这两个操作没有任何其他操作,那么这时候是可见的。这是因为,当执行set方法的时候,因为加了锁,会让set执行线程将本地内存中已经set的值刷新到主内存中去,而get方法在后面执行,因此一定会从主内存中取到这个已经修改的值,存到本地内存里面去。
但上面的程序为什么陷入了死循环呢,这是因为程序一执行,先让八个线程都执行了一遍get方法,而get方法会得到现在主线程中的值,存入本地内存中,因为get方法没有加锁,因此,在后面再执行get方法,它会一直在本地内存中去取值,因此,取到的都是原来的值,现在即使你利用主线程将set方法执行了,也不会得到set执行后主内存内的新值了。
修改,第一种方式,给get方法也加锁,根据happens-before(监视器锁规则)规则,当主线程将set方法执行后,子线程的get方法也能从主内存中获取到值(刷新),这样就可以退出死循环。
第二种,不加锁,给全局变量result加一个volatile(volatile规则),这样就会在执行时自动刷新,保证了可见性。
第三种,将全局变量定义为final,那么这样无论哪个线程都无法改变全局变量的值,一定保证可见,(但这有什么意义,根本和线程通信无关)
以上就是可见性问题的解决,但需要指出的是volatile只会保证全局数据的可见性,但同步关系,不能保证,而且在非原子操作中也会发生线程之间的安全问题。
上面的可见性,我们解决了共享内存的通信问题,但从通信中扩展,我要发送的一只一条数据,要好多条,那么问题来了,我怎么能确保我发送的数据,和你接收到的数据顺序一致呢。
如果只满足了上面的可见性,那么只能保证我发送一条数据,你肯定能接收到;但我如果发送多个数据,那么你接收的顺序和我发送的顺序就会不一致了。
比如上面的图,A操作为发送后的数据,和B接收的数据不一致。根本原因还是并发导致的,因此,要想得到相同的数据,需要将A要发送的操作和B接收的操作原子化
何谓原子化:
就是等A线程的四条操作都执行完以后,再进行发送,然后B的接收也是类似,当都接收完了,在进行汇总。
那么哪些操作是原子操作呢(即不需要原子化的操作):事实上,观察一个操作是不是原子操作,不能看其语言本身,而是要看他在内存中执行了几个指令,如果只执行一个指令,那么就是原子的,否则就不是;比如i++,其实在内存中执行了三个指令,不是原子操作。
所有不是原子操作的的操作都有线程安全问题。
1.volatile能保证原子操作吗?
显然不能,volatile只能保证可见性,不能保证原子性。(见上面分析)
针对i++的操作,计算机会执行三条指令来运行:
计算机将内存中的变量值读入cpu的寄存器--------(人看到-笔记本上的i 的值 ,把这个值写入 大脑的临时记忆区)
cpu 对其 + 1运算并运算后的值放入寄存器-------- (大脑对 这个 值进行加一运算,把这个值写入大脑的临时记忆区)
把这个值写入内存--------(把这个值写到笔记本上)
使用volatile指令,只能保证上面每条指令的对后面线程的可见性,但其实+1这条指令很可能执行的顺序有问题,导致没有执行+1操作。
2.加锁
对要做的操作加锁是可以保证操作的原子性的。为什么呢?
锁其实就是管程,保持线程同步。在执行+1操作之前,当前执行线程会获取到锁,获取到以后,其他线程暂时就不能执行这段加锁代码了,因此保证了+1操作的原子性。当线程执行完+1操作以后,会重新释放锁,让其他线程执行。
3.使用java自带的API
使用AtomicInteger中的方法可以保证原子性。(原理待解决)
从以上分析可以看出,解决原子性最好的方法还是加锁。
接下来,再聊聊线程间的通信,之前说过线程之间通信的两种方式,但其实,细分下来还包括很多种:
1.synchronized和volatile关键字,这两种上面已经分析清楚了,主要就是操作主内存。
2.等待/通知机制,这里主要用到wait和notify方法。
为什么这两个方法可以实现线程通信呢?
每一个对象都有一个与之对应的监视器 每一个监视器里面都有一个该对象的锁和一个等待队列和一个同步队列
wait()而导致阻塞的线程是放在阻塞队列中的,因竞争失败导致的阻塞是放在同步队列中的
wait语义:一是释放当前对象锁,另一个是进入阻塞队列
notify()语义:把阻塞队列中的线程放到同步队列中去
通过上面分析就很清楚了,wait可以使一个线程进入阻塞状态,并只能通过notify()来唤醒,重新拥有获得锁的能力。
那么通过这两个Object类中的方法,就能通过一个线程将另一个线程唤醒,进行了通信。
这种等待/通知机制能很好的解决生产者-消费者问题。(待解决)
3.Thread.join()方法
通过阅读源码,可以知道,如果一个线程 A 执行了 threadA.join()
,那么只有当线程 A 执行完之后,threadA.join()
之后的语句才会继续执行,类似于创建 A 的线程要等待 A 执行完后才继续执行;
具体是这样的过程,A线程在B线程中执行(一般场景为B是主线程,A是创建线程),当执行到threadA.join()之后,join方法会调用wait方法,将B线程阻塞掉。然后去执行线程A的run方法,当执行完A线程,系统会调用exit方法(不需要显示调用),执行notifyAll将B线程唤醒。,B线程接着执行。
join(long)方法的使用原则是,如果在long的时间之内,执行A的线程,等时间过了,执行B的线程。
join(long)和sleep(long)的区别:sleep的时间是固定的,当没有过完这段时间,线程不会苏醒,但join(long)不同,如果A在long之前执行完,那么B会被唤醒执行。因为exit方法会直接调用notifyall方法。
4.管道流:消息机制(待解决)
PipedInputStream
& PipedOutputStream
5.ThreadLocal类
事实上,ThreadLocal并不能用来线程通信,为什么?ThreadLocal是每一个线程拥有的一份变量,与其他线程是隔离的,怎么用来通信。
参考:《java并发编程的艺术》
对于线程而言,每一线程都在执行任务,那么如何安全的取消一个任务呢?
1.手动设置一个中断标志
分析,任务运行在一个while循环当中,while的条件就是手动设置的标志位,将该标志位设置为volatile,那么当另一个线程显示的将该标志位修改的时候,那么当前线程就可以退出while循环,将任务取消了。
这么做是最简单的取消方式,但它存在一个缺陷,如果while循环里面出现了故障,导致在while内部出现了阻塞,那么while循环的标志位永远也没有机会去验证是否被修改了,因此也无法安全的取消了(其实如果做个判断还是可以退出循环的,但这样做显然不是个优雅的方式)
public class PrimeGenerator implements Runnable { private static ExecutorService exec = Executors.newCachedThreadPool(); private final List<BigInteger> primes = new ArrayList<BigInteger>(); private volatile boolean cancelled; public void run() { BigInteger p = BigInteger.ONE; while (!cancelled) { p = p.nextProbablePrime(); synchronized (this) { primes.add(p); } } } public void cancel() { cancelled = true; } public synchronized List<BigInteger> get() { return new ArrayList<BigInteger>(primes); } static List<BigInteger> aSecondOfPrimes() throws InterruptedException { PrimeGenerator generator = new PrimeGenerator(); exec.execute(generator); try { SECONDS.sleep(1); } finally { generator.cancel(); } return generator.get(); } }
观察上面代码,是一个运行一秒钟的素数生成器,使用的是使用一个标志位来实现生成素数任务的取消。
2.使用线程的中断
中断表示的是线程中的一个标志位,为true,则表示一个运行中的线程是否被其他线程进行了中断操作,(其他线程通过调用运行中线程的interrupt()方法)。
中断有三个方法:isInterrupted()方法可以获取当前线程的中断状态;interrupted()方法会将中断标志位复位;以及调用中断的interrupt()方法
事实上,很多api会判断线程是否被中断,比如阻塞库里面的Thread.sleep和Object.wait,Object.join等,他们响应中断的方法就是,清除中断状态,然后抛出中断异常。
使用中断来取消任务
正如任务中应该包含取消策略一样,线程同样应该包含中断策略。中断策略规定线程如何解释某个中断请求——当发现中断请求时,应该做哪些工作(如果需要的话),哪些工作单元对于中断来说是原子操作,以及以多快的速度采响应中断。
最合理的中断策略是某种形式的线程级(Thread-Level)取消操作或服务级(Service-Level)取消操作:尽快退出,在必要时进行清理,通知某个所有者该线程已经退出。此外还可以建立其他的中断策略,例如暂停服务或重新开始服务,但对于那些包含非标准中断策略的线程或线程池,只能用于能知道这些策略的任务中。
3.使用Future
给一个 Runnable r 和时间 long timeout,解决“最多花 timeout 分钟运行 Runnable,没运行完就取消”这种要求。
private static final ExecutorService cancelExec = Executors.newCachedThreadPool(); public static void timedRun(Runnable r, long timeout, TimeUnit unit) { Future<?> task = cancelExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // 如果超时,抛出超时异常 } catch (ExecutionException e) { // 如果任务运行出现了异常,抛出任务的异常 throw launderThrowable(e.getCause()); } finally { // 如果任务已经结束,这句没影响 // 如果任务还在运行,这句会中断任务 task.cancel(true); } }
(后面会详细解释)
安全取消一个任务是属于业务逻辑上面的东西,与并发并没有直接的关系。
线程安全问题
线程安全的实质就是解决好同步问题。
synchronized:阻塞同步
使用方法:1.修饰代码块,锁对象是指定的。2.修饰普通方法,锁对象是调用该方法的对象。3.修饰静态方法,锁对象是该方法所在的Class对象。
synchronized是一个互斥锁,在同一时刻,只允许一个线程拿到锁对象。是个可重入锁,拿到锁的对象可以再次拿到锁。
synchronized的原理
每个对象都可以扮演一个用于同步的锁的角色,这些内置的锁被称为内部锁,或监视器锁。
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
3、如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权。
执行monitorexit的线程必须是object ref所对应的monitor的所有者。
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
上面这三个条件已经把synchronized的原理说清楚了,一段代码,或者一个变量可能有很多线程来执行,那么为了同步,需要拿这段代码的唯一对象作为锁,线程们只有拿到这个锁,才能执行这段代码,没有拿到就不可以执行,当执行完以后把锁释放了,其他的线程也可以竞争了(刚释放了锁的线程也可以竞争)
synchronized锁是一个非公平锁,有些线程可能一直无法执行。(公平锁是按照线程进行同步队列的顺序来执行的,非公平锁是如果当前有竞争的锁就先执行竞争的锁)
锁膨胀:synchronized锁在一定情况下会升级
先来介绍锁对象的概念:锁是被加到对象上面去的,被加的对象成为锁对象,每一个对象都可以成为锁对象(看看Object对象提供的那些方法就懂了)
从java对象的存储结构说起,分三个部分:对象头、实例数据、填充数据
上面是一个对象头的信息,可以看到markwork里面就存放着锁信息。
LockObject lockObject = new LockObject();//随便创建一个对象 synchronized(lockObject){ //代码 }
当执行以上代码的时候,Markword的部分信息如下
这是因为线程执行到同步代码区(临界区)的时候,会利用CAS(Compare and Swap)操作,将线程ID插入markwork中,同时修改偏向锁和标志位。
(比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。)
当偏向锁状态为1的时候,说明对象的偏向锁生效了,同时也可以看到是哪个线程获取了该对象的锁,(bit fields中的threadid)
什么是偏向锁:这个锁会偏向于第一个获得它的线程,在接下来的执行过程中,假如该锁没有被其他线程所获取,没有其他线程来竞争该锁,那么持有偏向锁的线程将永远不需要进行同步操作。(本质上的意思是,如果获得这个偏向锁的线程在没有其他线程竞争的情况下,又一次进入了该快同步代码块,或退出同步代码块,那么不需要再去加锁和解锁的操作。)
步骤如下:
- Load-and-test,也就是简单判断一下当前线程id是否与Markword当中的线程id是否一致.
- 如果一致,则说明此线程已经成功获得了锁,继续执行下面的代码.
- 如果不一致,则要检查一下对象是否还是可偏向,即“是否偏向锁”标志位的值。
- 如果还未偏向,则利用CAS操作来竞争锁,也即是第一次获取锁时的操作。
上面就是偏向锁的逻辑,但前提是,没有其他的竞争,如果有其他的竞争,线程id和markword的不一致,那么就是两个线程了,不存在偏向了(不可能偏向了,否则另一个线程怎么执行),在这种情况下,synchronized锁自动膨胀,升级为轻量级锁。
为什么需要偏向锁?
其实大多是时候,同一个线程会进入同一块同步代码块。(偏向锁默认开启)
锁撤销:偏向锁升级以后,需要将偏向锁撤销,锁撤销还是需要很大的开销的,过程如下:
- 在一个安全点停止拥有锁的线程。
- 遍历线程栈,如果存在锁记录的话,需要修复锁记录和Markword,使其变成无锁状态。
- 唤醒当前线程,将当前锁升级成轻量级锁。 所以,如果某些同步代码块大多数情况下都是有两个及以上的线程竞争的话,那么偏向锁就会是一种累赘,对于这种情况,我们可以一开始就把偏向锁这个默认功能给关闭
可以知道,其过程就是,先把拥有偏向锁的线程停止,然后修复锁记录和markword,使锁对象变成无锁状态,然后唤醒线程,然后把线程得到锁,并将锁升级为轻量级锁。
轻量级锁:
升级轻量级锁的过程
- 线程在自己的栈桢中创建锁记录 LockRecord。
- 将锁对象的对象头中的MarkWord复制到线程的刚刚创建的锁记录中。
- 将锁记录中的Owner指针指向锁对象。
- 将锁对象的对象头的MarkWord替换为指向锁记录的指针。
首先,栈帧是线程私有的,与线程的生命周期相同;在栈中记录自己获取锁的记录LockRecord,记录的内容就是锁对象的对象头中的MarkWord,然后记录中的Owner指针指向锁对象,并将锁对象的markword替换为指向锁记录的指针
轻量锁的标志是00
轻量级锁的分类:自旋锁、自适应自旋锁。
自旋锁:当有另一个线程来竞争锁时,这个线程不会进入阻塞状态,而是不断的在原地循环,直到当前线程释放锁资源之后,该线程马上能得到锁。
应用场景:自旋锁应用于那些同步代码执行很快的场景,这样竞争线程能很快得到锁,而不是一直运行cpu等待。
存在的问题:
- 如果同步代码块执行的很慢,需要消耗大量的时间,那么这个时侯,其他线程在原地等待空消耗cpu,这会让人很难受。
- 本来一个线程把锁释放之后,当前线程是能够获得锁的,但是假如这个时候有好几个线程都在竞争这个锁的话,那么有可能当前线程会获取不到锁,还得原地等待继续空循环消耗cup,甚至有可能一直获取不到锁
存在的问题是显而易见的,但事实上,当一个线程循环等待(空循环)的次数超过某一个值,锁会再次膨胀,升级为重量级锁。
自适应自旋锁:动态着根据实际情况来改变自旋等待的次数。
假如一个线程1刚刚成功获得一个锁,当它把锁释放了之后,线程2获得该锁,并且线程2在运行的过程中,此时线程1又想来获得该锁了,但线程2还没有释放该锁,所以线程1只能自旋等待,但是虚拟机认为,由于线程1刚刚获得过该锁,那么虚拟机觉得线程1这次自旋也是很有可能能够再次成功获得该锁的,所以会延长线程1自旋的次数。
另外,如果对于某一个锁,一个线程自旋之后,很少成功获得该锁,那么以后这个线程要获取该锁时,是有可能直接忽略掉自旋过程,直接升级为重量级锁的,以免空循环等待浪费资源。
轻量级锁是非阻塞锁(原理上就没有将线程阻塞)、乐观锁
轻量级锁的好处:线程的阻塞和开启都是需要开销的,要从用户态转换为内核态,因此,如果同步代码执行很快的情况下,使用轻量级锁是一种很好的选择。
重量级锁:依赖对象内部的monitor锁来实现的,而monitor又依赖操作系统的MutexLock(互斥锁)来实现的,这个时候,锁才用到了监视器锁,升级也会经过锁撤销和升级,标志位为10。
缺点:当系统检查到锁是重量级锁之后,会把等待想要获得锁的线程进行阻塞,被阻塞的线程不会消耗cup。但是阻塞或者唤醒一个线程时,都需要操作系统来帮忙,这就需要从用户态转换到内核态,而转换状态是需要消耗很多时间的,有可能比用户执行代码的时间还要长。
重量级锁是阻塞同步锁(显而易见的),悲观锁。
synchronized锁的这种升级策略使得其应用变得更加广泛了。
Lock接口
拥有的方法:
lock
lockInterruptibly
tryLock
tryLock
unlock
newCondition
其中lock()方法是获取锁,unlock()方法是释放锁
实现的类:
ReadLock in ReentrantReadWriteLock (java.util.concurrent.locks)
ReadLockView in StampedLock (java.util.concurrent.locks)
ReentrantLock (java.util.concurrent.locks)
Segment in ConcurrentHashMap (java.util.concurrent)
WriteLock in ReentrantReadWriteLock (java.util.concurrent.locks)
WriteLockView in StampedLock (java.util.concurrent.locks)
需要使用的就是ReentrantLock类和 ReentrantReadWriteLock下面的两个静态类ReadLock、WriteLock
锁机制的一篇好文
锁的实现原理:
Lock接口的实现大多数是通过聚合了一个同步器AbstractQueuedSynchronizer来完成线程访问的。
同步器使用一个int变量(用volatile修饰)的state变量来表示同步状态,通过内置的FOFO队列来完成资源获取的排队工作。Lock接口的实现类就是通过同步器来实现的。
同步器设计是通过模板方法模式,使用者继承同步器并重写指定方法。
使用三个方法来访问和修改同步状态:getState()(获取当前同步状态)、setState()(设置当前同步状态)、compareAndSetState()(使用CAS设置当前状态,保证状态的原子性)。
这样写还是很难理解,下面就介绍一个AQS的原理
1.AQS的核心思想:如果被请求的共享资源空闲,那么就将当前请求资源的线程设置为有效的工作线程,将共享资源设置为锁定状态;如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中。
上面这段话很好理解,这就是锁机制的关键
CLH:Craig、Landin and Hagersten队列,是单向链表,AQS中的队列是CLH变体的虚拟双向队列(FIFO),AQS是通过将每条请求共享资源的线程封装成一个节点来实现锁的分配。
AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。
通过上面的介绍,我们可以知道,每一个访问锁的线程被封装为一个Node,然后根据同步的状态来决定该Node如何排队,Node中的State是怎样的。
那么先介绍Node里面的属性和方法:
重要的就是WaitState、thread、prev、next(这几个方法和属性的含义一目了然,提供了封装的必要条件)
这两种模式在node里以静态final属性来实现。
witeState的一些枚举值,表示了状态的含义
以上就是Node的属性和方法的全部。
现在来看AQS里面的状态state,是一个int属性,并通过volatile来修饰。
这几个方法都是final修饰的,子类无法重写。
独占模式的流程。
从上图可以看出实现一个非公平锁的流程:首先通过CAS操作来修改状态,如果成功了就将该线程添加到setExclusiveOwnerThread里面
final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }
如果失败了,就调用acquire方法,acquire方法不可被子类重写,里面写了入队过程,通过addWaiter方法将节点放入队列的队尾,然后通过acquireQueued来循环获得状态
public final void acquire(int arg) { if (!tryAcquire(arg) && //tryAcquire是表示是否获取锁成功,如果成功就返回true,否则返回false,一般都是通过实现类来实现的
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail;//将pred指向队尾 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) {//使用CAS操作将节点放到队尾 pred.next = node; return node; } } enq(node); return node; }
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循环 final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) {//查看当前节点的前驱是不是头结点 setHead(node);//如果是头结点, p.next = null; // help GC failed = false; return interrupted;//返回一个不中断的标志 } if (shouldParkAfterFailedAcquire(p, node) && //如果不是头结点,中断,阻塞 parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
以上就是抢占式锁的原理
Lock接口实现机制就到这里了,其实还是有些地方不清楚
下面我们从“何时出队列?”和“如何出队列?”两个方向来分析一下acquireQueued源码:
// java.util.concurrent.locks.AbstractQueuedSynchronizer final boolean acquireQueued(final Node node, int arg) { // 标记是否成功拿到资源 boolean failed = true; try { // 标记等待过程中是否中断过 boolean interrupted = false; // 开始自旋,要么获取锁,要么中断 for (;;) { // 获取当前节点的前驱节点 final Node p = node.predecessor(); // 如果p是头结点,说明当前节点在真实数据队列的首部,就尝试获取锁(别忘了头结点是虚节点) if (p == head && tryAcquire(arg)) { // 获取锁成功,头指针移动到当前node setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 说明p为头节点且当前没有获取到锁(可能是非公平锁被抢占了)或者是p不为头结点,这个时候就要判断当前node是否要被阻塞(被阻塞条件:前驱节点的waitStatus为-1),防止无限循环浪费资源。具体两个方法下面细细分析 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
注:setHead方法是把当前节点置为虚节点,但并没有修改waitStatus,因为它是一直需要用的数据。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } // java.util.concurrent.locks.AbstractQueuedSynchronizer // 靠前驱节点判断当前线程是否应该被阻塞 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 获取头结点的节点状态 int ws = pred.waitStatus; // 说明头结点处于唤醒状态 if (ws == Node.SIGNAL) return true; // 通过枚举值我们知道waitStatus>0是取消状态 if (ws > 0) { do { // 循环向前查找取消节点,把取消节点从队列中剔除 node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 设置前任节点等待状态为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt主要用于挂起当前线程,阻塞调用栈,返回当前线程的中断状态。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
到此,我们就理解了整个Lock接口的机制
接下来,可以聊一聊CAS的机制,CAS是一种非阻塞同步机制,
非阻塞同步:CAS(Compare and Set)比较并设置
- 输入:
- 需要读写的内存位置 V
- 我们认为这个位置现在的值 A
- 想要写入的新值 B
- 输出: V 位置以前的值(无论写入操作是否成功)
- 含义: 我们认为 V 处的值应该是 A,如果是,把 V 处的值改为 B,如果不是则不修改,然后把 V 处现在的值返回给我。
CAS实现原子操作的三大问题
1、ABA 问题
问题描述: V 处的值经历了 A -> B -> A
的变化后,也认为是发生了变化的,而传统的 CAS 是无法发现这种变化的。
解决方法:
- 使用
AtomicStampedReference
的int stamp
版本号判断数据是否被修改过 - 使用
AtomicMarkableReference
的boolean marked
判断数据是否被修改过
2、循环时间长开销大。
自旋CAS如果长时间不成功,会给CPU带来非常大的执行开销
3、只能保证一个共享变量的原子操作。
当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁。还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来
接下里,我们开始正式的使用Lock接口的实现类开始对代码加锁,解锁。
Lock lock = new ReentrantLock(); lock.lock(); try { // 同步代码块 } finally { lock.unlock(); // 千万不能忘记在finally块中释放锁 }
上面就是Lock使用案例
下面介绍三个高级的lock方法
trylock():轮询锁:不会阻塞的锁。
trylock(long):定时锁,是可中断锁,能抛出中断异常。
lockInterruptibly():中断锁,能在获得锁的同时,保持对中断的响应。
公平锁和非公平锁
- 公平锁: 在有线程持有锁和有线程在队列中等待锁的时候,会将新发出请求的线程放入队列中,而不是立即执行它,也就是说,获取锁的顺序和线程请求锁的顺序是一样的。
- 非公平锁: 只当有线程持有锁时,新发出请求的线程才被放入队列中,如果新的线程到达时没有线程持有锁,但队列中有等待的线程(比如队列中的线程还在启动中,还没有拿到锁),这时新请求锁的线程会先于队列中的线程获取锁。
- 非公平锁性能更优的原因:
- 恢复一个被挂起的线程到这个线程真正运行起来之间,存在着巨大时时延
- 在等待队列中的线程被恢复的超长时延里,如果正好进来了一个线程获取锁,非公平锁会让这个新进来的线程先执行,它很有可以能等待队列中的线程恢复运行前就执行完了,相当于时间不变的情况下,利用等待线程的恢复运行时延,多执行了一个线程
- 只要当线程运行时间长,或锁的请求频率比较低时,才适合使用公平锁
读写锁
特点: 支持读操作并发执行,涉及到写操作时才线程间互斥执行。
方法:
- 获得读锁:
lock.readLock().lock()
- 释放读锁:
lock.readLock().unlock()
- 获得写锁:
lock.writeLock().lock()
- 释放写锁:
lock.writeLock().unlock()
Condition接口
任何一个java对象,都有一组监视器对象,包括wait()方法、wait(long)、notify()、以及notifyAll()方法。这些方法和synchronized关键字配合,可以实现等待/通知模式。
Condition接口也提供了类似的方法,与Lock接口配合可以实现等待/通知模式。
/* 获取Condition的方法 */ protected final Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); /* Condition接口中的方法 */ void await() throws InterruptedException; // 相当于wait() void awaitUninterruptibly(); long awaitNanos(long nanosTimeout) throws InterruptedException; boolean await(long time, TimeUnit unit) throws InterruptedException; // 相当于wait(long timeout) boolean awaitUntil(Date deadline) throws InterruptedException; void signal(); // 相当于notify() void signalAll(); // 相当于notifyAll()
lock与synchronized 比较
锁的释放
synchronized是在JVM层面上实现的,不但可以通过一些监控工具监控synchronized的锁定,而且在代码执行时出现异常,JVM会自动释放锁定,但是使用Lock则不行,lock是通过代码实现的,要保证锁定一定会被释放,就必须将unLock()放到finally{}中采用synchronized不需要用户去手动释放锁,当synchronized方法或者synchronized代码块执行完之后,系统会自动让线程释放对锁的占用;而Lock则必须要用户去手动释放锁,如果没有主动释放锁,就有可能导致出现死锁现象。
超时锁
如果使用 synchronized ,如果A不释放,B将一直等下去,不能被中断,如果 使用ReentrantLock,如果A不释放,可以使B在等待了足够长的时间以后,中断等待,而干别的事情
读写锁
如果多个线程都只是进行读操作,所以当一个线程在进行读操作时,其他线程只能等待无法进行读操作。通过Lock可以知道线程有没有成功获取到锁。这个是synchronized无法办到的。
效率问题
当竞争不是很激烈的时候Synchronized使用的是轻量级锁或者偏向锁,这两种锁都能有效减少轮询或者阻塞的发生,相比与Lock仍旧要将未获得锁的线程放入等待队列阻塞带来的上下文切换的开销,此时Synchronized效率会高些,当竞争激烈的时候Synchronized会升级为重量级锁,由于Synchronized的出对速度相比Lock要慢,所以Lock的效率会更高些。一般对于数据结构设计或者框架的设计都倾向于使用Lock而非Synchronized。
公平性
synchronized块不支持公平性,任何线程一旦释放就可以获得锁定,不能指定任何偏好。我们可以通过指定公平属性来实现Lock API中的公平性。它确保最长的等待线程可以访问锁
底层实现策略
syncronized:阻塞互斥同步最主要的问题就是进行线程阻塞和唤醒所带来的性能问题,因而这种同步又称为阻塞同步,它属于一种悲观的并发策略,即线程获得的是独占锁。独占锁意味着其他线程只能依靠阻塞来等待线程释放锁。而在CPU转换线程阻塞时会引起线程上下文切换,当有很多线程竞争锁的时候,会引起CPU频繁的上下文切换导致效率很低。synchronized采用的便是这种并发策略。?lock:CAS1.在乐观的并发策略中,需要操作和冲突检测这两个步骤具备原子性,它靠硬件指令来保证,这里用的是CAS操作(Compare and Swap)。JDK1.5之后,Java程序才可以使用CAS操作。我们可以进一步研究ReentrantLock的源代码,会发现其中比较重要的获得锁的一个方法是compareAndSetState,这里其实就是调用的CPU提供的特殊指令。现代的CPU提供了指令,可以自动更新共享数据,而且能够检测到其他线程的干扰,而compareAndSet() 就用这些代替了锁定。这个算法称作非阻塞算法,意思是一个线程的失败或者挂起不应该影响其他线程的失败或挂起。2.随着指令集的发展,我们有了另一种选择:基于冲突检测的乐观并发策略,通俗地讲就是先进性操作,如果没有其他线程争用共享数据,那操作就成功了,如果共享数据被争用,产生了冲突,那就再进行其他的补偿措施(最常见的补偿措施就是不断地重拾,直到试成功为止),这种乐观的并发策略的许多实现都不需要把线程挂起,因此这种同步被称为非阻塞同步。ReetrantLock采用的便是这种并发策略。
synchronized 和 ReentrantLock 的选择
- 选择方式:
- 只有当我们需要如下高级功能时才使用 ReentrantLock,否则优先使用 synchronized
- 可轮询、可定时、可中断的锁
- 公平锁
- 非块结构的锁
- 只有当我们需要如下高级功能时才使用 ReentrantLock,否则优先使用 synchronized
- 优先选择 synchronized 的原因:
- Java 6开始,ReenstrantLock 和内置锁的性能相差不大
- synchronized 是 JVM 的内置属性,未来更有可能对 synchronized 进行性能优化,如对线程封闭的锁对象的锁消除,增加锁的粒度等
- ReenstrantLock 危险性更高(如忘记在 finally 块中 lock.unlock() 了,会导致锁永远无法被释放,出现问题,极难 debug)
- 许多现有程序中已使用了 synchronized,两种方式混合使用比较易错
一篇好文
锁差不多就这些内容了。下面总结了所有锁的分类
final的线程安全
对final域来说,编译器和处理器需要遵守以下规则:
1.在构造函数内对一个final域的写入和随后把这个被构造的对象的引用赋值给一个引用变量,这两个操作之间不能重排序;
什么意思,就是不能先把对象的引用赋值给引用变量之后才在构造函数内对final进行写入。
2.初次读一个final对象的引用,与随后初次读这个final域,两个操作之间不能重排序。
需要先读对象的引用,在读final域。
为什么final引用不能从构造函数内“逸出”
在引用变量为任意线程可见之前,该引用变量指向的对象final域已经在构造函数中被正确的初始化了。
但这样还不够,必须保证初始化的时候不能对任意线程可见,即在构造函数内部,不能让这个被构造的引用为其他线程所见。
如上图所示。
同步容器类
- Vector
- HashTable
这些容器实现同步,其实就是对每一个共有方法加synchronized关键字来实现。相当于让所有对容器的访问串行操作。并发性差。
并发容器类
ConcurrentHashMap
实现了concurrentMap接口,可以在并发环境下实现更高的吞吐量。在单线程环境里指损失很小的性能。
它还实现了分段锁,使任意数量的读取线程能并发的访问Map,一定数量的写入线程可以并发的修改Map。
不会抛出 ConcorrentModificationException,它返回迭代器具有“弱一致性”,即可以容忍并发修改,但不保证将修改操作反映给容器;
size() 的返回结果可能已经过期,只是一个估计值,不过 size() 和 isEmpty() 方法在并发环境中用的也不多;
提供了许多原子的复合操作
1.V putIfAbsent(K key, V value):K 没有相应映射才插入
2.boolean remove(K key, V value):K被移除V才被移除
3.boolean replace(K key, V oldValue, V newValue):K 被映射到 oldValue 时才替换为 newValue
在构造的时候,Segment 的数量由所谓的 concurrentcyLevel 决定,默认是 16
Segment 是基于 ReentrantLock 的扩展实现的,在 put 的时候,会对修改的区域加锁
锁分段实现的原理:
不同线程在同一数据的不同部分上不会互相干扰,例如,ConcurrentHashMap 支持 16 个并发的写入器,是用 16 个锁来实现的。它的实现原理如下:
- 使用了一个包含 16 个锁的数组,每个锁保护所有散列桶的 1/16,其中第 N 个散列桶由第(N % 16)个锁来保护;
- 这大约能把对于锁的请求减少到原来的 1/16,也是 ConcurrentHashMap 最多能支持 16 个线程同时写入的原因;
- 对于 ConcurrentHashMap 的 size() 操作,为了避免枚举每个元素,ConcurrentHashMap 为每个分段都维护了一个独立的计数,并通过每个分段的锁来维护这个值,而不是维护一个全局计数;
- 关于 put 操作:
- 是否需要扩容
- 在插入元素前判断是否需要扩容,
- 比 HashMap 的插入元素后判断是否需要扩容要好,因为可以插入元素后,Map 扩容,之后不再有新的元素插入,Map就进行了一次无效的扩容
- 如何扩容
- 先创建一个容量是原来的2倍的数组,然后将原数组中的元素进行再散列后插入新数组中
- 为了高效,ConcurrentHashMap 只对某个 segment 进行扩容
- 是否需要扩容
- 关于 size 操作:
- 存在问题:如果不进行同步,只是计算所有 Segment 维护区域的 size 总和,那么在计算的过程中,可能有新的元素 put 进来,导致结果不准确,但如果对所有的 Segment 加锁,代价又过高。
- 解决方法:重试机制,通过获取两次来试图获取 size 的可靠值,如果没有监控到发生变化,即
Segment.modCount
没有变化,就直接返回,否则获取锁进行操作。
CopyOnWriteArrayList
- 只要正确发布了这个 list,它就是不可变的了,所以随便并发访问,当需要修改时,就创建一个新的容器副本替代原来的,以实现可变性;
- 应用于迭代操作远多于修改操作的情形,如:事件通知系统,分发通知时需要迭代已注册监听器链表,并调用每一个监听器,一般注册和注销事件监听器的操作远少于接收事件通知的操作。
并发工具类
可以根据自身状态协调线程的控制流:
- 生产者消费者模式:阻塞队列(BlockingQueue)
- 并发流程控制:
- 闭锁(CountDownLatch)
- 栅栏(Barrier)
- 信号量(Semaphore)
- 线程间的数据交换:交换者(Exchanger)
BlockingQueue 阻塞队列
BlockingQueue提供了可阻塞的put和take方法,
如果队列为空,take方法会一直被阻塞,直到队列中出现一个可用的元素。
如果队列为满,put方法会一直被阻塞,直到队列出现可用空间。
可以看出这是生产者-消费者模式的利器。
以上就是实现了的多种阻塞队列
阻塞队列实现的原理:(关注点是BQ是如何在队列满的时候通知put以及如何在队列空的时候通知take的)
- ArrayBlockingQueue 中有一个 ReentrantLock lock;
- 这个 lock 给我们提供了两个 Condition:notEmpty 和 notFull;
- put操作中,会以 while 循环的方式轮询 count == items.length,如果为 true,就 notFull.await(),这个阻塞状态需要通过 dequeue 方法中的 notFull.signal() 来解除;
- take操作中,会以 while 循环的方式轮询 count == 0,如果为 true,就 notEmpty.await(),这个阻塞状态需要通过 enqueue 方法中的 notEmpty.signal() 来解除。
从以上描述可以知道,是通过lock机制解决的。
CountDownLatch 闭锁
可以让一个或多个线程等待其他线程操作完成在继续执行,不可以循环使用,只能使用一次。
public CountDownLatch(int count); // 参数count为计数值 // 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,或等待中线程中断 public void await() throws InterruptedException; // 和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行 public boolean await(long timeout, TimeUnit unit) throws InterruptedException; public void countDown(); // 将count值减1
CyclicBarrier 栅栏
可以让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,让所有线程通过,并且这个屏障可以循环使用(这点和 CountDownLatch 很不同)。
/** * parties指让多少个线程或者任务等待至barrier状态 * barrierAction为当这些线程都达到barrier状态时会执行的内容 */ public CyclicBarrier(int parties, Runnable barrierAction); // 常用 public CyclicBarrier(int parties); public int await() throws InterruptedException, BrokenBarrierException; public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException;
Semaphore 信号量
用来控制同时访问特定资源的线程数量
// 参数permits表示许可数目,即同时可以允许多少线程进行访问,默认是非公平的 public Semaphore(int permits) { sync = new NonfairSync(permits); } // 这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可 public Semaphore(int permits, boolean fair) { sync = (fair) ? new FairSync(permits) : new NonfairSync(permits); } /* 会阻塞等待的acquire方法 */ public void acquire() throws InterruptedException; // 获取一个许可 public void acquire(int permits) throws InterruptedException; // 获取permits个许可 public void release(); // 释放一个许可 public void release(int permits); // 释放permits个许可 /* 会阻塞但不等待,立即返回的acquire方法 */ // 尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire() { } // 尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { } // 尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false public boolean tryAcquire(int permits) { } // 尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }
Exchanger
用于两个线程之间交换数据的工具类,第一个线程执行了exchange(V)方法,它会阻塞在那里,等待第二个线程执行exchange(V),exchange(V)会返回另一个线程传入的值
public Exchanger(); public V exchange(V x) throws InterruptedException; public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException;
线程池
为什么使用线程池
在正常的负载下,服务器应用程序应该同时表现出良好的吞吐量和快速的响应性。
当负载过载时,应用程序的性能应该逐渐降低,而不是直接失败。
如果不使用线程池,为每一个任务都创建一个线程来执行,其一,线程的创建和销毁都需要时间,其二,线程数超过cpu数,增加线程反而会降低性能,因为会出现频繁的上下文切换。
因此,使用线程池的好处就是:
1.降低资源消耗:可以重复使用已经创建好的线程。
2.提高响应速度:任务到达时,不需要等待线程创建的时间。
3.提高线程的可管理性。
线程的Executor框架
Executor 框架的主要成员:
- ThreadPoolExecutor
- ScheduledThreadPoolExecutor
- Future 接口 & FutureTask 实现类
- Executors 工厂类
Executor是一个接口,只定义了一个方法
void execute(Runnable command);
这是几乎所有需要执行的类需要的方法,历练是一个要执行的接口类,执行的run方法需要自己定义。
ExecutorService是继承了Exector的接口,里面定义了一些需要实现的方法。
// shutdown方法将执行平缓的关闭过程: // 不再接受新的任务,同时等待已经提交的任务执行完成(包括那些还未开始执行的任务) void shutdown(); // 执行粗暴的关闭过程: // 它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务 List<Runnable> shutdownNow(); boolean isshutdown(); // 返回ExecutorService是否已经终止 boolean isTerminated(); // 等待ExecutorService到达终止状态,一般调用完它之后立即调用shutdown boolean awaitTermination(long timeout,TimeUnit unit) throws InterruptedException; // ...
对于线程池的实现类:ThreadPoolExecutor,它继承了抽象类AbstractExecutorService,而该抽象类则实现了ExecutorService接口。
这个类的一些方法就不多做描述了,具体线程池的处理步骤如下:
下面是Executor框架的执行框图:
Executor框架由三大部分组成:
1.任务,被执行的任务需要实现的接口:Runnable接口。Callable接口。
2.任务的执行:包括执行的核心接口Executor,以及继承这些接口的实现类。
3.异步计算的结果:包括Future接口和接口的实现类。
Future接口以及实现类:
Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。
Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。
public incerface Future<V>{ boolean cance1(boolean mayInterruptIfRunning); boolean iscancelled(); boolean isDone(); V get() throws InterruptedException,ExecutionException, CancellationException; V get(long timeout,TimeUnit unit) throws InterruptedException,ExecutionException, CancellationException,TimeoutException; }
Future需要自己实现,提供了一个FutureTask的实现类
状态迁移
执行过程
Future 的 get 方法对于任务的状态的不同表现:
- 任务已完成:立即返回结果或抛出异常。
- 任务未完成:阻塞直到任务完成。
- 任务抛出异常:将异常封装为 ExecutionException 后再次抛出,ExecutionException 异常可以通过 getCause() 方法获取被封装的初始异常。
- 任务被取消:抛出 CancallationException 异常,这是个 RuntimeException,需要显式 catch。
ThreadPoolExecutor 线程池
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
分类
FixedThreadPool
特点: 固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,如果某个线程由于发生了未预期的 Exception 而结束,那么线程池会补充一个新的线程。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, // 线程池大小不可扩展 0L, TimeUnit.MILLISECONDS, // 多余线程会被立即终止 new LinkedBlockingQueue<Runnable>()); // 使用容量为 Integer.MAX_VALUE 的工作队列 // 由于使用了无界队列,不会拒绝任务,所以不会调用 handler }
CacheThreadPool
特点: 可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 初始为0,线程池中的线程数是无界的 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
注意:
- 池中不会有空闲线程,也不会有等待的线程
- 一旦任务到达的速度大于线程池处理任务的速度,就会创建一个新的线程给任务
- 与另外两个线程池不同的地方在于,这个工作队列并不是用来放还没有执行的任务的,而是用来放执行过任务后空闲下的线程的,空闲下来的线程会被:
SynchronousQueue#poll(keepAliveTime, TimeUnit.NANOSECONDS)
poll 到工作队列中等待 60s,如果这 60s 有新的任务到达了,这个线程就被派出去执行任务,如果没有,就销毁。
SingleThreadPool
特点: 单个工作者线程来执行任务,如果这个线程异常结束,会创建另一个线程来替代。能确保依照任务在队列中的顺序来串行执行。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, // 线程池的大小固定为1 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); // 使用容量为 Integer.MAX_VALUE 的工作队列 }
线程池特点:
- 在创建 ThreadPoolExecutor 初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用 prestartAllCoreThreads
- 将线程池的 corePoolSize 设置为 0 且不使用 SynchronousQueue 作为工作队列会产生的奇怪行为:只有当线程池的工作队列被填满后,才会开始执行任务
- 产生原因:如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,因为这个时候才会创建新的线程,在此之前,线程池只有在工作队列中等待任务,没有执行任务的线程。
ScheduledThreadPoolExecutor
特点: 可以在给定的延迟后运行命令,或者定期执行命令。比 Timer 更灵活,功能更强大。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit)
实现原理:
- 使用 DelayWorkQueue 作为工作队列,ScheduledThreadPoolExecutor 会把待执行的任务 ScheduledFutureTask 放到工作队列中
- ScheduledFutureTask 中有以下 3 个主要的成员变量:
- long time:表示该任务将要被执行的具体时间;
- long sequenceNumber:表示任务被添加到 ScheduledThreadPoolExecutor 中的序号;
- long period:表示任务执行的间隔周期。
- 任务执行的过程:
- 线程从 DelayWorkQueue 中获取到期的任务;
- 执行这个任务;
- 修改这个任务的 time 为下一次的执行时间;
- 将该任务再次 add 进 DelayWorkQueue。
对比 Timer(Timer 的缺陷)
- Timer 在执行所有定时任务时只会创建一个线程。如果有一个任务执行时间太长导致它后面的任务超时,那么后面超时的任务会立即执行,从而破坏了其他 TimerTask 的准时执行。线程池能弥补这个缺陷,因为它可以提供多个线程来执行延时任务和周期任务。
- 线程泄漏:Timer 线程并不捕获未检查异常,当 TimerTask 抛出未检查的异常时将终止定时线程。这种情况下,整个 Timer都会被取消,将导致已经被调度但尚未执行的 TimerTask 将不会再执行,新的任务也不能被调度。
拒绝策略
JDK 提供了 4 种 RejectedExecutionHandler 接口的实现,它们都是以 ThreadPoolExecutor 类的静态内部类的形式定义的,它们的具体实现以及拒绝策略如下:
AbortPolicy(默认)流产
抛出异常,调用者自己处理
public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); // 抛异常! } }
DiscardPolicy (Discard:抛弃)
抛弃新提交的任务
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { //这里什么都不干 } }
DiscardOldestPolicy(抛弃下一个被执行的任务)
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 先判断线程池关没 e.getQueue().poll(); // 丢到等待队列中下一个要被执行的任务 e.execute(r); // 重新尝试提交新来的任务 } } }
抛弃下一个被执行的任务,然后重新尝试提交任务
CallerRunsPolicy
既不抛异常,也不抛任务
- 它不会在线程池中执行该任务,而是在调用 execute 提交这个任务的线程执行
- 如当主线程提交了任务时,任务队列已满,此时该任务会在主线程中执行。这样主线程在一段时间内不会提交任务给线程池,使得工作者线程有时间来处理完正在执行的任务
- 可以实现服务器在高负载下的性能缓慢降低
- 提交任务的应用程序被拿去执行任务了,不会返回 accept,TCP 层的请求队列会被填满而抛弃请求,客户端才会反应过来,即可以通过 TCP 层来缓冲一下
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 直接在把它提交来的线程调用它的 run 方法,相当于没有新建一个线程来执行它, // 而是直接在提交它的线程执行它,这样负责提交任务的线程一段时间内不会提交新的任务来 r.run(); } } }
线程池大小的设置
- 计算密集型任务: N = N_cpu + 1
- 加 1 的原因:当有一个线程偶尔故障时,额外的那个线程可以立即补上,保证CPU时钟不会被浪费
- 包含 I/O 或其他阻塞操作: N = N_cpu * U_cpu * (1 + W / C)
- N_cpu:CPU 的个数
- U_cpu:目标 CPU 利用率
- W / C:等待时间 (Wait) / 计算时间 (Compute)
- 获取 CPU 数目的方法:
int N_CPUS = Runtime.getRuntime().availableProcessors();
生产者-消费者模式
生产者生产数据到缓冲区中,消费者从缓冲区中取数据。
如果缓冲区已经满了,则生产者线程阻塞;
如果缓冲区为空,那么消费者线程阻塞。
方式一:
synchronized、wait和notify
生产者线程需要的操作:添加资源,在添加的时候需要判断资源是不是满了,如果满了就不可以继续添加资源了。
消费者线程需要的操作:减少资源,在减少的时候需要判断资源是不是空了,如果空了就不能继续减少了。
逻辑很简单,利用synchronied以及在添加和减少的时候保持线程的同步,而当生产者执行添加代码的时候判断资源已经满了,则将生产线程阻塞,当生产了一个资源以后,可以唤醒消费线程,让其来消费。
消费者的逻辑和生产者一致。
package com.liuxinghang.bingfa; public class ProducerAndConsumer { public static void main(String[] args) { Resource resource=new Resource(); ProducerThread p1=new ProducerThread(resource); ConsumerThread c1=new ConsumerThread(resource); ConsumerThread c2=new ConsumerThread(resource); p1.start(); c1.start(); c2.start(); } } //资源类 class Resource{ //当前的资源数 private int num=0; //可以存储的资源数 private int size=10; //添加资源 public synchronized void add(){ if(num<size)//没有满的时候 { num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //因为已经生产了,现在可以通知消费者来消费了 notifyAll(); }else {//资源已经满了 try { wait();//生产者线程阻塞,因为已经满了 System.out.println(Thread.currentThread().getName()+"线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } } public synchronized void remove(){ if(num>0)//资源不为空,可以消费 { num--; System.out.println(Thread.currentThread().getName() + "消费一件资源,当前资源池有" + num + "个"); //已经消费了一件了,资源不会为满,因此可以通知生产者来生产了。 notifyAll(); }else {//资源已经空了 try { wait();//消费者线程阻塞,因为已经空了 System.out.println(Thread.currentThread().getName()+"线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } } } //生产者的线程,只做一件事,就是添加资源 class ProducerThread extends Thread{ private Resource resource; public ProducerThread(Resource resource){ this.resource = resource; } @Override public void run() { resource.add(); } } //消费者的线程,只做一件事,就是添加资源 class ConsumerThread extends Thread{ private Resource resource; public ConsumerThread(Resource resource){ this.resource = resource; } @Override public void run() { resource.remove(); } }
方式二
lock和condition的await、signalAll
因为逻辑和上面的一样,所以只是演示一下lock的用法。
package com.liuxinghang.a; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ProducerAndConsumer1 { public static void main(String[] args) { Lock lock = new ReentrantLock(); Condition producerCondition = lock.newCondition(); Condition consumerCondition = lock.newCondition(); Resource1 resource = new Resource1(lock,producerCondition,consumerCondition); ProducerThread p1=new ProducerThread(resource); ConsumerThread c1=new ConsumerThread(resource); ConsumerThread c2=new ConsumerThread(resource); p1.start(); c1.start(); c2.start(); } } class Resource1{ private int num=0;//当前资源数 private int size=10;//资源总数 //锁 private Lock lock; private Condition producerCondition; private Condition consumerCondition; public Resource1(Lock lock, Condition producerCondition, Condition consumerCondition) { this.lock = lock; this.producerCondition = producerCondition; this.consumerCondition = consumerCondition; } public void add(){ //添加之前加锁 lock.lock(); try { if (num < size) { num++; System.out.println(Thread.currentThread().getName() + "生产一件资源,当前资源池有" + num + "个"); //生产了商品可以唤醒消费者了 consumerCondition.signalAll(); } else { //让生产者线程阻塞 try { producerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { lock.unlock(); } } public void remove(){ //添加之前加锁 lock.lock(); try { if (num >0) { num--; System.out.println(Thread.currentThread().getName() + "消费一件资源,当前资源池有" + num + "个"); //消费了商品可以唤醒生产者了 producerCondition.signalAll(); } else { //让消费者者线程阻塞 try { consumerCondition.await(); System.out.println(Thread.currentThread().getName() + "线程进入等待"); } catch (InterruptedException e) { e.printStackTrace(); } } }finally { lock.unlock(); } } } //生产者的线程,只做一件事,就是添加资源 class ProducerThread extends Thread{ private Resource1 resource; public ProducerThread(Resource1 resource){ this.resource = resource; } @Override public void run() { resource.add(); } } //消费者的线程,只做一件事,就是添加资源 class ConsumerThread extends Thread{ private Resource1 resource; public ConsumerThread(Resource1 resource){ this.resource = resource; } @Override public void run() { resource.remove(); } }
方式三:
BlockingQueue阻塞队列
阻塞队列是一个天然的生产者-消费者。
package com.liuxinghang.a; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; public class BlockingQueueConsumerProducer { public static void main(String[] args) { Resource resource=new Resource(); ProducerThread1 p = new ProducerThread1(resource); ConsumerThread1 c=new ConsumerThread1(resource); p.start(); c.start(); } } class Resource{ BlockingQueue resourceQueue=new LinkedBlockingQueue(10);//队列里面可以加入10个资源 public void add(){ try { resourceQueue.put(1); System.out.println("生产者" + Thread.currentThread().getName() + "生产一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } public void remove(){ try { resourceQueue.take(); System.out.println("消费者" + Thread.currentThread().getName() + "消耗一件资源," + "当前资源池有" + resourceQueue.size() + "个资源"); } catch (InterruptedException e) { e.printStackTrace(); } } } class ProducerThread1 extends Thread{ private Resource resource; public ProducerThread1(Resource resource) { this.resource = resource; } @Override public void run() { //添加 while(true){ resource.add(); } } } class ConsumerThread1 extends Thread{ private Resource resource; public ConsumerThread1(Resource resource) { this.resource = resource; } @Override public void run() { //添加 // while(true){ resource.remove(); // } } }
线程池设计
线城池就是以一个或多个线程来执行多个应用程序的线程集合
方案一:
编写一个线程池接口
public interface ThreadPool <Job extends Runnable> { //执行一个任务(Job),这个Job必须实现Runnable void execute(Job job); //关闭线程池 void shutdown(); //增加工作者线程,即用来执行任务的线程 void addWorkers(int num); //减少工作者线程 void removeWorker(int num); //获取正在等待执行的任务数量 int getJobSize(); }
概念:Job是一个任务,它继承了Runnable接口,可以用户自己定义自己要执行的任务,把任务放在run方法里面。
工作者线程:也是一个任务,只是每一个工作者都在做一件事,就是从job队列中取出job来,然后将该job运行。
当然这里面就涉及到可能job队列是空的,没办法取到,因此,需要等待唤醒机制。
public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { //线程池维护工作线程的最大数量 private static final int MAX_WORKER_NUMBERS=10; //线程池维护工作线程的默认值 private static final int DEFAULT_WORKER_NUMBERS=5; //线程池维护工作线程的最小数量 private static final int MIN_WORKER_NUMBERS=1; //维护一个工作列表,加入客户端发起的工作任务 private final LinkedList<Job> jobs=new LinkedList<Job>(); //工作者线程的列表 private final List<Worker> workers= Collections.synchronizedList(new ArrayList<Worker>()); //工作者线程的数量 private int workerNum; //每个工作者线程编号的生成 private AtomicLong threadNum=new AtomicLong(); //生成默认的线程池 public DefaultThreadPool(){ this.workerNum=DEFAULT_WORKER_NUMBERS; initializeWorkers(this.workerNum); } public DefaultThreadPool(int num){ if(num>MAX_WORKER_NUMBERS){ this.workerNum=DEFAULT_WORKER_NUMBERS; }else { this.workerNum=num; } initializeWorkers(this.workerNum); } //初始化每一个工作者的线程 private void initializeWorkers(int num){ for(int i=0;i<num;i++){ Worker worker=new Worker(); workers.add(worker); Thread thread=new Thread(worker); thread.start(); } } //执行job @Override public void execute(Job job) { if(job!=null){ synchronized (jobs){//这里使用生产者消费者模式 jobs.addLast(job); jobs.notifyAll(); } } } @Override public void shutdown() { for(Worker worker:workers){ worker.shutdown(); } } @Override public void addWorkers(int num) { //加锁,防止该线程还没增加完成而下个线程继续增加导致工作者线程超过最大值 synchronized (jobs){ if(num + this.workerNum > MAX_WORKER_NUMBERS){ num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWorkers(num); this.workerNum+=num; } } @Override public void removeWorker(int num) { synchronized (jobs) { if(num>=this.workerNum){ throw new IllegalArgumentException("超过了已有的线程数量"); } for (int i = 0; i < num; i++) { Worker worker = workers.get(i); if (worker != null) { //关闭该线程并从列表中移除 worker.shutdown(); workers.remove(i); } } this.workerNum -= num; } } @Override public int getJobSize() { return workers.size(); } //定义一个工作者的线程类 class Worker implements Runnable{ //表示是否运行该worker private volatile boolean running=true; @Override public void run() { while(running){ Job job=null; //线程的等待通知机制 synchronized (jobs){ if(jobs.isEmpty()){ try { jobs.wait(); } catch (InterruptedException e) { //感知到外部对该线程的中断操作,返回 Thread.currentThread().interrupt(); return; } } //如果不为空,从job里面取出一个job(job是任务) job=jobs.removeFirst(); } //执行job if(job!=null){ job.run(); } } } public void shutdown(){ running=false; } } }
DefaultThreadPool
方案二:通过ThreadGroup来创建
线程池一般需要一个线程管理类: ThreadPoolManager,其作用有:
1)提供创建一定数量的线程的方法。主线程调用该方法,从而创建线程。创建的线程执行自己的例程,线程的例程阻塞在任务抓取上。
2)提供对任务队列的操作的方法。主线程调用初始化任务队列的方法,然后在有任务的时候,调用提供的任务添加方法,将任务添入等待队列。当主线程调用任务的添加方法时,会触发等待的线程,从而使得阻塞的线程被唤醒,其抓取任务,并执行任务。
线程池需要一个任务队列: List<Task>,其作用有:
提供任务的增删方法。而且该任务队列需要进行排他处理,防止多个工作线程对该任务队列进行同时的抓取操作或者主线程的加入与工作线程的抓取的并发操作。
线程池需要一个类似信号量的通知机制:wait -notify:
工作线程调用wait阻塞在任务抓取上。主线程添加任务后,调用notify触发阻塞的线程。
线程池需要一个线程类:WorkThread,其作用有:
提供线程的例程。创建线程WorkThread后,需要抓取任务,并执行任务。这是线程的例程。
线程池需要一个任务类:Task,其作用有:
提供线程抓取并执行的任务目标。
原文地址:https://www.cnblogs.com/lovejune/p/12502999.html