【JAVA线程间通信技术】

之前的例子都是多个线程执行同一种任务,下面开始讨论多个线程执行不同任务的情况。

举个例子:有个仓库专门存储货物,有的货车专门将货物送往仓库,有的货车则专门将货物拉出仓库,这两种货车的任务不同,而且为了完成任务需要彼此相互合作,如果仓库中没有货物了而将货物拉出仓库的货车先到达了,那么它只有先等待其它货车将货物送入仓库......这种情况和线程间通信的情况很相似。

一、问题的提出-单生产者单消费者模式。

需求:定义一个容器,存储了字段姓名和性别,一个线程0为姓名和性别赋值,赋值完毕之后另一个线程1取走姓名和性别,接着线程0再为姓名和性别赋值,线程1再取走。。。

思路:定义一个资源类Resource存放姓名和性别,定义一个输入线程Input为姓名和性别赋值,定义一个输出线程Output取走姓名和性别。

1.原始代码

 1 /*
 2 线程安全性问题的产生
 3 */
 4 class Resource
 5 {
 6     String name;
 7     String sex;
 8 }
 9 class Input implements Runnable
10 {
11     Resource r;
12     public Input(){}
13     public Input(Resource r)
14     {
15         this.r=r;
16     }
17     public void run()
18     {
19         boolean flag=false;
20         while(true)
21         {
22             if(flag)
23             {
24                 r.name="Mike";
25                 r.sex="nan";
26             }
27             else
28             {
29                 r.name="丽丽";
30                 r.sex="女女女女女女女女女女女女";
31             }
32             flag=!flag;
33         }
34     }
35 }
36 class Output implements Runnable
37 {
38     Resource r;
39     public Output(){}
40     public Output(Resource r)
41     {
42         this.r=r;
43     }
44     public void run()
45     {
46         while(true)
47         {
48             System.out.println(r.name+"-----------------"+r.sex);
49         }
50     }
51 }
52 public class Demo
53 {
54     public static void main(String args[])
55     {
56         Resource r=new Resource();
57         Input in=new Input(r);
58         Output out=new Output(r);
59         Thread t0=new Thread(in);
60         Thread t1=new Thread(out);
61         t0.start();
62         t1.start();
63     }
64 }

运行结果:

2.改进代码,用加锁解决男女性别紊乱问题

观察结果,我们可以发现最严重的问题就是丽丽性别有时候变成了男,而Mike性别有时候变成了女

原因分析:

多线程操作共享资源的代码不止一条。

很明显,对操作共享资源的代码要加上锁,这里使用资源r即可。

我们可不可以对输入输出线程加上不同的锁?

不行。原因:加上锁的目的是为了让访问同一资源的线程保持唯一,也就是说总是要保持只有一个线程访问资源,事实上虽然加了锁,但是CPU也会在未执行完同步代码块之前就切换到其它线程,如果其他线程加了同样的锁,那么由于上一个线程占有了锁,所以CPU无法进入当前的同步代码块,当切换到上一个线程的时候,会在中断处继续执行。这样就保证了访问资源的线程只有一个。本例中由于这个原因,必须加上同一把锁。

 1 class Resource
 2 {
 3     String name;
 4     String sex;
 5 }
 6 class Input implements Runnable
 7 {
 8     Resource r;
 9     public Input(){}
10     public Input(Resource r)
11     {
12         this.r=r;
13     }
14     public void run()
15     {
16         boolean flag=false;
17         while(true)
18         {
19             synchronized(r)//使用Resource对象锁,因为对于输入输出线程来说,是唯一的
20             {
21                 if(flag)
22                 {
23                     r.name="Mike";
24                     r.sex="nan";
25                 }
26                 else
27                 {
28                     r.name="丽丽";
29                     r.sex="女女女女女女女女女女女女";
30                 }
31             }
32             flag=!flag;
33         }
34     }
35 }
36 class Output implements Runnable
37 {
38     Resource r;
39     public Output(){}
40     public Output(Resource r)
41     {
42         this.r=r;
43     }
44     public void run()
45     {
46         while(true)
47         {
48             synchronized(r)
49             {
50                 System.out.println(r.name+"-----------------"+r.sex);
51             }
52         }
53     }
54 }
55 public class Demo
56 {
57     public static void main(String args[])
58     {
59         Resource r=new Resource();
60         Input in=new Input(r);
61         Output out=new Output(r);
62         Thread input=new Thread(in);
63         Thread output=new Thread(out);
64         input.start();
65         output.start();
66     }
67 }

运行结果:

3.改进代码,用等待唤醒机制解决输出一大片一大片相同数据的情况。

观察运行结果,可以发现虽然性别转为正常,但是输出却还是有问题。怎样改变代码才能解决输出一大片一大片相同数据的情况?

分析:由于线程得到CPU执行权后在时间片用完之前会一直占有,所以会进行多次循环打印。

解决方法:使用等待唤醒机制解决这个问题。

等待唤醒机制中涉及到三个方法:

wait:使得线程进入冻结状态,除非调用notify(可能会唤醒)或者notifyAll方法(一定会唤醒),否则会一直保持冻结状态,注意它将会释放锁。

notify:在当前线程池中唤醒任意一条线程。

notifyAll:唤醒当前线程池中的全部线程。

