java ReentrantLock结合条件队列 实现生产者-消费者模式

 1 package reentrantlock;
 2
 3 import java.util.ArrayList;
 4
 5 public class ProviderAndConsumerTest {
 6
 7     static ProviderAndConsumer providerAndConsumer  =  new ProviderAndConsumer();
 8
 9     public static void main(String[] args) throws InterruptedException {
10
11 //        new Thread(new GetRunnable(), "消费者002").start();
12 //        Thread.sleep(1000);
13 //        new Thread(new PutRunnable(), "生产者001").start();
14
15         ArrayList<Thread> provider = new ArrayList<>();
16         for (int i = 0; i < 3; i++){
17             provider.add(new Thread(new PutRunnable(), "生产者00"+ (i+1)));
18         }
19
20         ArrayList<Thread> consumer = new ArrayList<>();
21         for (int i = 0; i < 3; i++){
22             consumer.add(new Thread(new GetRunnable(), "      消费者--99"+ (i+1)));
23         }
24
25
26         for (Thread i :
27                 consumer) {
28             i.start();
29         }
30
31         // 先让消费者线程全部饥饿,进入消费者条件队列中
32         Thread.sleep(10000);
33
34         for (Thread i :
35                 provider) {
36             i.start();
37         }
38
39
40
41
42     }
43
44     static class PutRunnable implements Runnable {
45
46
47         @Override
48         public void run() {
49             for (int i = 0; i < 3; i++){
50                 providerAndConsumer.put("   (" +Thread.currentThread().getName() + "_data_" + i + ")");
51                 try {
52                     // 调整睡眠时间,等同于调整生产者生产数据的频率,但是不准,因为跟生产者内部逻辑执行时间有很大关系
53                     Thread.sleep(500,1);
54                 } catch (InterruptedException e) {
55                     e.printStackTrace();
56                 }
57             }
58         }
59     }
60
61     static class GetRunnable implements Runnable{
62
63         @Override
64         public void run() {
65             while(true){
66                 providerAndConsumer.get();
67                 try {
68                     // 调整睡眠时间,等同于调整消费者消费数据的频率,但是不准,因为跟消费者内部执行时间有很大关系
69                     Thread.sleep(10,1);
70                 } catch (InterruptedException e) {
71                     e.printStackTrace();
72                 }
73             }
74         }
75     }
76 }
  1 package reentrantlock;
  2
  3 import java.util.concurrent.locks.Condition;
  4 import java.util.concurrent.locks.ReentrantLock;
  5
  6 public class ProviderAndConsumer {
  7
  8     ReentrantLock reentrantLock = new ReentrantLock();
  9     Condition notEmpty = reentrantLock.newCondition();
 10     Condition notFull = reentrantLock.newCondition();
 11     int maxSize = 3;
 12     int putIndex = 0;
 13     int getIndex = 0;
 14     int realDataCount = 0;
 15     Object[] queue = new Object[maxSize];
 16
 17     boolean isConsumerWait = false;
 18     boolean isProviderWait = false;
 19
 20
 21     public void put(Object data){
 22         System.out.println(Thread.currentThread().getName() + "线程,-----尝试加锁-----");
 23         reentrantLock.lock();
 24         System.out.println(Thread.currentThread().getName() + "线程,-----加锁成功-----");
 25         try {
 26
 27             while (realDataCount == queue.length){
 28                 System.out.println(Thread.currentThread().getName() + "线程,-----数据满了,只好等待----" +
 29                         "哈哈要进生产者条件队列啦");
 30                 isProviderWait = true;
 31                 notFull.await();
 32             }
 33
 34             queue[putIndex] = data;
 35             realDataCount++;
 36
 37             putIndex++;
 38             if (putIndex == queue.length) putIndex = 0;
 39             System.out.println(Thread.currentThread().getName() + "线程,成功生产一个数据=" + data.toString());
 40
 41             if (isConsumerWait){
 42                 System.out.println(Thread.currentThread().getName() + "线程," +
 43                         " 未 激活消费者条件队列节点前,获取消费者队列长度"
 44                         + reentrantLock.getWaitQueueLength(notEmpty));
 45             }
 46             notEmpty.signal();
 47             if (isConsumerWait){
 48                 int length = reentrantLock.getWaitQueueLength(notEmpty);
 49                 if (length == 0){
 50                     isConsumerWait = false;
 51                 }
 52                 System.out.println(Thread.currentThread().getName() + "线程," +
 53                         " 已 激活消费者条件队列节点后,获取消费者队列长度"
 54                         + reentrantLock.getWaitQueueLength(notEmpty));
 55             }
 56             // 调整睡眠时间,等同于调整生产者持有锁的时间
 57             Thread.sleep(1000);
 58         } catch (InterruptedException e) {
 59             e.printStackTrace();
 60         } finally {
 61             System.out.println(Thread.currentThread().getName() + "线程,成功解锁");
 62             reentrantLock.unlock();
 63         }
 64
 65     }
 66
 67
 68     public void get(){
 69         System.out.println("\t\t" + Thread.currentThread().getName() + "线程,尝试加锁");
 70         reentrantLock.lock();
 71         System.out.println("\t\t" + Thread.currentThread().getName() + "线程,-----加锁成功----");
 72
 73         try {
 74
 75             while (realDataCount == 0){
 76                 System.out.println("\t\t" + Thread.currentThread().getName() + "线程,-----没有数据,只好等待----"+
 77                         "哈哈 要进消费者条件队列啦");
 78                 isConsumerWait = true;
 79                 notEmpty.await();
 80             }
 81
 82             System.out.println("\t\t" + Thread.currentThread().getName() + "线程,成功消费一个数据=" + queue[getIndex]);
 83             realDataCount--;
 84
 85             getIndex++;
 86             if (getIndex == queue.length) getIndex = 0;
 87
 88             if (isProviderWait){
 89                 System.out.println("\t\t" + Thread.currentThread().getName() + "线程," +
 90                         " 未 激活生产者条件队列节点前,获取生产者队列长度"
 91                         + reentrantLock.getWaitQueueLength(notFull));
 92             }
 93             notFull.signal();
 94             if (isProviderWait){
 95                 int length = reentrantLock.getWaitQueueLength(notFull);
 96                 if (length == 0){
 97                     isProviderWait = false;
 98                 }
 99                 System.out.println("\t\t" + Thread.currentThread().getName() + "线程," +
100                         " 已 激活生产者条件队列节点后,获取生产者队列长度"
101                         + length);
102             }
103
104         } catch (InterruptedException e) {
105             e.printStackTrace();
106         } finally {
107             System.out.println("\t\t" + Thread.currentThread().getName() + "线程,成功解锁");
108             reentrantLock.unlock();
109         }
110     }
111
112
113
114
115
116
117 }

