之前的例子都是多个线程执行同一种任务,下面开始讨论多个线程执行不同任务的情况。
举个例子:有个仓库专门存储货物,有的货车专门将货物送往仓库,有的货车则专门将货物拉出仓库,这两种货车的任务不同,而且为了完成任务需要彼此相互合作,如果仓库中没有货物了而将货物拉出仓库的货车先到达了,那么它只有先等待其它货车将货物送入仓库......这种情况和线程间通信的情况很相似。
一、问题的提出-单生产者单消费者模式。
需求:定义一个容器,存储了字段姓名和性别,一个线程0为姓名和性别赋值,赋值完毕之后另一个线程1取走姓名和性别,接着线程0再为姓名和性别赋值,线程1再取走。。。
思路:定义一个资源类Resource存放姓名和性别,定义一个输入线程Input为姓名和性别赋值,定义一个输出线程Output取走姓名和性别。
1.原始代码
1 /* 2 线程安全性问题的产生 3 */ 4 class Resource 5 { 6 String name; 7 String sex; 8 } 9 class Input implements Runnable 10 { 11 Resource r; 12 public Input(){} 13 public Input(Resource r) 14 { 15 this.r=r; 16 } 17 public void run() 18 { 19 boolean flag=false; 20 while(true) 21 { 22 if(flag) 23 { 24 r.name="Mike"; 25 r.sex="nan"; 26 } 27 else 28 { 29 r.name="丽丽"; 30 r.sex="女女女女女女女女女女女女"; 31 } 32 flag=!flag; 33 } 34 } 35 } 36 class Output implements Runnable 37 { 38 Resource r; 39 public Output(){} 40 public Output(Resource r) 41 { 42 this.r=r; 43 } 44 public void run() 45 { 46 while(true) 47 { 48 System.out.println(r.name+"-----------------"+r.sex); 49 } 50 } 51 } 52 public class Demo 53 { 54 public static void main(String args[]) 55 { 56 Resource r=new Resource(); 57 Input in=new Input(r); 58 Output out=new Output(r); 59 Thread t0=new Thread(in); 60 Thread t1=new Thread(out); 61 t0.start(); 62 t1.start(); 63 } 64 }
运行结果:
2.改进代码,用加锁解决男女性别紊乱问题
观察结果,我们可以发现最严重的问题就是丽丽性别有时候变成了男,而Mike性别有时候变成了女
原因分析:
多线程操作共享资源的代码不止一条。
很明显,对操作共享资源的代码要加上锁,这里使用资源r即可。
我们可不可以对输入输出线程加上不同的锁?
不行。原因:加上锁的目的是为了让访问同一资源的线程保持唯一,也就是说总是要保持只有一个线程访问资源,事实上虽然加了锁,但是CPU也会在未执行完同步代码块之前就切换到其它线程,如果其他线程加了同样的锁,那么由于上一个线程占有了锁,所以CPU无法进入当前的同步代码块,当切换到上一个线程的时候,会在中断处继续执行。这样就保证了访问资源的线程只有一个。本例中由于这个原因,必须加上同一把锁。
1 class Resource 2 { 3 String name; 4 String sex; 5 } 6 class Input implements Runnable 7 { 8 Resource r; 9 public Input(){} 10 public Input(Resource r) 11 { 12 this.r=r; 13 } 14 public void run() 15 { 16 boolean flag=false; 17 while(true) 18 { 19 synchronized(r)//使用Resource对象锁,因为对于输入输出线程来说,是唯一的 20 { 21 if(flag) 22 { 23 r.name="Mike"; 24 r.sex="nan"; 25 } 26 else 27 { 28 r.name="丽丽"; 29 r.sex="女女女女女女女女女女女女"; 30 } 31 } 32 flag=!flag; 33 } 34 } 35 } 36 class Output implements Runnable 37 { 38 Resource r; 39 public Output(){} 40 public Output(Resource r) 41 { 42 this.r=r; 43 } 44 public void run() 45 { 46 while(true) 47 { 48 synchronized(r) 49 { 50 System.out.println(r.name+"-----------------"+r.sex); 51 } 52 } 53 } 54 } 55 public class Demo 56 { 57 public static void main(String args[]) 58 { 59 Resource r=new Resource(); 60 Input in=new Input(r); 61 Output out=new Output(r); 62 Thread input=new Thread(in); 63 Thread output=new Thread(out); 64 input.start(); 65 output.start(); 66 } 67 }
运行结果:
3.改进代码,用等待唤醒机制解决输出一大片一大片相同数据的情况。
观察运行结果,可以发现虽然性别转为正常,但是输出却还是有问题。怎样改变代码才能解决输出一大片一大片相同数据的情况?
分析:由于线程得到CPU执行权后在时间片用完之前会一直占有,所以会进行多次循环打印。
解决方法:使用等待唤醒机制解决这个问题。
等待唤醒机制中涉及到三个方法:
wait:使得线程进入冻结状态,除非调用notify(可能会唤醒)或者notifyAll方法(一定会唤醒),否则会一直保持冻结状态,注意它将会释放锁。
notify:在当前线程池中唤醒任意一条线程。
notifyAll:唤醒当前线程池中的全部线程。
针对这个问题,使用等待唤醒机制的过程:当input线程赋完值之后先唤醒output线程让它取走数据,自己则先睡一会儿等待output线程唤醒自己;output线程取走数据之后,先唤醒input线程,自己再去睡一会儿等待input线程唤醒自己。。。。如此循环即可达到目的。为此需要一个标志变量flag标识当前资源的状态,true标识资源存在,output线程可以取走数据,而input线程则需要等待output线程取走数据;false标识资源为空,input线程可以输入数据,而output线程则要等待input线程输入数据。
1 class Resource 2 { 3 boolean flag=false;//表名一开始的时候并没有姓名和性别,需要输入。 4 String name; 5 String sex; 6 } 7 class Input implements Runnable 8 { 9 Resource r; 10 public Input(){} 11 public Input(Resource r) 12 { 13 this.r=r; 14 } 15 public void run() 16 { 17 boolean flag=false; 18 while(true) 19 { 20 synchronized(r)//使用Resource对象锁,因为对于输入输出线程来说,是唯一的 21 { 22 if(r.flag==true) 23 try 24 { 25 r.wait();//让输入线程被等待 26 } 27 catch (InterruptedException e){} 28 29 if(flag) 30 { 31 r.name="Mike"; 32 r.sex="nan"; 33 } 34 else 35 { 36 r.name="丽丽"; 37 r.sex="女女女女女女女女女女女女"; 38 } 39 r.flag=true; 40 r.notify();//唤醒输出线程。 41 } 42 flag=!flag; 43 } 44 } 45 } 46 class Output implements Runnable 47 { 48 Resource r; 49 public Output(){} 50 public Output(Resource r) 51 { 52 this.r=r; 53 } 54 public void run() 55 { 56 while(true) 57 { 58 synchronized(r) 59 { 60 if(r.flag==false) 61 try 62 { 63 r.wait();//让输出线程被等待 64 } 65 catch (InterruptedException e) 66 { 67 } 68 System.out.println(r.name+"-----------------"+r.sex); 69 r.flag=false; 70 r.notify();//告诉生产线已经没有货了 71 } 72 } 73 } 74 } 75 public class Demo 76 { 77 public static void main(String args[]) 78 { 79 Resource r=new Resource(); 80 Input in=new Input(r); 81 Output out=new Output(r); 82 Thread input=new Thread(in); 83 Thread output=new Thread(out); 84 input.start(); 85 output.start(); 86 } 87 }
4.代码优化
观察代码可以发现,资源类中的字段均为友好型的,在实际开发中应当使用私有形变量并且提供对外的访问方法。
代码优化之后:
1 class Resource 2 { 3 boolean flag=false;//表名一开始的时候并没有姓名和性别,需要输入。 4 private String name; 5 private String sex; 6 public synchronized void set(String name,String sex) 7 { 8 if(this.flag==true) 9 { 10 try 11 { 12 this.wait(); 13 } 14 catch (InterruptedException e) 15 { 16 } 17 } 18 this.name=name; 19 this.sex=sex; 20 this.flag=true; 21 this.notify(); 22 } 23 public synchronized void out() 24 { 25 if(this.flag==false) 26 { 27 try 28 { 29 this.wait(); 30 } 31 catch (InterruptedException e) 32 { 33 } 34 } 35 System.out.println(this.name+"---------++--------"+this.sex); 36 this.flag=false; 37 this.notify(); 38 } 39 } 40 class Input implements Runnable 41 { 42 Resource r; 43 public Input(){} 44 public Input(Resource r) 45 { 46 this.r=r; 47 } 48 public void run() 49 { 50 boolean flag=false; 51 while(true) 52 { 53 if(flag) 54 r.set("Mike","nan"); 55 else 56 r.set("丽丽","女女女女女女女女女"); 57 58 flag=!flag; 59 } 60 } 61 } 62 class Output implements Runnable 63 { 64 Resource r; 65 public Output(){} 66 public Output(Resource r) 67 { 68 this.r=r; 69 } 70 public void run() 71 { 72 while(true) 73 { 74 r.out(); 75 } 76 } 77 } 78 public class Demo 79 { 80 public static void main(String args[]) 81 { 82 Resource r=new Resource(); 83 Input in=new Input(r); 84 Output out=new Output(r); 85 Thread input=new Thread(in); 86 Thread output=new Thread(out); 87 input.start(); 88 output.start(); 89 } 90 }
代码出现了较大的改动,并且代码结构结构变得清晰合理。
二、多生产者、多消费者问题
1.单生产者、单消费者问题
为了便于说明,先从单生产者单消费者问题开始讲解。
需求:生产者生产一个烤鸭,消费者消费一个烤鸭。。
1 /* 2 单生产者单消费者问题,和上一个问题非常相似 3 */ 4 class Resource 5 { 6 private boolean flag=false; 7 private String name; 8 private int count=1; 9 public synchronized void set(String name ) 10 { 11 if(this.flag) 12 try 13 { 14 this.wait(); 15 } 16 catch (InterruptedException e) 17 { 18 } 19 this.name=name+count++; 20 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 21 this.flag=true; 22 this.notify(); 23 } 24 public synchronized void out() 25 { 26 if(!this.flag) 27 try 28 { 29 this.wait(); 30 } 31 catch (InterruptedException e) 32 { 33 } 34 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 35 this.flag=false; 36 notify(); 37 } 38 } 39 class Procedure implements Runnable 40 { 41 private Resource r; 42 public Procedure(Resource r) 43 { 44 this.r=r; 45 } 46 public void run() 47 { 48 while(true) 49 { 50 r.set("烤鸭"); 51 } 52 } 53 } 54 55 class Consumer implements Runnable 56 { 57 private Resource r; 58 public Consumer(Resource r) 59 { 60 this.r=r; 61 } 62 public void run() 63 { 64 while(true) 65 { 66 r.out(); 67 } 68 } 69 } 70 public class Demo 71 { 72 public static void main(String args[]) 73 { 74 Resource r=new Resource(); 75 Procedure pro=new Procedure(r); 76 Consumer con=new Consumer(r); 77 Thread t0=new Thread(pro); 78 Thread t1=new Thread(con); 79 t0.start(); 80 t1.start(); 81 } 82 }
观察上述运行结果,可以发现线程0生产,线程1消费,没有任何问题。
2.多生产者,多消费者问题
如果需要增加生产者、消费者的数量,是不是简单地增加多个线程就可以了?
1 /* 2 现在讨论多生产者、多消费者问题。 3 1.多生产了没消费的问题。 4 2.多消费了同一件的问题。 5 */ 6 class Resource 7 { 8 private boolean flag=false; 9 private String name; 10 private int count=1; 11 public synchronized void set(String name ) 12 { 13 if(this.flag) 14 try 15 { 16 this.wait();//t0 t1(活) 17 } 18 catch (InterruptedException e) 19 { 20 } 21 this.name=name+count++; 22 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 23 this.flag=true; 24 this.notify(); 25 } 26 public synchronized void out() 27 { 28 if(!this.flag) 29 try 30 { 31 this.wait();//t2 t3 32 } 33 catch (InterruptedException e) 34 { 35 } 36 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 37 this.flag=false; 38 notify(); 39 } 40 } 41 class Procedure implements Runnable 42 { 43 private Resource r; 44 public Procedure(Resource r) 45 { 46 this.r=r; 47 } 48 public void run() 49 { 50 while(true) 51 { 52 r.set("烤鸭"); 53 } 54 } 55 } 56 57 class Consumer implements Runnable 58 { 59 private Resource r; 60 public Consumer(Resource r) 61 { 62 this.r=r; 63 } 64 public void run() 65 { 66 while(true) 67 { 68 r.out(); 69 } 70 } 71 } 72 public class Demo 73 { 74 public static void main(String args[]) 75 { 76 Resource r=new Resource(); 77 Procedure pro=new Procedure(r); 78 Consumer con=new Consumer(r); 79 Thread t0=new Thread(pro); 80 Thread t1=new Thread(pro); 81 Thread t2=new Thread(con); 82 Thread t3=new Thread(con); 83 t0.start(); 84 t1.start(); 85 t2.start(); 86 t3.start(); 87 } 88 }
我们可以发现生产了几个烤鸭只有一个被消费或者同一个烤鸭被消费了多次,这是一种线程安全性问题的现象。
3.解决多生产、少消费的情况和消费同一个烤鸭被消费多次的情况
为什么会发生这种现象?
解析:我们知道线程0、1只能运行输入线程的代码,2,3只能运行输出线程的代码。
假设线程0开始生产,生产完毕之后进入冻结状态,CPU切换到线程1,线程1发现有烤鸭,所以也进入冻结状态;线程2开始消费,并唤醒线程0,线程0加入堵塞队列,CPU切换到线程3,线程3发现没有烤鸭了,所以也进入堵塞状态。这时候线程池中有三个线程,分别是生产者线程1和消费者线程2、3,能够执行任务的只有线程0,线程0开始运作,它将不再判断flag而直接生产烤鸭,同时唤醒线程池中的一个线程,并进入冻结状态。关键问题就来了,它将会唤醒哪个线程?如果是线程2或者线程3,都不会出现问题,但是如果唤醒了线程1,那么线程1将不再判断flag而直接生产烤鸭,同时唤醒线程池中的一个线程。这时候问题已经很明显了,连续生产了两个烤鸭。如果下一个唤醒的线程是线程2或者3,将会被消费掉一个烤鸭,但是如果唤醒的是刚刚进入冻结状态的线程0,那么将会继续生产烤鸭。。极端情况就是连续一大片都在生产烤鸭而没有消费者消费烤鸭。
同理,连续消费同一个烤鸭多次也就不足为奇了。
解决方法:很明显是由于线程醒过来之后没有在此判断flag造成的,所以我们将if改成while即可循环判断
1 class Resource 2 { 3 private boolean flag=false; 4 private String name; 5 private int count=1; 6 public synchronized void set(String name ) 7 { 8 while(this.flag) 9 try 10 { 11 this.wait();//t0 t1(活) 12 } 13 catch (InterruptedException e) 14 { 15 } 16 this.name=name+count++; 17 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 18 this.flag=true; 19 this.notify(); 20 } 21 public synchronized void out() 22 { 23 while(!this.flag) 24 try 25 { 26 this.wait();//t2 t3 27 } 28 catch (InterruptedException e) 29 { 30 } 31 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 32 this.flag=false; 33 notify(); 34 } 35 } 36 class Procedure implements Runnable 37 { 38 private Resource r; 39 public Procedure(Resource r) 40 { 41 this.r=r; 42 } 43 public void run() 44 { 45 while(true) 46 { 47 r.set("烤鸭"); 48 } 49 } 50 } 51 52 class Consumer implements Runnable 53 { 54 private Resource r; 55 public Consumer(Resource r) 56 { 57 this.r=r; 58 } 59 public void run() 60 { 61 while(true) 62 { 63 r.out(); 64 } 65 } 66 } 67 public class Demo 68 { 69 public static void main(String args[]) 70 { 71 Resource r=new Resource(); 72 Procedure pro=new Procedure(r); 73 Consumer con=new Consumer(r); 74 Thread t0=new Thread(pro); 75 Thread t1=new Thread(pro); 76 Thread t2=new Thread(con); 77 Thread t3=new Thread(con); 78 t0.start(); 79 t1.start(); 80 t2.start(); 81 t3.start(); 82 } 83 }
运行的结果是停在了第四行,光标不断闪烁但没有输出,我们很容易的就发现了,这是一种死锁现象。
4.解决死锁问题。
原因分析:
假设线程0开始生产,生产完毕之后进入冻结状态,CPU切换到线程1,线程1发现有烤鸭,所以也进入冻结状态;线程2开始消费,并唤醒线程0,线程0加入堵塞队列,CPU切换到线程3,线程3发现没有烤鸭了,所以也进入堵塞状态。这时候线程池中有三个线程,分别是生产者线程1和消费者线程2、3,能够执行任务的只有线程0,线程0开始运作(这部分分析和上面的分析相同),判断完flag,它将会生产一个烤鸭,同时唤醒线程池中的一个线程。这时候关键问题就来了,它将唤醒哪个线程?如果是线程2或者3,也没有问题,运行的时候会发现很“和谐”,但是如果唤醒的是线程1,线程1醒来之后会先判断标记,发现标记为true,所以它将继续睡,进入冻结状态,而线程0进入下一次循环并判断标记,也进入冻结状态。所以,现在就发生了四个线程全部处于冻结状态的情况,这也是一种死锁情况。
解决思路:
很明显,由于notify唤醒的线程有可能是对方的线程,但也有可能是己方的线程,当唤醒的是己方的线程时,在一定条件下就会产生死锁。如果我们能够只唤醒对方的线程,那么问题就解决了。但是现在先不讨论那种情况,该怎么解决这个问题?答案就是使用notifyAll方法唤醒全部线程。
1 /* 2 现在讨论多生产者、多消费者问题。 3 解决多生产者多消费者问题出现的死锁现象:将notify改成notifyAll,这样将会唤醒本方线程和对方线程 4 5 */ 6 class Resource 7 { 8 private boolean flag=false; 9 private String name; 10 private int count=1; 11 public synchronized void set(String name ) 12 { 13 while(this.flag) 14 try 15 { 16 this.wait();//t0 t1(活) 17 } 18 catch (InterruptedException e) 19 { 20 } 21 this.name=name+count++; 22 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 23 this.flag=true; 24 this.notifyAll(); 25 } 26 public synchronized void out() 27 { 28 while(!this.flag) 29 try 30 { 31 this.wait();//t2 t3 32 } 33 catch (InterruptedException e) 34 { 35 } 36 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 37 this.flag=false; 38 notifyAll(); 39 } 40 } 41 class Procedure implements Runnable 42 { 43 private Resource r; 44 public Procedure(Resource r) 45 { 46 this.r=r; 47 } 48 public void run() 49 { 50 while(true) 51 { 52 r.set("烤鸭"); 53 } 54 } 55 } 56 57 class Consumer implements Runnable 58 { 59 private Resource r; 60 public Consumer(Resource r) 61 { 62 this.r=r; 63 } 64 public void run() 65 { 66 while(true) 67 { 68 r.out(); 69 } 70 } 71 } 72 public class Demo 73 { 74 public static void main(String args[]) 75 { 76 Resource r=new Resource(); 77 Procedure pro=new Procedure(r); 78 Consumer con=new Consumer(r); 79 Thread t0=new Thread(pro); 80 Thread t1=new Thread(pro); 81 Thread t2=new Thread(con); 82 Thread t3=new Thread(con); 83 t0.start(); 84 t1.start(); 85 t2.start(); 86 t3.start(); 87 } 88 }
现象:
现在问题全部解决了。但是很明显,由于notifyAll方法唤醒了全部线程,所以它的效率不高,特别是在线程数量特别多的情况下尤其明显。
三、多生产者、多消费者代码优化。
1.JDK1.5新特性。
JDK1.5针对多线程编程中存在的效率不高的问题做出了优化,特别是针对notifyAll方法,做出了重大改进。
新工具所在包:java.util.concurrent.locks
1.1接口:Lock。
此接口封装了获取锁的方法lock()和释放锁的方法unlock(),这样就将synchronized获取锁和释放锁的隐式过程显式化。
由于Lock是接口名,不能直接new对象,所以要使用实现此接口的已知类,比如ReentrantLock,使用Lock lock=new ReentrantLock();即可得到Lock接口的实例。
这样就完成了自定义锁的过程。
应当注意,由于获取锁和释放锁的过程显式化,我们必须将释放锁的动作放在finally块中才能保证线程的安全性。如果线程在未释放所之前发生了异常,那么它将停止程序的运行,并返回上一级调用处,但是它仍然是锁的占有者,这样别的线程将不会有机会访问共享资源。
1.2接口:Condition。
Condition接口封装了三个重要的方法:await() 、signal() 、signalAll(),这三个方法对应着Object类中的wait() 、notify() 、notifyAll()方法。
为什么要使用Condition接口?
由于Lock接口的出现,导致了不使用synchronized关键字也可以,但是自定义锁上并没有这三个重要的方法,这是因为JDK1.5规定了一把锁上可以有多个监视器,如果这三个方法称为Lock接口中的成员,将会使得一把锁上只能有一个监视器,和原来相比就没有了优势。使用Condition接口的最大好处就是可以指定唤醒的线程是对方的上线程还是己方的线程,这样就能大大的提高工作效率。
怎么获得实现了Condition接口的对象?
使用Lock接口中的方法:newCondition();
即Condition con=lock.newCondition();
2.代码演示
2.1改造成和原来的代码相同的效果。
1 /* 2 现在讨论多生产者、多消费者问题。 3 使用JDK1.5之后的新特性 4 改变之后和原来的效果相同,效率没有提高。 5 */ 6 import java.util.concurrent.locks.*; 7 class Resource 8 { 9 private boolean flag=false; 10 private String name; 11 private int count=1; 12 Lock lock=new ReentrantLock(); 13 Condition con=lock.newCondition(); 14 public void set(String name ) 15 { 16 lock.lock(); 17 try 18 { 19 while(this.flag) 20 try 21 { 22 con.await();//t0 t1(活) 23 } 24 catch (InterruptedException e) 25 { 26 } 27 this.name=name+count++; 28 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 29 this.flag=true; 30 con.signalAll(); 31 } 32 finally 33 { 34 lock.unlock(); 35 } 36 } 37 public void out() 38 { 39 lock.lock(); 40 try 41 { 42 while(!this.flag) 43 try 44 { 45 con.await();//t2 t3 46 } 47 catch (InterruptedException e) 48 { 49 } 50 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 51 this.flag=false; 52 con.signalAll(); 53 } 54 finally 55 { 56 lock.unlock(); 57 } 58 } 59 } 60 class Procedure implements Runnable 61 { 62 private Resource r; 63 public Procedure(Resource r) 64 { 65 this.r=r; 66 } 67 public void run() 68 { 69 while(true) 70 { 71 r.set("烤鸭"); 72 } 73 } 74 } 75 76 class Consumer implements Runnable 77 { 78 private Resource r; 79 public Consumer(Resource r) 80 { 81 this.r=r; 82 } 83 public void run() 84 { 85 while(true) 86 { 87 r.out(); 88 } 89 } 90 } 91 public class Demo 92 { 93 public static void main(String args[]) 94 { 95 Resource r=new Resource(); 96 Procedure pro=new Procedure(r); 97 Consumer con=new Consumer(r); 98 Thread t0=new Thread(pro); 99 Thread t1=new Thread(pro); 100 Thread t2=new Thread(con); 101 Thread t3=new Thread(con); 102 t0.start(); 103 t1.start(); 104 t2.start(); 105 t3.start(); 106 } 107 }
运行的效果和原来相同,既没有死锁发生,也没有其他线程安全性问题。但是效率和以前完全相同,因为也唤醒了对方的线程。
2.2优化后的代码,只将对方线程唤醒。
1 import java.util.concurrent.locks.*; 2 class Resource 3 { 4 private boolean flag=false; 5 private String name; 6 private int count=1; 7 Lock lock=new ReentrantLock(); 8 Condition procedurers=lock.newCondition(); 9 Condition consumers=lock.newCondition(); 10 public void set(String name ) 11 { 12 lock.lock(); 13 try 14 { 15 while(this.flag) 16 try 17 { 18 procedurers.await();//t0 t1(活) 19 } 20 catch (InterruptedException e) 21 { 22 } 23 this.name=name+count++; 24 System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name); 25 this.flag=true; 26 consumers.signal(); 27 } 28 finally 29 { 30 lock.unlock(); 31 } 32 } 33 public void out() 34 { 35 lock.lock(); 36 try 37 { 38 while(!this.flag) 39 try 40 { 41 consumers.await();//t2 t3 42 } 43 catch (InterruptedException e) 44 { 45 } 46 System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name); 47 this.flag=false; 48 procedurers.signal(); 49 } 50 finally 51 { 52 lock.unlock(); 53 } 54 } 55 } 56 class Procedure implements Runnable 57 { 58 private Resource r; 59 public Procedure(Resource r) 60 { 61 this.r=r; 62 } 63 public void run() 64 { 65 while(true) 66 { 67 r.set("烤鸭"); 68 } 69 } 70 } 71 72 class Consumer implements Runnable 73 { 74 private Resource r; 75 public Consumer(Resource r) 76 { 77 this.r=r; 78 } 79 public void run() 80 { 81 while(true) 82 { 83 r.out(); 84 } 85 } 86 } 87 public class Demo 88 { 89 public static void main(String args[]) 90 { 91 Resource r=new Resource(); 92 Procedure pro=new Procedure(r); 93 Consumer con=new Consumer(r); 94 Thread t0=new Thread(pro); 95 Thread t1=new Thread(pro); 96 Thread t2=new Thread(con); 97 Thread t3=new Thread(con); 98 t0.start(); 99 t1.start(); 100 t2.start(); 101 t3.start(); 102 } 103 }
运行的效果和以前完全相同,但是效率却要更高。
四、真正的生产者消费者问题。
之前讨论的多生产者多消费者问题所用的资源类中只能存放1个烤鸭,多个生产者争着生产这一个烤鸭,多个消费者争着消费这一个烤鸭,这在实际生活中是不存在的。应当改造成这样:有一个可以存放多个烤鸭的容器,生产者将烤鸭生产完毕之后放入容器,消费者在后面消费,当容器满了,生产者停下等待消费者消费,消费者一旦消费掉一个烤鸭,就告诉生产者容器内可以存放新烤鸭了,生产者于是开始生产新的烤鸭并放在空位上;消费者发现容器空了,则等待生产者生产出烤鸭,生产者一旦生产出烤鸭就立即通知消费者容器内已经有烤鸭了,可以消费了。这才是一个生产者消费者问题的实际流程。
容器可以使用数组,生产者消费者都需要一个指针变量,表示下一个生产出的烤鸭的位置和下一个将要消费的烤鸭所在的位置。此外,应当有一个变量标识当前容器内有多少烤鸭,以便于决定生产者和消费者在容器已满和容器为空的时候的动作。
此外,需要一把同步锁,这是必须的,锁上有两个监视器,这是为了提高效率,原因前面已经解释过。
变量说民:
lock:锁名。
notFull:生产者监视器。
notEmpty:消费者监视器。
items:对象数组,存放“烤鸭”
putptr:指示生产者下一个生产出来的烤鸭应当存放在什么位置。
takeptr:指示消费者下一个将要消费的烤鸭所在的位置。
count:指示当前容器中的烤鸭数量,初始值是0。
容器的大小是5。
1.供大于求
1 import java.util.concurrent.locks.*; 2 class BoundedBuffer { 3 final Lock lock = new ReentrantLock(); 4 final Condition notFull = lock.newCondition(); 5 final Condition notEmpty = lock.newCondition(); 6 7 final Object[] items = new Object[5]; 8 int putptr, takeptr, count=0; 9 10 public void put(Object x) throws InterruptedException {//生产线 11 lock.lock(); 12 try { 13 while (count == items.length) 14 notFull.await();//生产线已经饱和,无需再生产,等待消费,进入冻结状态。 15 items[putptr] = x; //生产下一个产品。 16 17 18 System.out.println(Thread.currentThread().getName()+":----生产"+items[putptr]+putptr); 19 20 21 if (++putptr == items.length) putptr = 0;//如果生产到了尾部,则从头开始生产。 22 ++count;//数量+1 23 notEmpty.signal();//唤醒消费者开始消费 24 } finally { 25 lock.unlock(); 26 } 27 } 28 29 public Object take() throws InterruptedException { 30 lock.lock(); 31 try { 32 while (count == 0) //如果生产线上没有货物,则等待生产者生产,进入冻结状态。 33 notEmpty.await(); 34 Object x = items[takeptr]; //生产线上有货物了,消费货物 35 36 System.out.println(Thread.currentThread().getName()+":-----------消费"+items[takeptr]+takeptr); 37 38 39 if (++takeptr == items.length) takeptr = 0;//消费到了最后一个产品,则从头开始找货物 40 --count;//货物数量自减 41 notFull.signal();//唤醒生产线开始生产 42 return x; 43 } finally { 44 lock.unlock(); 45 } 46 } 47 } 48 49 class Procedure implements Runnable//生产者线程 50 { 51 public BoundedBuffer b; 52 public Procedure(BoundedBuffer b) 53 { 54 this.b=b; 55 } 56 public void run() 57 { 58 while(true) 59 { 60 try{ 61 Thread.sleep(1000); 62 b.put("烤鸭"); 63 } 64 catch(InterruptedException e) 65 { 66 } 67 } 68 } 69 } 70 class Consumer implements Runnable//消费者线程 71 { 72 public BoundedBuffer b; 73 public Consumer(BoundedBuffer b) 74 { 75 this.b=b; 76 } 77 public void run() 78 { 79 while(true) 80 { 81 82 try 83 { 84 Thread.sleep(1000); 85 b.take(); 86 } 87 catch(InterruptedException e) 88 { 89 } 90 } 91 } 92 } 93 public class Demo 94 { 95 public static void main(String args[])//使用三个生产者和两个消费者以便于观察现象 96 { 97 BoundedBuffer b=new BoundedBuffer(); 98 Procedure p=new Procedure(b); 99 Consumer c=new Consumer(b); 100 101 Thread t0=new Thread(p); 102 Thread t1=new Thread(p); 103 Thread t2=new Thread(p); 104 105 //Thread t3=new Thread(c); 106 Thread t4=new Thread(c); 107 Thread t5=new Thread(c); 108 109 t0.start(); 110 t1.start(); 111 t2.start(); 112 //t3.start(); 113 t4.start(); 114 t5.start(); 115 } 116 }
程序采用了生产延时和消费延时的方式,以便于观察现象但是不影响最终结果。
运行结果:
分析上述输出结果,我们可以发现一开始生产者的生产和消费者的消费不平衡,即生产者的生产>消费者的消费,但是后来生产者的生产和消费者的消费就达到了平衡,即生产者生产一个,消费者消费一个。这句话听起来没有问题,但其实是错误的,事实上是消费者消费掉一个,生产者生产出一个。原因就是在本程序中,生产者为3个,而消费者为2个,本程序没有给线程定义优先级,所以他们得到CPU的执行权的概率基本相同(生产能力和消费能力相同,此次分析建立在这个基础之上)。所以程序很快就进入了饱和状态。即最开始的时候生产者已经将容器填满,等消费者消费掉一个,生产者就生产一个。我们可以发现,当稳定的时候,消费者消费烤鸭的标号总是==生产者生产烤鸭的编号+1,这就是供大于求的证明。事实上这时候容器一直处于满的状态。
举一个不太恰当的例子:操场上两个人甲乙一起跑步,甲跑的快,很快就跑完了两圈,但是这时候乙才跑完了一圈,这时候甲乙就又碰面了,貌似是跑的一样快,但是其实甲已经多跑了一圈,本应当超过乙继续跑,但是乙却挡在甲的前面不让甲过去,这样甲就不得不让乙先跑,乙跑一步,甲就在后面跟着乙跑一步。在这个例子中,甲相当于生产者,乙相当于消费者,这是供大于求的情况。
2.供小于求
我们将生产者的数量改成2,将消费者的数量改为3,再看看结果怎么样。
1 import java.util.concurrent.locks.*; 2 class BoundedBuffer { 3 final Lock lock = new ReentrantLock(); 4 final Condition notFull = lock.newCondition(); 5 final Condition notEmpty = lock.newCondition(); 6 7 final Object[] items = new Object[5]; 8 int putptr, takeptr, count=0; 9 10 public void put(Object x) throws InterruptedException {//生产线 11 lock.lock(); 12 try { 13 while (count == items.length) 14 notFull.await();//生产线已经饱和,无需再生产,等待消费,进入冻结状态。 15 items[putptr] = x; //生产下一个产品。 16 17 18 System.out.println(Thread.currentThread().getName()+":----生产"+items[putptr]+putptr); 19 20 21 if (++putptr == items.length) putptr = 0;//如果生产到了尾部,则从头开始生产。 22 ++count;//数量+1 23 notEmpty.signal();//唤醒消费者开始消费 24 } finally { 25 lock.unlock(); 26 } 27 } 28 29 public Object take() throws InterruptedException { 30 lock.lock(); 31 try { 32 while (count == 0) //如果生产线上没有货物,则等待生产者生产,进入冻结状态。 33 notEmpty.await(); 34 Object x = items[takeptr]; //生产线上有货物了,消费货物 35 36 System.out.println(Thread.currentThread().getName()+":-----------消费"+items[takeptr]+takeptr); 37 38 39 if (++takeptr == items.length) takeptr = 0;//消费到了最后一个产品,则从头开始找货物 40 --count;//货物数量自减 41 notFull.signal();//唤醒生产线开始生产 42 return x; 43 } finally { 44 lock.unlock(); 45 } 46 } 47 } 48 49 class Procedure implements Runnable//生产者线程 50 { 51 public BoundedBuffer b; 52 public Procedure(BoundedBuffer b) 53 { 54 this.b=b; 55 } 56 public void run() 57 { 58 while(true) 59 { 60 try{ 61 Thread.sleep(1000); 62 b.put("烤鸭"); 63 } 64 catch(InterruptedException e) 65 { 66 } 67 } 68 } 69 } 70 class Consumer implements Runnable//消费者线程 71 { 72 public BoundedBuffer b; 73 public Consumer(BoundedBuffer b) 74 { 75 this.b=b; 76 } 77 public void run() 78 { 79 while(true) 80 { 81 82 try 83 { 84 Thread.sleep(1000); 85 b.take(); 86 } 87 catch(InterruptedException e) 88 { 89 } 90 } 91 } 92 } 93 public class Demo 94 { 95 public static void main(String args[])//使用三个生产者和两个消费者以便于观察现象 96 { 97 BoundedBuffer b=new BoundedBuffer(); 98 Procedure p=new Procedure(b); 99 Consumer c=new Consumer(b); 100 101 Thread t0=new Thread(p); 102 Thread t1=new Thread(p); 103 //Thread t2=new Thread(p); 104 105 Thread t3=new Thread(c); 106 Thread t4=new Thread(c); 107 Thread t5=new Thread(c); 108 109 t0.start(); 110 t1.start(); 111 //t2.start(); 112 t3.start(); 113 t4.start(); 114 t5.start(); 115 } 116 }
这个现象就很容易解释了,因为生产者的数量为2<消费者的数量3,所以一开始的时候就出现了供不应求的情况。这时候的现象就是生产者生产出一个,消费者就消费掉一个,我们可以看到生产者生产出的烤鸭编号总是等于消费者消费的烤鸭编号,这就是供不应求的证明。
再举一个不太恰当地例子:监跑老师的运动能力很强,他要和学生一起跑以监督学生的跑步质量。监跑老师一直跟在学生后面催着学生赶快跑,学生跑一步,监跑老师就跑一步。