针对这个问题,使用等待唤醒机制的过程当input线程赋完值之后先唤醒output线程让它取走数据,自己则先睡一会儿等待output线程唤醒自己;output线程取走数据之后,先唤醒input线程,自己再去睡一会儿等待input线程唤醒自己。。。。如此循环即可达到目的。为此需要一个标志变量flag标识当前资源的状态,true标识资源存在,output线程可以取走数据,而input线程则需要等待output线程取走数据;false标识资源为空,input线程可以输入数据,而output线程则要等待input线程输入数据。

 1 class Resource
 2 {
 3     boolean flag=false;//表名一开始的时候并没有姓名和性别,需要输入。
 4     String name;
 5     String sex;
 6 }
 7 class Input implements Runnable
 8 {
 9     Resource r;
10     public Input(){}
11     public Input(Resource r)
12     {
13         this.r=r;
14     }
15     public void run()
16     {
17         boolean flag=false;
18         while(true)
19         {
20             synchronized(r)//使用Resource对象锁,因为对于输入输出线程来说,是唯一的
21             {
22                 if(r.flag==true)
23                     try
24                     {
25                         r.wait();//让输入线程被等待
26                     }
27                     catch (InterruptedException e){}
28
29                 if(flag)
30                 {
31                     r.name="Mike";
32                     r.sex="nan";
33                 }
34                 else
35                 {
36                     r.name="丽丽";
37                     r.sex="女女女女女女女女女女女女";
38                 }
39                 r.flag=true;
40                 r.notify();//唤醒输出线程。
41             }
42             flag=!flag;
43         }
44     }
45 }
46 class Output implements Runnable
47 {
48     Resource r;
49     public Output(){}
50     public Output(Resource r)
51     {
52         this.r=r;
53     }
54     public void run()
55     {
56         while(true)
57         {
58             synchronized(r)
59             {
60                 if(r.flag==false)
61                     try
62                     {
63                         r.wait();//让输出线程被等待
64                     }
65                     catch (InterruptedException e)
66                     {
67                     }
68                 System.out.println(r.name+"-----------------"+r.sex);
69                 r.flag=false;
70                 r.notify();//告诉生产线已经没有货了
71             }
72         }
73     }
74 }
75 public class Demo
76 {
77     public static void main(String args[])
78     {
79         Resource r=new Resource();
80         Input in=new Input(r);
81         Output out=new Output(r);
82         Thread input=new Thread(in);
83         Thread output=new Thread(out);
84         input.start();
85         output.start();
86     }
87 }

4.代码优化

观察代码可以发现,资源类中的字段均为友好型的,在实际开发中应当使用私有形变量并且提供对外的访问方法。

代码优化之后:

 1 class Resource
 2 {
 3     boolean flag=false;//表名一开始的时候并没有姓名和性别,需要输入。
 4     private String name;
 5     private String sex;
 6     public synchronized void set(String name,String sex)
 7     {
 8         if(this.flag==true)
 9         {
10             try
11             {
12                 this.wait();
13             }
14             catch (InterruptedException e)
15             {
16             }
17         }
18         this.name=name;
19         this.sex=sex;
20         this.flag=true;
21         this.notify();
22     }
23     public synchronized void out()
24     {
25         if(this.flag==false)
26         {
27             try
28             {
29                 this.wait();
30             }
31             catch (InterruptedException e)
32             {
33             }
34         }
35         System.out.println(this.name+"---------++--------"+this.sex);
36         this.flag=false;
37         this.notify();
38     }
39 }
40 class Input implements Runnable
41 {
42     Resource r;
43     public Input(){}
44     public Input(Resource r)
45     {
46         this.r=r;
47     }
48     public void run()
49     {
50         boolean flag=false;
51         while(true)
52         {
53             if(flag)
54                 r.set("Mike","nan");
55             else
56                 r.set("丽丽","女女女女女女女女女");
57
58             flag=!flag;
59         }
60     }
61 }
62 class Output implements Runnable
63 {
64     Resource r;
65     public Output(){}
66     public Output(Resource r)
67     {
68         this.r=r;
69     }
70     public void run()
71     {
72         while(true)
73         {
74             r.out();
75         }
76     }
77 }
78 public class Demo
79 {
80     public static void main(String args[])
81     {
82         Resource r=new Resource();
83         Input in=new Input(r);
84         Output out=new Output(r);
85         Thread input=new Thread(in);
86         Thread output=new Thread(out);
87         input.start();
88         output.start();
89     }
90 }

代码出现了较大的改动,并且代码结构结构变得清晰合理。

二、多生产者、多消费者问题

1.单生产者、单消费者问题

为了便于说明,先从单生产者单消费者问题开始讲解。