原文地址:https://www.cnblogs.com/cnblogszs/p/10361385.html

时间: 2024-07-30 21:05:40

java ReentrantLock结合条件队列 实现生产者-消费者模式的相关文章

Java并发(基础知识)—— 阻塞队列和生产者消费者模式

1.阻塞队列 BlockingQueue是线程安全的Queue版本,从它的名字就可以看出,它是一个支持阻塞的Queue实现:当向空BlockingQueue请求数据时,它会阻塞至BlockingQueue非空:当向一个已满BlockingQueue插入数据时,线程会阻塞至BlockingQueue可插入. BlockingQueue 的方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:第一种是抛出一个异常,第二种是返回一个特殊值(null 或 fa

阻塞队列和生产者-消费者模式、DelayQueue

1.ArrayDeque, (数组双端队列) 2.PriorityQueue, (优先级队列) 3.ConcurrentLinkedQueue, (基于链表的并发队列) 4.DelayQueue,                                         (延期阻塞队列)(阻塞队列实现了BlockingQueue接口) 5.ArrayBlockingQueue,           (基于数组的并发阻塞队列) 6.LinkedBlockingQueue,        (基

并发编程—— 阻塞队列和生产者-消费者模式

Java并发编程实践 目录 并发编程—— ConcurrentHashMap 并发编程—— 阻塞队列和生产者-消费者模式 概述 第1部分 为什么要使用生产者和消费者模式 第2部分 什么是生产者消费者模式 第3部分 代码示例 第1部分 为什么要使用生产者和消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费

阻塞队列和生产者-消费者模式

阻塞队列提供了可阻塞的put和take方法.如果队列满了put将阻塞到有空间可用,如果队列为空,take将阻塞到有元素可用.队列可以是有界和无界的,无界的队列put将不会阻塞. 阻塞队列支持生产者消费者模式,该模式将找出需要完成的工作,和执行工作分开.生产者-消费者模式能简化开发过程,因为消除了生产者和消费者之间的代码依赖性,此外,该模式还将生产数据的过程和使用数据的过程解耦开来. 在基于阻塞队列构建的生产者-消费者设计中个,当数据生成时,生产者把数据放入队列,当消费者处理数据时,将从队列中获取

java中多线程通信实例:生产者消费者模式

线程间的通信: 其实就是多个线程再操作同一个资源,但是操作的动作不同   当某个线程进入synchronized块后,共享数据的状态不一定满足该线程的需要,需要其他线程改变共享数据的状态后才能运行,而由于当时线程对共享资源时独占的,它必须解除对共享资源的锁定的状态,通知其他线程可以使用该共享资源. Java中的 wait(),notify(),notifyAll()可以实现线程间的通信. 生产者--消费者问题是典型的线程同步和通信问题 /** * 生产者和消费者问题,生产者生成出产品,消费者去购

Java 并发编程(四)阻塞队列和生产者-消费者模式

阻塞队列 阻塞队列提供了可阻塞的 put 和 take 方法,以及支持定时的 offer 和 poll 方法.如果队列已经满了,那么put方法将阻塞直到有空间可以用:如果队列为空,那么take方法将一直阻塞直到有元素可用.队列可以使有界的,也可以是无界的,无界队列永远都不会充满,因此无界队列上的put方法永远不会阻塞.一种常见的阻塞生产者-消费者模式就是线程池与工作队列的组合,在 Executor 任务执行框架中就体现了这种模式. 意义:该模式能简化开发过程,因为他消除了生产者和消费者类之间的代

Java线程同步与死锁、生产者消费者模式以及任务调度等

一.Thread类基本信息方法 package Threadinfo; public class MyThread implements Runnable{ private boolean flag = true; private int num = 0; @Override public void run() { while(flag) { System.out.println(Thread.currentThread().getName()+"-->"+num++); } }

Java的设计模式(7)— 生产者-消费者模式

生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案.这个模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程.生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务.生产者和消费者之间通过共享内存缓存区进行通信,这样就避免了生产者和消费者直接通信,从而将生产者和消费者解耦.不管是生产高于消费,还是消费高于生产,缓存区的存在可以确保系统的正常运行.这个模式有以下几种角色: 生产者:用于提交用户的请求,提取用户任务,装入内存缓冲区. 消费者:在

阻塞队列实现生产者消费者模式

阻塞队列的特点:当队列元素已满的时候,阻塞插入操作: 当队列元素为空的时候,阻塞获取操作: 生产者线程:Producer 1 package test7; 2 3 import java.util.concurrent.BlockingQueue; 4 5 public class Producer implements Runnable{ 6 7 private final BlockingQueue queue; 8 public Producer(BlockingQueue queue){