线程通信
一 使用Synchronized的线程
1.当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但java也提供了一些机制来保证线程协调运行。
Object类提供了wait(),notify()和notifyAll()三个方法,这三个方法属于Object类,但是必须由同步监视器来调用,可以分为以下两种情况:
(1)对于使用synchronized修饰的同步方法,因为该类的默认实例this就是同步监视器,所以可以在同步方法中直接调用这三个方法
(2)对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法
2.wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。无时间参数的wait会一直等待,直到其他线程通知,带毫秒的wait会等到指定时间后自动苏醒。调用wait()方法的当前线程会释放对该同步监视器的锁定。
notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都是在此同步监视器上等待,则会选择唤醒其中一个线程,选择是任意的。
notifyAll():唤醒在此同步监视器上等待的所有线程。
3.假设在系统中有两个线程,分别代表存款者和取钱者,系统要求存款者和取钱者不断重复存钱、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱,不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。
程序中可以通过一个旗标来标识账户中是否已有存款,当旗标为false时,账户中没有存款,存款者线程可以向下执行,当存款者存款后,旗标为true,并调用notify()或notifyAll()方法来唤醒其他线程,并调用wait()让存款者线程等待,取款者的操作类似。
Account.java
public class Account { private String accountNo; private double balance; private boolean flag=false; public Account(String accountNo,double balance){ this.accountNo=accountNo; this.balance=balance; } //因为账户余额不可以随便更改,所以只为balance提供getter方法 public double getBalance() { return balance; } public String getAccountNo() { return accountNo; } public void setAccountNo(String accountNo) { this.accountNo = accountNo; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Account account = (Account) o; return accountNo.equals(account.accountNo); } @Override public int hashCode() { return accountNo.hashCode(); } public synchronized void draw(double drawAmount){ try{ if(!flag){ wait(); }else{ System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount); balance-=drawAmount; System.out.println("账户余额为:"+balance); flag=false; notifyAll(); } }catch(InterruptedException ex){ ex.printStackTrace(); } } public synchronized void depoist(double depoistAmount){ try{ if(flag){ wait(); }else{ System.out.println(Thread.currentThread().getName()+"存款:"+depoistAmount); balance+=depoistAmount; System.out.println("账户余额为:"+balance); flag=true; notifyAll(); } }catch(InterruptedException ex){ ex.printStackTrace(); } } }
DrawThread.java
public class DrawThread extends Thread { private Account account; private double drawAmount; public DrawThread(String name, Account account, double drawAmount) { super(name); this.account = account; this.drawAmount = drawAmount; } public void run(){ for(int i=0;i<100;i++) { account.draw(drawAmount); } } }
DepositThread.java
public class DepositThread extends Thread { private Account account; private double depositAmount; public DepositThread(String name, Account account, double depositAmount) { super(name); this.account = account; this.depositAmount = depositAmount; } public void run(){ for(int i=0;i<100;i++){ account.depoist(depositAmount); } } }
DrawTest.java
public class DrawTest { public static void main(String[] args){ Account acct=new Account("1234567",0); new DrawThread("取钱者",acct,800).start(); new DepositThread("存款者甲",acct,800).start(); new DepositThread("存钱者乙",acct,800).start(); new DepositThread("存钱者丙",acct,800).start(); } }
结果:
存款者甲存款:800.0
账户余额为:800.0
取钱者取钱:800.0
账户余额为:0.0
从结果可以发现,三个存款者线程随机的向账户中存款,只有一个取款者取钱。但是程序最后被阻塞无法继续向下执行,这是因为三个存款者线程共有300此存款操作,而一个取钱者线程只有100次取钱操作,程序最后被阻塞,如下:
存钱者丙存款:800.0
账户余额为:800.0
这里的阻塞并不是死锁,只是取钱者线程已经执行完毕,而存款者线程还在等待其他线程来取钱而已,并不是等待其他线程释放同步监视器。
二 使用Condition控制线程通信
1.如果程序中直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,就不能使用wait(),notify(),notifyAll()进行线程通信了。Java提供了一个Condition来保持协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock锁,也可以唤醒其他处于等待的线程。Condition实例被绑定在了一个Lock对象上,只要调用Lock对象的newCondition()方法即可。
2.Condition提供了如下三个方法:
await():同wait()方法,但是该方法有更多的变体,如long awaitNanos(long nanosTimeout),void awaitUninterruptibly(),awaitUtil(Date deadline)等
signal():同notify()
signalAll():同notifyAll()
3.Account.java
public class Account { private final Lock lock=new ReentrantLock(); private final Condition cond=lock.newCondition(); private String accountNo; private double balance; private boolean flag=false; public Account(String accountNo,double balance){ this.accountNo=accountNo; this.balance=balance; } //因为账户余额不可以随便更改,所以只为balance提供getter方法 public double getBalance() { return balance; } public String getAccountNo() { return accountNo; } public void setAccountNo(String accountNo) { this.accountNo = accountNo; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Account account = (Account) o; return accountNo.equals(account.accountNo); } @Override public int hashCode() { return accountNo.hashCode(); } public synchronized void draw(double drawAmount){ lock.lock(); try{ if(!flag){ cond.await(); }else{ System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount); balance-=drawAmount; System.out.println("账户余额为:"+balance); flag=false; cond.signalAll(); } }catch(InterruptedException ex){ ex.printStackTrace(); }finally{ lock.unlock(); } } public synchronized void deposit(double depositAmount){ lock.lock(); try{ if(flag){ cond.await(); }else{ System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount); balance+=depositAmount; System.out.println("账户余额为:"+balance); flag=true; cond.signalAll(); } }catch(InterruptedException ex){ ex.printStackTrace(); }finally{ lock.unlock(); } } }
其他的几个java文件不变,运行结果是一样的。
三 使用阻塞队列(BlockingQueue)控制线程通信
1.BlockingQueue是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。它的一个特征是:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。程序的两个线程交替向BlockingQueue中放入、取出元素,即可很好的控制线程的通信。
2.提供了两个支持阻塞的方法:
put(E e):试图把E元素放入BlockingQueue中,如果队列的元素已满,则阻塞该线程;
take():尝试从BlockingQueue的头部取出元素,如果队列的元素已空,则阻塞该线程;
3.BlockingQueue继承了Queue接口,可以使用Queue接口中的方法,归纳起来分为三组:
(1)在队列尾部插入元素,包括add(E e),offer(E e),put(E e),当队列已满时,分别会抛出异常、返回false、阻塞队列;
(2)在队列头部删除并返回删除的元素,包括remove(),poll()和take(),当队列已空时,分别会抛出异常、返回false、阻塞队列;
(3)在队列头部取出但不删除元素,包括element()和peek()方法,当队列已空时,着三个方法分别抛出异常、返回false
4.Java7之后又新增了一些阻塞队列,包含5个实现类:
(1)ArrayBlockingQueue:基于数组实现的BlockingQueue队列;
(2)LinkedBlockingQueue:基于链表实现的BlockingQueue队列;
(3)PriorityBlockingQueue:它并不是标准的阻塞队列,当调用remove()扥方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。判断元素的大小可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
(4)SynchronousQueue:同步队列,对该队列的存取操作必须交替进行;
(5)DelayQueue:它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现,并要求所有的集合元素都要实现Delay接口。
5.举例
public class BlockingQueueTest{ public static void main(String[] args) throws Exception{ BlockingQueue<String> bq=new ArrayBlockingQueue<String>(2); bq.put("java"); bq.put("study"); bq.put("javaee"); //阻塞线程 } }
四 线程池
1.系统启动一个新线程的成本是比较高的,因为涉及到与操作系统的交互,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量生存期很短的线程时,更要考虑使用线程池。线程池在系统启动时就创建大量的空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个线程来执行它的run()或call()方法,当方法结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一个Runnable对象的run()或call()方法。使用线程池还可以控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。
2.Java5之后支持内建线程池,通过一个Executors工厂类来产生线程池,该工厂类包含以下几个静态工厂方法来创建线程池:
(1)newCachedThreadPool():创建一个具有缓存功能的线程池
(2)newFixedThreadPool( int nThreads):创建一个可重用的、具有固定线程数的线程池
(3)newSingleThreadPool():创建一个只有单线程的线程池
(4)newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。
(6)newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
(7)ExecutorService newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争
(8)ExecutorService newWorkStealingPool():上一个方法的简化版本,如果当前机器有4个CPU,则目标并行级别被设置为4,也就是相当于前一个方法传入4作为参数
注意:前三个方法返回一个ExecutorService对象,该对象代表一个线程池,可以执行Runnable或Callable对象所代表的线程;中间两个方法返回ScheduledExecutorService线程池,它可以在指定延迟后执行线程任务;后两个是Java8新增的,充分利用了多CPU并行的能力,这两个方法生成的work stealing池,相当于后台线程池,如果所有的后台线程都死亡了,work stealing池中的线程会自动死亡。
3.ExecutorService里提供了如下三个方法:
(1)Future<?> submit(Runnable task): 将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。这里的Future对象代表Runnable任务的返回值,但run()方法无返回值,所以在run()方法执行结束后返回null。但可以调用Future的isDone(),isCancelled()来获得Runnable对象的执行状态。
(2)<T> Future<T> submit(Runnable task, T result): 这里的result显式指定线程执行结束的返回值,所以Future对象将在run()方法执行结束后返回result
(3)<T> Future<T> submit(Callable<T> task): 这里提交的是Callable对象
4.ScheduledExecutorService提供了4个方法:
(1)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit):指定callable任务将在delay延迟后执行。
(2)ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任务将在delay延迟后执行。
(3)ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 指定command任务将在delay延迟后执行,而且以设定频率重复执行,也就是说,在initialDelay后开始执行,一次在initialDelay+period,initialDelay+2*period...处重复执行。
(4)ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):
创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都会存在给定的延迟。如果任务在某次执行时遇到异常,就会取消后续执行;否则,只会通过程序来显示取消或终止该任务。
5.用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,线程池不再接收新任务,但会将所有已提交任务执行完成;也可以调用shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
6.举例
public class ThreadPoolTest { public static void main(String[] args) throws Exception{ ExecutorService pool= Executors.newFixedThreadPool(6); Runnable target= ()->{ for(int i=0;i<100;i++){ System.out.println(Thread.currentThread().getPriority()+"的i值为:"+i); } }; //向线程池中提交两个线程 pool.submit(target) pool.submit(target); pool.shutdown(); } }
五 ThreadLocal类
1.java为ThreadLocal类增加了泛型支持,使用ThreadLocal类可以简化多线程编程时的并发访问,使用这个工具类可以很简捷的隔离多线程程序的竞争资源。
2.ThreadLocal,是Thread Local Variable (线程局部变量) 的意思,它的功能很简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立的改变自己的副本,而不会与其他线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量一样。
3.ThreadLocal类只提供了三个public方法:
T get():返回此线程局部变量中当前线程副本中的值
void remove():删除此线程局部变量中当前线程的值
void set(T value):设置此线程局部变量中当前线程副本中的值
4.ThreadLocalTest.java
class Account{ //定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量,每个线程都会保留该变量的一个副本 private ThreadLocal<String> name=new ThreadLocal<>(); public Account(String str){ this.name.set(str); //用于访问当前线程的name的副本 System.out.println("---"+this.name.get()); } public String getName(){ return name.get(); } public void setName(String str){ this.name.set(str); } } class MyTest extends Thread{ private Account account; public MyTest(String name, Account account) { super(name); this.account = account; } public void run(){ for(int i=0;i<10;i++){ if(i==6) { account.setName(getName()); } System.out.println(account.getName()+"账户的i值:"+i); } } } public class ThreadLocalTest { public static void main(String[] args){ //虽然两个线程共享同一个账户,即只有一个账户名,但由于账户名是ThreadLocal类型的,所以每个线程都完全拥有各自的账户名副本,因此在i==6之后,将看到两个线程访问同一个账户时出现不同的账户名 Account at=new Account("初始名"); new MyTest("线程甲",at).start(); new MyTest("线程乙",at).start(); } }
结果:
---初始名
null账户的i值:0
null账户的i值:0
null账户的i值:1
null账户的i值:1
null账户的i值:2
null账户的i值:2
null账户的i值:3
null账户的i值:3
null账户的i值:4
null账户的i值:4
null账户的i值:5
null账户的i值:5
线程甲账户的i值:6
线程乙账户的i值:6
线程甲账户的i值:7
线程乙账户的i值:7
线程甲账户的i值:8
线程乙账户的i值:8
线程甲账户的i值:9
线程乙账户的i值:9
实际上账户名有三个副本,主线程一个,另外启动的两个线程各一个,它们的值互不干扰,每个线程都完全拥有自己的ThreadLocal变量。ThreadLocal将需要并发访问的资源复制多份,每个线程拥有一份资源,每个线程都拥有自己的线程副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象与线程相关的状态使用ThreadLocal保存。
5.ThreadLocal并不能代替同步机制,两者面向的问题领域不同,同步机制是为了同步多个线程对相同资源的并发访问,是多个线程之间进行通信的有效方式,而ThreadLocal是为了隔离多个线程的数据共享,从根本上避免多个线程之间对共享资源的竞争。