需求:生产者生产一个烤鸭,消费者消费一个烤鸭。。

 1 /*
 2 单生产者单消费者问题,和上一个问题非常相似
 3 */
 4 class Resource
 5 {
 6     private boolean flag=false;
 7     private String name;
 8     private int count=1;
 9     public synchronized void set(String name )
10     {
11         if(this.flag)
12             try
13             {
14                 this.wait();
15             }
16             catch (InterruptedException e)
17             {
18             }
19         this.name=name+count++;
20         System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
21         this.flag=true;
22         this.notify();
23     }
24     public synchronized void out()
25     {
26         if(!this.flag)
27             try
28             {
29                 this.wait();
30             }
31             catch (InterruptedException e)
32             {
33             }
34         System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
35         this.flag=false;
36         notify();
37     }
38 }
39 class Procedure implements Runnable
40 {
41     private Resource r;
42     public Procedure(Resource r)
43     {
44         this.r=r;
45     }
46     public void run()
47     {
48         while(true)
49         {
50             r.set("烤鸭");
51         }
52     }
53 }
54
55 class Consumer implements Runnable
56 {
57     private Resource r;
58     public Consumer(Resource r)
59     {
60         this.r=r;
61     }
62     public void run()
63     {
64         while(true)
65         {
66             r.out();
67         }
68     }
69 }
70 public class Demo
71 {
72     public static void main(String args[])
73     {
74         Resource r=new Resource();
75         Procedure pro=new Procedure(r);
76         Consumer con=new Consumer(r);
77         Thread t0=new Thread(pro);
78         Thread t1=new Thread(con);
79         t0.start();
80         t1.start();
81     }
82 }

观察上述运行结果,可以发现线程0生产,线程1消费,没有任何问题。

2.多生产者,多消费者问题

如果需要增加生产者、消费者的数量,是不是简单地增加多个线程就可以了?

 1 /*
 2 现在讨论多生产者、多消费者问题。
 3 1.多生产了没消费的问题。
 4 2.多消费了同一件的问题。
 5 */
 6 class Resource
 7 {
 8     private boolean flag=false;
 9     private String name;
10     private int count=1;
11     public synchronized void set(String name )
12     {
13         if(this.flag)
14             try
15             {
16                 this.wait();//t0 t1(活)
17             }
18             catch (InterruptedException e)
19             {
20             }
21         this.name=name+count++;
22         System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
23         this.flag=true;
24         this.notify();
25     }
26     public synchronized void out()
27     {
28         if(!this.flag)
29             try
30             {
31                 this.wait();//t2 t3
32             }
33             catch (InterruptedException e)
34             {
35             }
36         System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
37         this.flag=false;
38         notify();
39     }
40 }
41 class Procedure implements Runnable
42 {
43     private Resource r;
44     public Procedure(Resource r)
45     {
46         this.r=r;
47     }
48     public void run()
49     {
50         while(true)
51         {
52             r.set("烤鸭");
53         }
54     }
55 }
56
57 class Consumer implements Runnable
58 {
59     private Resource r;
60     public Consumer(Resource r)
61     {
62         this.r=r;
63     }
64     public void run()
65     {
66         while(true)
67         {
68             r.out();
69         }
70     }
71 }
72 public class Demo
73 {
74     public static void main(String args[])
75     {
76         Resource r=new Resource();
77         Procedure pro=new Procedure(r);
78         Consumer con=new Consumer(r);
79         Thread t0=new Thread(pro);
80         Thread t1=new Thread(pro);
81         Thread t2=new Thread(con);
82         Thread t3=new Thread(con);
83         t0.start();
84         t1.start();
85         t2.start();
86         t3.start();
87     }
88 }

我们可以发现生产了几个烤鸭只有一个被消费或者同一个烤鸭被消费了多次,这是一种线程安全性问题的现象。

3.解决多生产、少消费的情况和消费同一个烤鸭被消费多次的情况

为什么会发生这种现象?

解析我们知道线程0、1只能运行输入线程的代码,2,3只能运行输出线程的代码。

假设线程0开始生产,生产完毕之后进入冻结状态,CPU切换到线程1,线程1发现有烤鸭,所以也进入冻结状态;线程2开始消费,并唤醒线程0,线程0加入堵塞队列,CPU切换到线程3,线程3发现没有烤鸭了,所以也进入堵塞状态。这时候线程池中有三个线程,分别是生产者线程1和消费者线程2、3,能够执行任务的只有线程0,线程0开始运作,它将不再判断flag而直接生产烤鸭,同时唤醒线程池中的一个线程,并进入冻结状态。关键问题就来了,它将会唤醒哪个线程?如果是线程2或者线程3,都不会出现问题,但是如果唤醒了线程1,那么线程1将不再判断flag而直接生产烤鸭,同时唤醒线程池中的一个线程。这时候问题已经很明显了,连续生产了两个烤鸭。如果下一个唤醒的线程是线程2或者3,将会被消费掉一个烤鸭,但是如果唤醒的是刚刚进入冻结状态的线程0,那么将会继续生产烤鸭。。极端情况就是连续一大片都在生产烤鸭而没有消费者消费烤鸭。

同理,连续消费同一个烤鸭多次也就不足为奇了。

