多生产者多消费者问题
以生产馒头 消费馒头为例。
class Resource { private String name; private int count = 1; private boolean flag = false; public synchronized void set(String name) { if (flag) { try { this.wait(); } catch (Exception e) { // TODO: handle exception } } this.name = name + count; count++; System.out.println(Thread.currentThread().getName()+"---生产者----"+this.name); flag = true; notify(); } public synchronized void out() { if (!flag) { try { this.wait(); } catch (Exception e) { // TODO: handle exception } } System.out.println(Thread.currentThread().getName()+"---------消费者--------"+this.name); flag = false; notify(); } } class Producer implements Runnable { private Resource r; Producer(Resource r) { this.r = r; } public void run() { while(true) { r.set("馒头"); } } } class Consumer implements Runnable { private Resource r; Consumer(Resource r) { this.r = r; } public void run() { while(true) { r.out(); } } } public class Main { public static void main(String[] args) { Resource r = new Resource(); Producer pro = new Producer(r); Consumer con = new Consumer(r); Thread t0 = new Thread(pro); Thread t1 = new Thread(pro); Thread t2 = new Thread(con); Thread t3 = new Thread(con); t0.start(); t1.start(); t2.start(); t3.start(); } }
上述代码容易出现一个问题,就是同一个馒头被多次消费,或者有的馒头没有被消费。单生者,单消费者不会出现这个问题
原因:notify 唤醒的线程是任意一个,比如t0 t1都是等待状态,t0活了之后,t1也可能被唤醒,所以就会产生多个馒头而没有被消费的情况,针对这个问题,flag的判断可以改为循环,这样却容易造成了死锁情况:t0 t1被wait了,t2正常消费一次,flag = false,唤醒了t0,因为循环t2 t3被wait了,t0执行一次,flag = true,假设唤醒t1,判断t0进行wait,while,t1也进入了等待,死锁状态
PS:冻结状态的线程被唤醒后本次就不参与判断,向下执行,所以简单来说,多个馒头没有被消费问题的产生,是因为冻结状态的线程不再继续参与判断造成的,而死锁是因为循环判断造成的
多生产多消费问题解决:
notify,改为notifyAll,当前本类线程中的一个正常执行后,唤醒所有线程,当前同类线程因为while,自然会继续等待,对方任意一个线程执行一次,对方剩余线程继续等待,这样就实现了生成对应消费
class Resource { private String name; private int count = 1; private boolean flag = false; public synchronized void set(String name) { while (flag) { try { this.wait(); } catch (Exception e) { // TODO: handle exception } } this.name = name + count; count++; System.out.println(Thread.currentThread().getName()+"---生产者----"+this.name); flag = true; notifyAll(); } public synchronized void out() { while (!flag) { try { this.wait(); } catch (Exception e) { // TODO: handle exception } } System.out.println(Thread.currentThread().getName()+"---------消费者--------"+this.name); flag = false; notifyAll(); } }
总结:
if判断,只会判断一次,容易造成不该唤醒的线程被唤醒了,出现问题
while判断,虽然解决了线程获取执行权后,是否要运行
notify,只能唤醒任意一个线程,但是唤醒本方线程,没有意义,且while+notify = 死锁
notifyAll,解决了本方线程一定会唤醒对方线程
所以,多生产多消费问题 = while + notifyAll,但是也造成了效率低的问题(本方不该唤醒,也被唤醒了)
新特性(JDK1.5升级)
如何不唤醒本方,只唤醒对方呢
void Fu()//前期 { synchronized (obj) //关于锁的操作是隐式的,只有锁自己知道 { code.... } } //后期升级,将锁这一事物封装成了对象 Lock L = new ReentrantLock();,将操作锁的隐式方式定义到了对象中 void Fu() { L.lock();//获取锁 code.. L.unlock();//释放锁 }
接口Lock
Lock
实现提供了比使用 synchronized
方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的 Condition
对象。
Condition
将 Object
监视器方法(wait
、notify
和
notifyAll
)分解成截然不同的对象,以便通过将这些对象与任意 Lock
实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock
替代了
方法和语句的使用,
synchronizedCondition
替代了 Object 监视器方法的使用。
Lock l = ...; l.lock(); try { // access the resource protected by this lock } finally { l.unlock(); }
Lock接口:替代了同步代码块或同步函数,将同步锁的隐式操作变成现实操作,更为灵活,可以一个锁加多个监视器
方法:
L.lock();//获取锁
L.unlock();//释放锁,一般与finally代码块连用
Condition接口:实现了Object中wait(),notify(),notifyAll()方法,将这些监视器方法进行了单独的封装,变成Condition监视器对象,可以任意锁进行组合
await() 相当于 wait
signal() 相当于 notify
signalAll(); 相当于 notifyAll
import java.util.concurrent.locks.*; class Resource { private String name; private int count = 1; private boolean flag = false; //创建一个锁 Lock L = new ReentrantLock(); //通过已有的锁,获取该锁监视器对象(Condition) //Condition con = L.newCondition(); //通过已有的锁,获取两组监视器,一个监视生产者,一个监视消费者 Condition pro_con = L.newCondition(); Condition consu_con = L.newCondition(); public void set(String name) { L.lock(); try { while (flag) { try { pro_con.await();//线程冻结 }catch (Exception e) { // TODO: handle exception } } this.name = name + count; count++; System.out.println(Thread.currentThread().getName()+"---生产者5.0----"+this.name); flag = true; //con.signalAll();//唤醒所有线程 consu_con.signal();//唤醒消费者线程,不用All } catch (Exception e) { // TODO: handle exception } finally { L.unlock();//释放锁 } } public void out() { L.lock(); try { while (!flag) { try { consu_con.await(); } catch (Exception e) { // TODO: handle exception } } System.out.println(Thread.currentThread().getName()+"---------消费者5.0--------"+this.name); flag = false; //con.signalAll(); pro_con.signal(); } catch (Exception e) { // TODO: handle exception } finally { L.unlock(); } } } class Producer implements Runnable { private Resource r; Producer(Resource r) { this.r = r; } public void run() { while(true) { r.set("馒头"); } } } class Consumer implements Runnable { private Resource r; Consumer(Resource r) { this.r = r; } public void run() { while(true) { r.out(); } } } public class Main { public static void main(String[] args) { Resource r = new Resource(); Producer pro = new Producer(r); Consumer con = new Consumer(r); Thread t0 = new Thread(pro); Thread t1 = new Thread(pro); Thread t2 = new Thread(con); Thread t3 = new Thread(con); t0.start(); t1.start(); t2.start(); t3.start(); } }
实际开发是这样的代码
class BoundedBuffer {//缓冲区 final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100];//创建一个大的容器 int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }