生产者及消费者问题,是线程操作中的一个经典案列。但由于线程运行的不确定性,生产者及消费者可能会产生一些问题:
试想,如果生产者线程向存储数据空间添加了部分信息,但没有添加全部,这时就切换到消费者线程,这时消费者线程将会把已经添加了的部分信息,后上一次的信息混淆了,导致出错。
或者,若生产者放数据,与消费者取数据的速度不匹配,也会出现问题:即可能会出现,生产者放了多条数据,消费者才取了一条,导致数据丢失;或生产者只放了一条数据,但消费者已经取了多条,这会导致重复取出数据。
举例说明:
class Info {
private String name = null;
private String gender = null;
private String school = null;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public String getSchool() {
return school;
}
public void setSchool(String school) {
this.school = school;
}
@Override
public String toString() {
return "Info [name=" + name + ", gender=" + gender + ", school="
+ school + "]";
}
}
class Produ implements Runnable {
private Info info = null;
public Produ(Info info) {
this.info = info;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
if (i % 2 == 1) {
this.info.setName("qiu");
Thread.sleep((int)Math.random()*100);//通过加入延时,使问题更加容易显露出来
this.info.setSchool("whut");
this.info.setGender("boy");
} else {
this.info.setName("cai");
Thread.sleep((int)Math.random()*100);
this.info.setSchool("scut");
this.info.setGender("girl");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Consum implements Runnable{
private Info info;
public Consum(Info info) {
this.info = info;
}
@Override
public void run() {
try {
for (int i = 0; i < 10; i++) {
System.out.println(info);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProducerAndConsume {
public static void main(String args[]){
Info info = new Info();
Produ producer = new Produ(info);
Consum consumer = new Consum(info);
new Thread(producer).start();
new Thread(consumer).start();
}
}
运行结果(由于线程的不确定性,所以每次运行的结果每次都不一样,这里就只粘一次运行结果):
Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=cai, gender=null, school=null]
Info [name=qiu, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
Info [name=qiu, gender=boy, school=whut]
从上述例子来看,之前提到的问题在结果上已经展现出来了。
解决方法1:
通过加入线程等待与唤醒,wait()和notify()(或者notifyAll),但实现过程是麻烦,并且会增加类与类之间的耦合,因此不推荐使用。这里也不多加介绍了。
解决方法2:
通过阻塞队列来解决这种任务协作问题。
在java.util.concurrent.BlockingQueue接口提供了阻塞队列。实现类分别有ArrayBlockingQueue,LinkedBlockingQueue,DelayQueue,PriorityBlockingQueue,SynchronousQueue等,
有了阻塞队列就不用担心本文一开始所担心的问题了。因为若队列为空,而此时消费者想从队列中获取对象,此时,阻塞队列会挂起消费者任务,等到有更多元素可用时才恢复消费者任务。更不用担心若消费者取出的速度小于生产者放入的速度的情况,因为生产者所生产的都会存入队列里等待消费者取出。
下面使用LinkedBlockingQueue阻塞队列来修改上面的例子。(其实可以使用ArrayBlockingQueue,两者的区别只是后者初始化时已经确定了队列的长度,即具有固定尺寸的,而前者是无界队列.)
在上述例子添加阻塞队列便可以解决上述两点问题了,代码如下:
import java.util.concurrent.LinkedBlockingQueue;
class Info {
private String name = null;
private String gender = null;
private String school = null;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public String getSchool() {
return school;
}
public void setSchool(String school) {
this.school = school;
}
@Override
public String toString() {
return "Info [name=" + name + ", gender=" + gender + ", school="
+ school + "]";
}
}
class Produ implements Runnable {
private Info info = null;
private LinkedBlockingQueue<Info> infoQueue;
public Produ(LinkedBlockingQueue<Info> infoQueue) {
this.infoQueue = infoQueue;
}
public void run() {
try {
for (int i = 0; i < 10; i++) {
info = new Info();
if (i % 2 == 1) {
info.setName("qiu");
Thread.sleep((int)Math.random()*100);
info.setSchool("whut");
info.setGender("boy");
} else {
info.setName("cai");
Thread.sleep((int)Math.random()*100);
info.setSchool("scut");
info.setGender("girl");
}
infoQueue.put(info);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
class Consum implements Runnable {
private Info info;
private LinkedBlockingQueue<Info> infoQueue;
public Consum(LinkedBlockingQueue<Info> infoQueue) {
this.infoQueue = infoQueue;
}
public void run() {
try {
while((info = infoQueue.take())!=null){
System.out.println(info);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class ProducerAndConsume {
public static LinkedBlockingQueue<Info> infoQueue = new LinkedBlockingQueue<Info>();
public static void main(String args[]) {
Produ producer = new Produ(infoQueue);
Consum consumer = new Consum(infoQueue);
new Thread(producer).start();
new Thread(consumer).start();
}
}
运行结果:
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
Info [name=cai, gender=girl, school=scut]
Info [name=qiu, gender=boy, school=whut]
从运行结果可以看出,通过添加阻塞队列确实可以解决生产者及消费者之间由于效率不匹配所导致的问题。
因此通过引入阻塞队列,不需要像引入wait()与notifyAll()来增加类与类之间的耦合来处理生产者与消费者效率不统一的问题,仅仅在每个类里面与阻塞队列进行通信即可。