解决方法很明显是由于线程醒过来之后没有在此判断flag造成的,所以我们将if改成while即可循环判断

 1 class Resource
 2 {
 3     private boolean flag=false;
 4     private String name;
 5     private int count=1;
 6     public synchronized void set(String name )
 7     {
 8         while(this.flag)
 9             try
10             {
11                 this.wait();//t0 t1(活)
12             }
13             catch (InterruptedException e)
14             {
15             }
16         this.name=name+count++;
17         System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
18         this.flag=true;
19         this.notify();
20     }
21     public synchronized void out()
22     {
23         while(!this.flag)
24             try
25             {
26                 this.wait();//t2 t3
27             }
28             catch (InterruptedException e)
29             {
30             }
31         System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
32         this.flag=false;
33         notify();
34     }
35 }
36 class Procedure implements Runnable
37 {
38     private Resource r;
39     public Procedure(Resource r)
40     {
41         this.r=r;
42     }
43     public void run()
44     {
45         while(true)
46         {
47             r.set("烤鸭");
48         }
49     }
50 }
51
52 class Consumer implements Runnable
53 {
54     private Resource r;
55     public Consumer(Resource r)
56     {
57         this.r=r;
58     }
59     public void run()
60     {
61         while(true)
62         {
63             r.out();
64         }
65     }
66 }
67 public class Demo
68 {
69     public static void main(String args[])
70     {
71         Resource r=new Resource();
72         Procedure pro=new Procedure(r);
73         Consumer con=new Consumer(r);
74         Thread t0=new Thread(pro);
75         Thread t1=new Thread(pro);
76         Thread t2=new Thread(con);
77         Thread t3=new Thread(con);
78         t0.start();
79         t1.start();
80         t2.start();
81         t3.start();
82     }
83 }

运行的结果是停在了第四行,光标不断闪烁但没有输出,我们很容易的就发现了,这是一种死锁现象。

4.解决死锁问题。

原因分析:

假设线程0开始生产,生产完毕之后进入冻结状态,CPU切换到线程1,线程1发现有烤鸭,所以也进入冻结状态;线程2开始消费,并唤醒线程0,线程0加入堵塞队列,CPU切换到线程3,线程3发现没有烤鸭了,所以也进入堵塞状态。这时候线程池中有三个线程,分别是生产者线程1和消费者线程2、3,能够执行任务的只有线程0,线程0开始运作(这部分分析和上面的分析相同),判断完flag,它将会生产一个烤鸭,同时唤醒线程池中的一个线程。这时候关键问题就来了,它将唤醒哪个线程?如果是线程2或者3,也没有问题,运行的时候会发现很“和谐”,但是如果唤醒的是线程1,线程1醒来之后会先判断标记,发现标记为true,所以它将继续睡,进入冻结状态,而线程0进入下一次循环并判断标记,也进入冻结状态。所以,现在就发生了四个线程全部处于冻结状态的情况,这也是一种死锁情况。

解决思路:

很明显,由于notify唤醒的线程有可能是对方的线程,但也有可能是己方的线程,当唤醒的是己方的线程时,在一定条件下就会产生死锁。如果我们能够只唤醒对方的线程,那么问题就解决了。但是现在先不讨论那种情况,该怎么解决这个问题?答案就是使用notifyAll方法唤醒全部线程。

 1 /*
 2 现在讨论多生产者、多消费者问题。
 3 解决多生产者多消费者问题出现的死锁现象:将notify改成notifyAll,这样将会唤醒本方线程和对方线程
 4
 5 */
 6 class Resource
 7 {
 8     private boolean flag=false;
 9     private String name;
10     private int count=1;
11     public synchronized void set(String name )
12     {
13         while(this.flag)
14             try
15             {
16                 this.wait();//t0 t1(活)
17             }
18             catch (InterruptedException e)
19             {
20             }
21         this.name=name+count++;
22         System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
23         this.flag=true;
24         this.notifyAll();
25     }
26     public synchronized void out()
27     {
28         while(!this.flag)
29             try
30             {
31                 this.wait();//t2 t3
32             }
33             catch (InterruptedException e)
34             {
35             }
36         System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
37         this.flag=false;
38         notifyAll();
39     }
40 }
41 class Procedure implements Runnable
42 {
43     private Resource r;
44     public Procedure(Resource r)
45     {
46         this.r=r;
47     }
48     public void run()
49     {
50         while(true)
51         {
52             r.set("烤鸭");
53         }
54     }
55 }
56
57 class Consumer implements Runnable
58 {
59     private Resource r;
60     public Consumer(Resource r)
61     {
62         this.r=r;
63     }
64     public void run()
65     {
66         while(true)
67         {
68             r.out();
69         }
70     }
71 }
72 public class Demo
73 {
74     public static void main(String args[])
75     {
76         Resource r=new Resource();
77         Procedure pro=new Procedure(r);
78         Consumer con=new Consumer(r);
79         Thread t0=new Thread(pro);
80         Thread t1=new Thread(pro);
81         Thread t2=new Thread(con);
82         Thread t3=new Thread(con);
83         t0.start();
84         t1.start();
85         t2.start();
86         t3.start();
87     }
88 }

现象:

现在问题全部解决了。但是很明显,由于notifyAll方法唤醒了全部线程,所以它的效率不高,特别是在线程数量特别多的情况下尤其明显。

三、多生产者、多消费者代码优化。

1.JDK1.5新特性。

JDK1.5针对多线程编程中存在的效率不高的问题做出了优化,特别是针对notifyAll方法,做出了重大改进。

新工具所在包:java.util.concurrent.locks

1.1接口:Lock。

此接口封装了获取锁的方法lock()和释放锁的方法unlock(),这样就将synchronized获取锁和释放锁的隐式过程显式化。

由于Lock是接口名,不能直接new对象,所以要使用实现此接口的已知类,比如ReentrantLock,使用Lock lock=new ReentrantLock();即可得到Lock接口的实例

这样就完成了自定义锁的过程。

