用阻塞队列和线程池简单实现生产者和消费者场景

本例子仅仅是博主学习阻塞队列和后的一些小实践,并不是真正的应用场景!

生产者消费者场景是我们应用中最常见的场景,我们可以通过ReentrantLock的Condition和对线程进行wait,notify同通信来实现生产者和消费者场景,前者可以实现多生产者和多消费者模式,后者仅可以实现一生产者,一消费者模式。

今天我们就利用阻塞队列来实现下生产者和消费者模式(里面还利用了线程池)。

看过我关于阻塞队列博文的朋友已经知道,阻塞队列其实就是由ReentrantLock实现的!

场景就不描述了,为简单的多生产者和多消费者!

上代码:

生产线程:

public class ProductThread extends Thread {
    private int taskNum;
    private ArrayBlockingQueue queue;
    public ProductThread(int taskNum,ArrayBlockingQueue queue) {
        this.taskNum = taskNum;
        this.queue = queue;
    }
    public void run() {
        try {
            //模拟生产
            Thread.currentThread().sleep(5000);
            System.out.println("开始生产");
            queue.add(taskNum);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

消费线程:

public class ConsumerThread extends Thread {
    private ArrayBlockingQueue queue;
    public ConsumerThread(ArrayBlockingQueue queue) {
        this.queue = queue;
    }

    public void run() {
        System.out.println("准备消费");
            int taskNum;
            try {
                taskNum = (int) queue.take();
                System.out.println("消费了"+taskNum);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    }
}

主线程(主方法):

public class ProductAndConsumer {

    public static void main(String[] args) {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(20);
        //为多生产者和多消费者分别开创的线程池
        ThreadPoolExecutor productPool =
                new ThreadPoolExecutor(10,20,60,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(5),new ThreadPoolExecutor.CallerRunsPolicy());
        ThreadPoolExecutor consumerPool =
                new ThreadPoolExecutor(10,20,60,TimeUnit.MILLISECONDS,new ArrayBlockingQueue(5),new ThreadPoolExecutor.CallerRunsPolicy());

        System.out.println("start");
        for(int i = 0;i<100;i++) {

            productPool.execute(new ProductThread(i,queue));
            consumerPool.execute(new ConsumerThread(queue));
        }
        productPool.shutdown();
        consumerPool.shutdown();
    }
}

上面的代码,我为生产者和消费者分别开了线程池,因为在实际的应用中可能出现生产者和消费者不对等的情况,所以我们应该根据实际情况来设定线程池的参数,以适应不同场景!

在创建线程池的时候要加一个饱和处理的方式,我们上一节写线程池的饱和处理的时候有提及到,如果不加参数,会默认选择抛出异常,我上面选择的饱和策略是将饱和的任务交给调用线程(即主线程)处理,这个也应该根据实际情况来定,我们同样可以实现RejectedExecutionHandler接口,自己来定义这个饱和异常!

以上仅是博主学习自己写的例子,可能在一些情况下考虑的不周到,还请指教!

2018.4.18 21:31

原文地址:https://www.cnblogs.com/MindMrWang/p/8877731.html

时间: 2024-11-02 21:56:52

用阻塞队列和线程池简单实现生产者和消费者场景的相关文章

Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具

锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = false; private Thread lockingThread = null; public synchronized void lock() throws InterruptedExpection { while(isLocked){ wait(); } isLocked = true; loc

阻塞队列和线程池

一.阻塞队列 1.介绍阻塞队列会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒). 2.实现ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小.并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列.LinkedBlockingQueue:基于链表实现的一

基于ThreadPoolExecutor,自定义线程池简单实现

一.线程池作用 在上一篇随笔中有提到多线程具有同一时刻处理多个任务的特点,即并行工作,因此多线程的用途非常广泛,特别在性能优化上显得尤为重要.然而,多线程处理消耗的时间包括创建线程时间T1.工作时间T2.销毁线程时间T3,创建和销毁线程需要消耗一定的时间和资源,如果能够减少这部分的时间消耗,性能将会进一步提高,线程池就能够很好解决问题.线程池在初始化时会创建一定数量的线程,当需要线程执行任务时,从线程池取出线程,当任务执行完成后,线程置回线程池成为空闲线程,等待下一次任务.JDK1.5提供了一个

线程池简单代码

condition.h #ifndef _CONDITION_H_#define _CONDITION_H_ #include <pthread.h> //封装一个互斥量和条件变量作为状态typedef struct condition{ pthread_mutex_t pmutex; pthread_cond_t pcond;}condition_t; //对状态的操作函数int condition_init(condition_t *cond);int condition_lock(con

线程进阶之线程队列、线程池和协程

本节目录: 1.线程队列 2.线程池 3.协程 一.线程队列 线程之间的通信我们列表行不行呢,当然行,那么队列和列表有什么区别呢? queue队列 :使用import queue,用法与进程Queue一样 queue is especially useful in threaded programming when information must be exchanged safely between multiple threads. class queue.Queue(maxsize=0)

基于队列的线程池

import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /**  * Created 

Java线程学习整理--4---一个简单的生产者、消费者模型

 1.简单的小例子: 下面这个例子主要观察的是: 一个对象的wait()和notify()使用情况! 当一个对象调用了wait(),那么当前掌握该对象锁标记的线程,就会让出CPU的使用权,转而进入该对象的等待池中等待唤醒,这里说明一下,每一个对象都有一个独立的等待池和锁池! 等待池:上述的wait()后的线程会进入等待池中,处于下图线程声明周期(简单示意图) 中的这个状态,等待池中的线程任然具有对象的锁标记,但是处于休眠状态,不是可运行状态! 当该对象调用notify方法之后,就会在等待池中系统

使用队列queue实现一个简单的生产者消费者模型

一.生产者消费者模型 我们去超市商店等地购买商品时,我们大部分人都会说自己是消费者,而超市的各大供货商.工厂等,自然而然地也就成了我们的生产者.如此一来,生产者有了,消费者也有了,那么将二者联系起来的超市又该作何理解呢?诚然,它本身是作为一座交易场所而诞生. 上述情形类比到实际的软件开发过程中,经常会发现:某个线程或模块的代码负责生产数据(工厂),而生产出来的数据却不得不交给另一模块(消费者)来对其进行处理,在这之间使用了队列.栈等类似超市的东西来存储数据(超市),这就抽象除了我们的生产者/消费

并发无锁队列学习之二【单生产者单消费者】

1.前言 最近工作比较忙,加班较多,每天晚上回到家10点多了.我不知道自己还能坚持多久,既然选择了就要做到最好.写博客的少了.总觉得少了点什么,需要继续学习.今天继续上个开篇写,介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长