并发(Concurrency) 与并行(Parallelism)
以KTV唱歌为例, Parallelism 是指有多少人可以使用话筒同时唱歌,
Concurrency是指同一个话筒被多少个人轮流使用;
一个科室两个专家同时出诊, 就是两个并行任务; 其中一个医生时而问诊, 时而看化验单,
然后继续问诊, 就是并发;
- 并发特点:
- 并发程序之间相互制约;
- 并发程序的执行过程断断续续;
- 当并发数量设置合理并且CPU 拥有足够能力时, 并发可以提高程序效率;
7.1 线程安全
一、线程状态
- NEW , 线程被创建且未启动的状态.
创建线程的三种方式:
- extends Thread , @Override run()
- implements Runnable , @Override run()
- implements Callable , V @Override call(), call() 可以抛出异常
- RUNNABLE , 就绪状态, 是调用start() 之后运行之前的状态;
多次调用start() 会抛出IllegalStateException;
- RUNNING , 允许状态,
- BLOCKED , 阻塞状态
几个原因:
- 同步阻塞: 锁被其他线程占用;
- 主动阻塞: 调用Thread某些方法, 主动让出CPU执行权, 如sleep() 、join()
- 等待阻塞: 执行了wait()
- DEAD , run() 结束或异常退出, 不可逆;
二、保证高并发场景下线程安全的几个度量
- 数据单线程内可见:
如ThreadLoca采用此机制
- 只读对象:
允许复制, 拒绝写入; 如String Integer
一个对象拒绝写入的条件:
- final修饰, 避免被继承;
- private final 修饰, 避免属性别中途修改;
- 没有任何更新方法;
- 返回值不能可变对象的应用;
- 线程安全类:
如StringBuffer采用了synchronized修饰;
- 同步与锁机制:
三、Java并发包(java.util.concurrent, JUC)
@auther Doug Lea
- 主要类族:
- 线程同步类
逐步淘汰了Object wait()和 notify();
如: CountDownLatch Semaphore CyclicBarrier
-
并发集合类
执行速度快, 提取数据准; 如ConcurrentHashMap
-
线程管理类
使用线程池;如 Executors的静态工厂 和 ThreadPoolExecutor等, 通过ScheduledExecutorService来执行定时任务;
-
锁
Lock接口; ReentrantLock
- 线程同步类
7.2 什么是锁
一、锁的两种特性: 互斥性和不可见性
二、Java锁的常见两种实现方式:
- JUC包中的锁类 : volatile
- 利用同步代码块 : synchronized
- 同步对象, 同步方法
- 原则: 锁的范围尽可能小, 时间尽可能短; 能锁对象就不锁类, 能锁代码块就不要锁方法;
- synchronized 通过JVM实现
- 监视锁monitor是每个对象与生俱来的隐藏字段, 通过monitor状态来加锁解锁
- 字节码文件中通过monitorente, monitorexit来加锁解锁;
7.3 线程同步
7.3.1 同步是什么
* 原子性 (i++不具备1)
7.3.2 volatile
- 线程的可见性: 某线程修改共享变量的指令对其他线程来说都是可见的, 反映的是指令执行的实时透明度;
- 解决双重检查锁定( Double-checked Locking )问题
如:
VolatileNotAtomic.java
, 可以事项count++原子操作的其他类有AtomicLong和LongAdder;
jdk1.8推荐LongAdder, 它减少了乐观锁的重试次数; -
volatile是轻量级的线程操作可见方式, 并非同步方式, 如果用于多写环境, 会造成线程安全问题;
如果是一写多读的并发场景, 则修饰变量非常合适, 如 CopyOnWriteArrayList接口中
它修改数据时候会把整个数据集合复制, 对写加锁, 修改后再用setArray() 指向新的集合// 源码 package java.util.concurrent; public class CopyOnWriteArrayList<E> { // 集合真正存储元素的数组 private transient volatile Object[] array ; final void setArray (Object [] a ) { array = a; } }
- volatile会使线程的执行速度变慢
7.3.3 信号量同步
信号量同步是指不同线程之间通过传递同步信号量来协调线程执行的先后次序;
基于时间维度和信号维度的两个类
- CountDownLatch
CountDownLatchTest.java
- (倒数); countDown() 用于使计数器减一, await()方法用于调用该方法的线程处于等待状态;
- Semaphore
* acquire() (获取) 调用成功后往下一步执行;
* release() (释放) 释放持有的信号量, 下一线程可以获取空闲信号量来进入执行;
* CyclicBarrier (循环使用的屏障式)
通过reset() 释放线程资源;
* **结论:** 无论是从性能还是安全性, 我们应该尽量使用JUC并发包中的小号量同步类, 而避免使用对象的wait()和notify();
**体具可以参考[并发编程网](http://ifeve.com/)**
## 7.4 线程池
### 7.4.1 ThreadPool的好处
* 利用线程池管理并复用线程, 控制最大并发数等;
* 实现任务线程队列缓存策略和拒绝机制;
* 实现某些与时间相关的功能, 如定时执行, 周期执行等;
* 隔离线程环境
如何创建线程
#### 1. ThreadPoolExecutor 构造方法:
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
常驻核心线程数;设置过大造成资源浪费, 过小造成频繁创建销毁;
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
表示能够容纳同时执行的最大运行的线程数
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
线程空闲时间, 空闲时间达到时会被销毁; 默认线程数大于corePoolSize时生效
* @param unit the time unit for the {@code keepAliveTime} argument
时间单位
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
缓存队列; 线程大于maximmPoolSize时, 进入BlockingQueue阻塞队列, LinkedBlockingQueue是单向链表, 用于控制入队出队的原子性,
两个锁分别控制元素的添加和获取, 是生产消费模型队列
* @param threadFactory the factory to use when the executor
* creates a new thread
线程工厂; 生产一组相同任务的线程
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
执行拒绝策略的对象; 当超过workQueue缓存区上限是, 来处理请求;一种简单的限流保护;
友好的拒绝策略:
1. 保存到数据库削峰填谷. 空闲时再取出来执行;
2. 转向提示页面;
3. 打印日志;
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
// 队列, 线程工厂, 拒绝策略都必须实例化
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
// ...
}
#### 2. Executor和ThreadPoolExecutor关系
![图7-4 线程池相关类图](https://img2018.cnblogs.com/blog/1084504/201904/1084504-20190410212621790-561282211.jpg)
* Executors 的5个核心方法 :
* newFixedThreadPool : jdk1.8引入
* newCachedThreadPool
* newScheduledThreadPool
* newSingleThreadExecutor: 创建单线程的线程池
* newFixedThreadPool
* LinkedBlockingQueue
// 无边界队列, 如果请求瞬间非常大, 会造成OOM风险
// 除了newWorkStealingPool其他四个方法都有资源耗尽风险
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
```
- Executes默认的线程工厂和拒绝策略过于简单, 对用户不友好;
UserThreadFactory.java
- RejectedExecutionHandler
- ThreadPoolExecutorde 的四个公开内部类
- AbortPolicy 丢弃任务并抛出RejectedExecutionException异常
- DiscardPolicy 丢弃任务, 不抛异常(不推荐)
- DiscardOldestPolicy 抛弃任务中等待最久的任务, 把当前任务加入队列
- CallerRunsPolicy 直接调用任务的run(), 如果线程池
- 根据之前实现的结程工厂和拒绝策略,线程池的相关代码实现 :
UserThreadPool.java
7.4.2 线程源码讲解
- ThreadPoolExecutor 属性定义中频繁使用位移来表示线程状态
- 分析 ThreadPoolExecutor 关于 execute 方法的实现,
总结: 线程池使用注意点
- 合理配置各类参数, 应根据实际业务场景来设置合理的工作线程数;
- 线程资源必须通过线程池提供, 不允许应用中自行显示创建线程;
- 创建线程或线程池时请指定有意义的名称, 便于出错时的回溯;
7.5 ThreadLocal
原文地址:https://www.cnblogs.com/52liming/p/10686229.html