应当注意,由于获取锁和释放锁的过程显式化,我们必须将释放锁的动作放在finally块中才能保证线程的安全性。如果线程在未释放所之前发生了异常,那么它将停止程序的运行,并返回上一级调用处,但是它仍然是锁的占有者,这样别的线程将不会有机会访问共享资源。

1.2接口:Condition。

Condition接口封装了三个重要的方法:await() 、signal() 、signalAll(),这三个方法对应着Object类中的wait() 、notify() 、notifyAll()方法。

为什么要使用Condition接口?

由于Lock接口的出现,导致了不使用synchronized关键字也可以,但是自定义锁上并没有这三个重要的方法,这是因为JDK1.5规定了一把锁上可以有多个监视器,如果这三个方法称为Lock接口中的成员,将会使得一把锁上只能有一个监视器,和原来相比就没有了优势。使用Condition接口的最大好处就是可以指定唤醒的线程是对方的上线程还是己方的线程,这样就能大大的提高工作效率

怎么获得实现了Condition接口的对象?

使用Lock接口中的方法:newCondition();

Condition con=lock.newCondition();

2.代码演示

2.1改造成和原来的代码相同的效果。

  1 /*
  2 现在讨论多生产者、多消费者问题。
  3 使用JDK1.5之后的新特性
  4 改变之后和原来的效果相同,效率没有提高。
  5 */
  6 import java.util.concurrent.locks.*;
  7 class Resource
  8 {
  9     private boolean flag=false;
 10     private String name;
 11     private int count=1;
 12     Lock lock=new ReentrantLock();
 13     Condition con=lock.newCondition();
 14     public void set(String name )
 15     {
 16         lock.lock();
 17         try
 18         {
 19             while(this.flag)
 20             try
 21             {
 22                 con.await();//t0 t1(活)
 23             }
 24             catch (InterruptedException e)
 25             {
 26             }
 27             this.name=name+count++;
 28             System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
 29             this.flag=true;
 30             con.signalAll();
 31         }
 32         finally
 33         {
 34             lock.unlock();
 35         }
 36     }
 37     public void out()
 38     {
 39         lock.lock();
 40         try
 41         {
 42             while(!this.flag)
 43             try
 44             {
 45                 con.await();//t2 t3
 46             }
 47             catch (InterruptedException e)
 48             {
 49             }
 50             System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
 51             this.flag=false;
 52             con.signalAll();
 53         }
 54         finally
 55         {
 56             lock.unlock();
 57         }
 58     }
 59 }
 60 class Procedure implements Runnable
 61 {
 62     private Resource r;
 63     public Procedure(Resource r)
 64     {
 65         this.r=r;
 66     }
 67     public void run()
 68     {
 69         while(true)
 70         {
 71             r.set("烤鸭");
 72         }
 73     }
 74 }
 75
 76 class Consumer implements Runnable
 77 {
 78     private Resource r;
 79     public Consumer(Resource r)
 80     {
 81         this.r=r;
 82     }
 83     public void run()
 84     {
 85         while(true)
 86         {
 87             r.out();
 88         }
 89     }
 90 }
 91 public class Demo
 92 {
 93     public static void main(String args[])
 94     {
 95         Resource r=new Resource();
 96         Procedure pro=new Procedure(r);
 97         Consumer con=new Consumer(r);
 98         Thread t0=new Thread(pro);
 99         Thread t1=new Thread(pro);
100         Thread t2=new Thread(con);
101         Thread t3=new Thread(con);
102         t0.start();
103         t1.start();
104         t2.start();
105         t3.start();
106     }
107 }

运行的效果和原来相同,既没有死锁发生,也没有其他线程安全性问题。但是效率和以前完全相同,因为也唤醒了对方的线程。

2.2优化后的代码,只将对方线程唤醒。

  1 import java.util.concurrent.locks.*;
  2 class Resource
  3 {
  4     private boolean flag=false;
  5     private String name;
  6     private int count=1;
  7     Lock lock=new ReentrantLock();
  8     Condition procedurers=lock.newCondition();
  9     Condition consumers=lock.newCondition();
 10     public void set(String name )
 11     {
 12         lock.lock();
 13         try
 14         {
 15             while(this.flag)
 16             try
 17             {
 18                 procedurers.await();//t0 t1(活)
 19             }
 20             catch (InterruptedException e)
 21             {
 22             }
 23             this.name=name+count++;
 24             System.out.println(Thread.currentThread().getName()+"------------生产了"+this.name);
 25             this.flag=true;
 26             consumers.signal();
 27         }
 28         finally
 29         {
 30             lock.unlock();
 31         }
 32     }
 33     public void out()
 34     {
 35         lock.lock();
 36         try
 37         {
 38             while(!this.flag)
 39             try
 40             {
 41                 consumers.await();//t2 t3
 42             }
 43             catch (InterruptedException e)
 44             {
 45             }
 46             System.out.println(Thread.currentThread().getName()+"**********************消费了"+this.name);
 47             this.flag=false;
 48             procedurers.signal();
 49         }
 50         finally
 51         {
 52             lock.unlock();
 53         }
 54     }
 55 }
 56 class Procedure implements Runnable
 57 {
 58     private Resource r;
 59     public Procedure(Resource r)
 60     {
 61         this.r=r;
 62     }
 63     public void run()
 64     {
 65         while(true)
 66         {
 67             r.set("烤鸭");
 68         }
 69     }
 70 }
 71
 72 class Consumer implements Runnable
 73 {
 74     private Resource r;
 75     public Consumer(Resource r)
 76     {
 77         this.r=r;
 78     }
 79     public void run()
 80     {
 81         while(true)
 82         {
 83             r.out();
 84         }
 85     }
 86 }
 87 public class Demo
 88 {
 89     public static void main(String args[])
 90     {
 91         Resource r=new Resource();
 92         Procedure pro=new Procedure(r);
 93         Consumer con=new Consumer(r);
 94         Thread t0=new Thread(pro);
 95         Thread t1=new Thread(pro);
 96         Thread t2=new Thread(con);
 97         Thread t3=new Thread(con);
 98         t0.start();
 99         t1.start();
100         t2.start();
101         t3.start();
102     }
103 }

运行的效果和以前完全相同,但是效率却要更高。

四、真正的生产者消费者问题。

之前讨论的多生产者多消费者问题所用的资源类中只能存放1个烤鸭,多个生产者争着生产这一个烤鸭,多个消费者争着消费这一个烤鸭,这在实际生活中是不存在的。应当改造成这样:有一个可以存放多个烤鸭的容器,生产者将烤鸭生产完毕之后放入容器,消费者在后面消费,当容器满了,生产者停下等待消费者消费,消费者一旦消费掉一个烤鸭,就告诉生产者容器内可以存放新烤鸭了,生产者于是开始生产新的烤鸭并放在空位上;消费者发现容器空了,则等待生产者生产出烤鸭,生产者一旦生产出烤鸭就立即通知消费者容器内已经有烤鸭了,可以消费了。这才是一个生产者消费者问题的实际流程。

容器可以使用数组,生产者消费者都需要一个指针变量,表示下一个生产出的烤鸭的位置和下一个将要消费的烤鸭所在的位置。此外,应当有一个变量标识当前容器内有多少烤鸭,以便于决定生产者和消费者在容器已满和容器为空的时候的动作。

此外,需要一把同步锁,这是必须的,锁上有两个监视器,这是为了提高效率,原因前面已经解释过。

变量说民:

lock:锁名。

notFull:生产者监视器。

notEmpty:消费者监视器。

items:对象数组,存放“烤鸭”

putptr:指示生产者下一个生产出来的烤鸭应当存放在什么位置。

takeptr:指示消费者下一个将要消费的烤鸭所在的位置。

count:指示当前容器中的烤鸭数量,初始值是0。

容器的大小是5。

1.供大于求

  1 import java.util.concurrent.locks.*;
  2 class BoundedBuffer {
  3    final Lock lock = new ReentrantLock();
  4    final Condition notFull  = lock.newCondition();
  5    final Condition notEmpty = lock.newCondition();
  6
  7    final Object[] items = new Object[5];
  8    int putptr, takeptr, count=0;
  9
 10    public void put(Object x) throws InterruptedException {//生产线
 11      lock.lock();
 12      try {
 13        while (count == items.length)
 14          notFull.await();//生产线已经饱和,无需再生产,等待消费,进入冻结状态。
 15        items[putptr] = x; //生产下一个产品。
 16
 17
 18        System.out.println(Thread.currentThread().getName()+":----生产"+items[putptr]+putptr);
 19
 20
 21        if (++putptr == items.length) putptr = 0;//如果生产到了尾部,则从头开始生产。
 22        ++count;//数量+1
 23        notEmpty.signal();//唤醒消费者开始消费
 24      } finally {
 25        lock.unlock();
 26      }
 27    }
 28
 29    public Object take() throws InterruptedException {
 30      lock.lock();
 31      try {
 32        while (count == 0) //如果生产线上没有货物,则等待生产者生产,进入冻结状态。
 33          notEmpty.await();
 34        Object x = items[takeptr]; //生产线上有货物了,消费货物
 35
 36        System.out.println(Thread.currentThread().getName()+":-----------消费"+items[takeptr]+takeptr);
 37
 38
 39        if (++takeptr == items.length) takeptr = 0;//消费到了最后一个产品,则从头开始找货物
 40        --count;//货物数量自减
 41        notFull.signal();//唤醒生产线开始生产
 42        return x;
 43      } finally {
 44        lock.unlock();
 45      }
 46    }
 47  }
 48
 49  class Procedure implements Runnable//生产者线程
 50  {
 51      public BoundedBuffer b;
 52      public Procedure(BoundedBuffer b)
 53      {
 54          this.b=b;
 55      }
 56      public void run()
 57      {
 58         while(true)
 59          {
 60             try{
 61                 Thread.sleep(1000);
 62             b.put("烤鸭");
 63             }
 64             catch(InterruptedException e)
 65              {
 66              }
 67          }
 68      }
 69  }
 70  class Consumer implements Runnable//消费者线程
 71  {
 72      public BoundedBuffer b;
 73      public Consumer(BoundedBuffer b)
 74      {
 75          this.b=b;
 76      }
 77      public void run()
 78      {
 79         while(true)
 80          {
 81
 82             try
 83             {
 84                 Thread.sleep(1000);
 85                 b.take();
 86             }
 87             catch(InterruptedException e)
 88              {
 89              }
 90          }
 91      }
 92  }
 93 public class Demo
 94 {
 95     public static void main(String args[])//使用三个生产者和两个消费者以便于观察现象
 96     {
 97         BoundedBuffer b=new BoundedBuffer();
 98         Procedure p=new Procedure(b);
 99         Consumer c=new Consumer(b);
100
101         Thread t0=new Thread(p);
102         Thread t1=new Thread(p);
103         Thread t2=new Thread(p);
104
105         //Thread t3=new Thread(c);
106         Thread t4=new Thread(c);
107         Thread t5=new Thread(c);
108
109         t0.start();
110         t1.start();
111         t2.start();
112         //t3.start();
113         t4.start();
114         t5.start();
115     }
116 }

程序采用了生产延时和消费延时的方式,以便于观察现象但是不影响最终结果。

运行结果:

分析上述输出结果,我们可以发现一开始生产者的生产和消费者的消费不平衡,即生产者的生产>消费者的消费,但是后来生产者的生产和消费者的消费就达到了平衡,即生产者生产一个,消费者消费一个。这句话听起来没有问题,但其实是错误的,事实上是消费者消费掉一个,生产者生产出一个原因就是在本程序中,生产者为3个,而消费者为2个,本程序没有给线程定义优先级,所以他们得到CPU的执行权的概率基本相同(生产能力和消费能力相同,此次分析建立在这个基础之上)。所以程序很快就进入了饱和状态。即最开始的时候生产者已经将容器填满,等消费者消费掉一个,生产者就生产一个。我们可以发现,当稳定的时候,消费者消费烤鸭的标号总是==生产者生产烤鸭的编号+1,这就是供大于求的证明。事实上这时候容器一直处于满的状态。

举一个不太恰当的例子:操场上两个人甲乙一起跑步,甲跑的快,很快就跑完了两圈,但是这时候乙才跑完了一圈,这时候甲乙就又碰面了,貌似是跑的一样快,但是其实甲已经多跑了一圈,本应当超过乙继续跑,但是乙却挡在甲的前面不让甲过去,这样甲就不得不让乙先跑,乙跑一步,甲就在后面跟着乙跑一步。在这个例子中,甲相当于生产者,乙相当于消费者,这是供大于求的情况。

2.供小于求

我们将生产者的数量改成2,将消费者的数量改为3,再看看结果怎么样。

  1 import java.util.concurrent.locks.*;
  2 class BoundedBuffer {
  3    final Lock lock = new ReentrantLock();
  4    final Condition notFull  = lock.newCondition();
  5    final Condition notEmpty = lock.newCondition();
  6
  7    final Object[] items = new Object[5];
  8    int putptr, takeptr, count=0;
  9
 10    public void put(Object x) throws InterruptedException {//生产线
 11      lock.lock();
 12      try {
 13        while (count == items.length)
 14          notFull.await();//生产线已经饱和,无需再生产,等待消费,进入冻结状态。
 15        items[putptr] = x; //生产下一个产品。
 16
 17
 18        System.out.println(Thread.currentThread().getName()+":----生产"+items[putptr]+putptr);
 19
 20
 21        if (++putptr == items.length) putptr = 0;//如果生产到了尾部,则从头开始生产。
 22        ++count;//数量+1
 23        notEmpty.signal();//唤醒消费者开始消费
 24      } finally {
 25        lock.unlock();
 26      }
 27    }
 28
 29    public Object take() throws InterruptedException {
 30      lock.lock();
 31      try {
 32        while (count == 0) //如果生产线上没有货物,则等待生产者生产,进入冻结状态。
 33          notEmpty.await();
 34        Object x = items[takeptr]; //生产线上有货物了,消费货物
 35
 36        System.out.println(Thread.currentThread().getName()+":-----------消费"+items[takeptr]+takeptr);
 37
 38
 39        if (++takeptr == items.length) takeptr = 0;//消费到了最后一个产品,则从头开始找货物
 40        --count;//货物数量自减
 41        notFull.signal();//唤醒生产线开始生产
 42        return x;
 43      } finally {
 44        lock.unlock();
 45      }
 46    }
 47  }
 48
 49  class Procedure implements Runnable//生产者线程
 50  {
 51      public BoundedBuffer b;
 52      public Procedure(BoundedBuffer b)
 53      {
 54          this.b=b;
 55      }
 56      public void run()
 57      {
 58         while(true)
 59          {
 60             try{
 61                 Thread.sleep(1000);
 62             b.put("烤鸭");
 63             }
 64             catch(InterruptedException e)
 65              {
 66              }
 67          }
 68      }
 69  }
 70  class Consumer implements Runnable//消费者线程
 71  {
 72      public BoundedBuffer b;
 73      public Consumer(BoundedBuffer b)
 74      {
 75          this.b=b;
 76      }
 77      public void run()
 78      {
 79         while(true)
 80          {
 81
 82             try
 83             {
 84                 Thread.sleep(1000);
 85                 b.take();
 86             }
 87             catch(InterruptedException e)
 88              {
 89              }
 90          }
 91      }
 92  }
 93 public class Demo
 94 {
 95     public static void main(String args[])//使用三个生产者和两个消费者以便于观察现象
 96     {
 97         BoundedBuffer b=new BoundedBuffer();
 98         Procedure p=new Procedure(b);
 99         Consumer c=new Consumer(b);
100
101         Thread t0=new Thread(p);
102         Thread t1=new Thread(p);
103         //Thread t2=new Thread(p);
104
105         Thread t3=new Thread(c);
106         Thread t4=new Thread(c);
107         Thread t5=new Thread(c);
108
109         t0.start();
110         t1.start();
111         //t2.start();
112         t3.start();
113         t4.start();
114         t5.start();
115     }
116 }

这个现象就很容易解释了,因为生产者的数量为2<消费者的数量3,所以一开始的时候就出现了供不应求的情况。这时候的现象就是生产者生产出一个,消费者就消费掉一个,我们可以看到生产者生产出的烤鸭编号总是等于消费者消费的烤鸭编号,这就是供不应求的证明

再举一个不太恰当地例子:监跑老师的运动能力很强,他要和学生一起跑以监督学生的跑步质量。监跑老师一直跟在学生后面催着学生赶快跑,学生跑一步,监跑老师就跑一步。

时间: 2024-11-05 09:09:35

【JAVA线程间通信技术】的相关文章

JAVA线程间共享变量

import java.util.HashMap; import java.util.Map; import java.util.Random; public class ThreadScopeShareData { static Map<Thread, Integer> dataMap = new HashMap<Thread, Integer>(); public static void main(String[] args) { for (int i = 0; i <

Java——线程间通信问题

 wait和sleep区别: 1.wait可以指定时间可以不指定.     sleep必须指定时间. 2.在同步时,对cpu的执行权和锁的处理不同.     wait:释放执行权,释放锁.     sleep:释放执行权,不释放锁. /* * 等待/唤醒机制 * 设计的方法: * 1.wait():让线程处于等待状态,被wait的线程会被存储到线程池中. * 2.notify():唤醒线程池中的一个线程(任意) * 3.notifyAll():唤醒线程池中的所有线程. * 这些方法都必须定义

说说Java线程间通信

序言 正文 [一] Java线程间如何通信? 线程间通信的目标是使线程间能够互相发送信号,包括如下几种方式: 1.通过共享对象通信 线程间发送信号的一个简单方式是在共享对象的变量里设置信号值:线程A在一个同步块里设置boolean型成员变量hasDataToProcess为true,线程B也在同步块里读取hasDataToProcess这个成员变量:线程A和B必须获得指向一个MySignal共享实例的引用,以便进行通信:如果它们持有的引用指向不同的MySingal实例,那么彼此将不能检测到对方的

JAVA线程间协作:wait.notify.notifyAll

JAVA的进程同步是通过synchronized()来实现的,须要说明的是,JAVA的synchronized()方法相似于操作系统概念中的相互排斥内存块.在JAVA中的Object类型中.都是带有一个内存锁的,在有线程获取该内存锁后.其它线程无法訪问该内存.从而实现JAVA中简单的同步.相互排斥操作. 明确这个原理.就能理解为什么synchronized(this)与synchronized(static XXX)的差别了.synchronized就是针对内存区块申请内存锁,thiskeywo

Java线程间通信之wait/notify

Java中的wait/notify/notifyAll可用来实现线程间通信,是Object类的方法,这三个方法都是native方法,是平台相关的,常用来实现生产者/消费者模式.先来我们来看下相关定义: wait() :调用该方法的线程进入WATTING状态,只有等待另外线程的通知或中断才会返回,调用wait()方法后,会释放对象的锁. wait(long):超时等待最多long毫秒,如果没有通知就超时返回. notify() : 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前

java线程间通信

等待通知机制的实现 方法wait()的作用是使当前执行代码的线程进行等待,wait()方法是object类的方法,该方法的作用是将当前线程置入"预执行队列中",并且在wait()所在的代码行处停止执行,直到接到通知,或者被中断为止. 在调用wait()方法执行,线程需要先获得该对象的对象级别锁,也就是说,只能在同步方法,或者同步块中调用wait()方法,在执行wait()方法后,当前线程释放锁,在从wait()方法返回前,线程与其他线程竞争重新获得锁,如果调用wait()是没有持有适当

黑马程序员——java——线程间的通讯 生产者与消费者

线程间的的通讯  生产者与消费者 public class TestDemos3 { public static void main(String[] args) { Res r = new Res(); Input in = new Input(r); Output out = new Output(r); Thread t1 = new Thread(in); Thread t2 = new Thread(out); t1.start(); t2.start(); } } class Res

JAVA线程间的状态转换

线程间的状态转换:  1. 新建(new):新创建了一个线程对象. 2. 可运行(runnable):线程对象创建后,其他线程(比如main线程)调用了该对象的start()方法.该状态的线程位于可运行线程池中,等待被线程调度选中,获取cpu 的使用权 . 3. 运行(running):可运行状态(runnable)的线程获得了cpu 时间片(timeslice) ,执行程序代码. 4. 阻塞(block):阻塞状态是指线程因为某种原因放弃了cpu 使用权,也即让出了cpu timeslice,

Java 线程间通讯

/* 线程间通讯: 多个线程在处理同一资源,但是任务却不同. */ package com.cwcec.test; class Input implements Runnable { Resource r; public Input(Resource r) { this.r = r; } public void run() { int x = 0; while(true) { synchronized (r) { if(x == 0) { r.name = "Mike"; r.sex