[Java Concurrent] 多线程合作 producer-consumers / queue 的简单案例

在多线程环境下,通过 BlockingQueue,实现生产者-消费者场景。

Toast 被生产和消费的对象。

ToastQueue 继承了 LinkedblockingQueue ,用于中间存储 Toast 。

Producer 生产 Toast ,并将生产出来的 Toast 放进队列 initialToastQ 中。

Processor 加工 Toast,从 initialToastQ 中获得生产出来的 Toast,将其加工并放进队列 finishedToast 中。

Consumer 消费 Toast,从 finishedToastQ 中获得加工完成的 Toast。

ThreadHelper 工具类,用于输出线程相关信息。

ProducerConsumerDemo 演示这个场景

代码实现:

Toast 实现

public class Toast {

    private int id;

    public Toast(int id){
        this.id = id;
    }

    public String toString(){
        return " toast#" + id;
    }
}

ToastQueue 实现

import java.util.concurrent.LinkedBlockingQueue;

public class ToastQueue extends LinkedBlockingQueue<Toast> {
    private static final long serialVersionUID = 1L;
}

Producer 循环生产 Toast

import java.util.concurrent.TimeUnit;

public class Producer implements Runnable {

    private ToastQueue toastQueue;
    private int count;

    public Producer(ToastQueue toastQueue){
        this.toastQueue = toastQueue;
        this.count = 0;
    }

    @Override
    public void run() {
        try {
            while (true){
                TimeUnit.MILLISECONDS.sleep(100);

                Toast toast = new Toast(count);
                count++;
                toastQueue.put(toast);
                ThreadHelper.print(" produced " + toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Processor 从 intialToastQ 获得 Toast ,对其加工,并放进 finishedToastQ 中。

import java.util.concurrent.TimeUnit;

public class Processor implements Runnable {

    private ToastQueue initialToastQ;
    private ToastQueue finishedToastQ;

    public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){
        this.initialToastQ = initialToastQ;
        this.finishedToastQ = finishedToastQ;
    }

    @Override
    public void run() {
        try {
            while (true){
                Toast toast = initialToastQ.take();

                ThreadHelper.print(" processing " + toast);

                TimeUnit.MILLISECONDS.sleep(180);

                finishedToastQ.put(toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Consumer 消耗 Toast

public class Consumer implements Runnable {

    private ToastQueue finishedToastQ;

    public Consumer(ToastQueue finishedToastQ){
        this.finishedToastQ = finishedToastQ;
    }

    @Override
    public void run() {
        try {
            while (true){
                Toast toast = finishedToastQ.take();
                ThreadHelper.print(" consumed " + toast);
            }
        }catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

ThreadHelper 线程帮助类

public class ThreadHelper {
    public static void print(String msg){
        System.out.println("[" + Thread.currentThread().getName() + " ] " + msg);
    }
}

演示烤面包的生产、加工、消费的场景

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerDemo {

    public static void main() throws InterruptedException{

        ToastQueue initialToastQ = new ToastQueue();
        ToastQueue finishedToastQ = new ToastQueue();

        ExecutorService exec = Executors.newCachedThreadPool();
        exec.execute(new Producer(initialToastQ));
        exec.execute(new Processor(initialToastQ, finishedToastQ));
        exec.execute(new Consumer(finishedToastQ));

        TimeUnit.SECONDS.sleep(2);
        exec.shutdownNow();
    }
}

输出结果:

[pool-1-thread-2 ]  processing  toast#0
[pool-1-thread-1 ]  produced  toast#0
[pool-1-thread-1 ]  produced  toast#1
[pool-1-thread-2 ]  processing  toast#1
[pool-1-thread-3 ]  consumed  toast#0
[pool-1-thread-1 ]  produced  toast#2
[pool-1-thread-1 ]  produced  toast#3
[pool-1-thread-2 ]  processing  toast#2
[pool-1-thread-3 ]  consumed  toast#1
[pool-1-thread-1 ]  produced  toast#4
[pool-1-thread-1 ]  produced  toast#5
[pool-1-thread-2 ]  processing  toast#3
[pool-1-thread-3 ]  consumed  toast#2
[pool-1-thread-1 ]  produced  toast#6
[pool-1-thread-1 ]  produced  toast#7
[pool-1-thread-2 ]  processing  toast#4
[pool-1-thread-3 ]  consumed  toast#3
[pool-1-thread-1 ]  produced  toast#8
[pool-1-thread-2 ]  processing  toast#5
[pool-1-thread-3 ]  consumed  toast#4
[pool-1-thread-1 ]  produced  toast#9
[pool-1-thread-1 ]  produced  toast#10
[pool-1-thread-2 ]  processing  toast#6
[pool-1-thread-3 ]  consumed  toast#5
[pool-1-thread-1 ]  produced  toast#11
[pool-1-thread-1 ]  produced  toast#12
[pool-1-thread-2 ]  processing  toast#7
[pool-1-thread-3 ]  consumed  toast#6
[pool-1-thread-1 ]  produced  toast#13
[pool-1-thread-1 ]  produced  toast#14
[pool-1-thread-2 ]  processing  toast#8
[pool-1-thread-3 ]  consumed  toast#7
[pool-1-thread-1 ]  produced  toast#15
[pool-1-thread-1 ]  produced  toast#16
[pool-1-thread-2 ]  processing  toast#9
[pool-1-thread-3 ]  consumed  toast#8
[pool-1-thread-1 ]  produced  toast#17
[pool-1-thread-2 ]  processing  toast#10
[pool-1-thread-3 ]  consumed  toast#9
[pool-1-thread-1 ]  produced  toast#18
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at concurrencyProducerConsumer.Consumer.run(Consumer.java:15)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at concurrencyProducerConsumer.Producer.run(Producer.java:19)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at concurrencyProducerConsumer.Processor.run(Processor.java:24)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

参考资料

Page 868, Produer-consumers and queue, Thinking in Java

时间: 2024-10-01 05:18:46

[Java Concurrent] 多线程合作 producer-consumers / queue 的简单案例的相关文章

java中多线程模拟(多生产,多消费,Lock实现同步锁,替代synchronized同步代码块)

import java.util.concurrent.locks.*; class DuckMsg{ int size;//烤鸭的大小 String id;//烤鸭的厂家和标号 DuckMsg(){ } DuckMsg(int size, String id){ this.size=size; this.id=id; } public String toString(){ return id + " 大小为:" + size; } } class Duck{ private int

Java基础--多线程的方方面面

1,什么是线程?线程和进程的区别是什么? 2,什么是多线程?为什么设计多线程? 3,Java种多线程的实现方式是什么?有什么区别? 4,线程的状态控制有哪些方法? 5,线程安全.死锁和生产者--消费者 6,线程的优化有哪些方法? 1,什么是线程?线程和进程的区别是什么? 线程是程序执行的最小单元. 区别: 进程是操作系统进行资源处理和分配的最小单位,而一个进程可以包含多个线程,并共享进程的资源. 2,什么是多线程?为什么设计多线程? 介绍之前,我们需要理解并行和并发的定义: 并行:同一个时刻有多

Java Concurrent

Java Concurrent ExecutorService ExecutorService exec = Executors.newCachedThreadPool(); // create a cached pool ExecutorService exec = Executors.newFixedThreadPool(4); // fixed sized thread pool ExecutorService exec = Executors.newSingleThreadExecuto

Java的多线程编程模型5--从AtomicInteger开始

Java的多线程编程模型5--从AtomicInteger开始 2011-06-23 20:50 11393人阅读 评论(9) 收藏 举报 java多线程编程jniinteger测试 AtomicInteger,一个提供原子操作的Integer的类.在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字.而AtomicInteger则通过一种线程安全的加减操作接口. 来看AtomicInteger提供的接口. //获取当前的值 publ

[Java] 转:多线程 (并发)总结

一概念 二创建多线程方法 三线程常用方法不完整可以自己查阅JDK文档 四线程的生命周期与转换 五同步 六竞争者消费者 七线程池 八JDK 线程工具 线程基础: 1. 创建 2. 状态切换 3. sleep与wait的区别 前者使线程阻塞固定时间后进入Runnable状态,后者使用notify后可以处于可执行状态. 4. synchroized 与 Lock 区别 synchroized 可以针对当前对象.某变量设置相应的对象锁 lock 控制粒度更细,使用ReentrantLook.look()

使用Java开发多线程端口扫描工具(二)

一 介绍 这一篇文章是紧接着上一篇文章(http://www.zifangsky.cn/2015/12/使用java开发多线程端口扫描工具/)写的,端口扫描的原理不用多少,我在上一篇文章中已经说过了,至于目的大家都懂得.在这一篇文章里,我主要是对端口扫描工具的继续完善,以及写出一个比较直观的图形界面出来,以方便我们测试使用.界面如下: 这个工具主要是实现了以下几点功能:(1)两种扫描方式,一种是只扫描常见端口,另一种是设置一个起始和结束端口,依次探测.当然,原理很简单,用for循环就可以了:(2

java concurrent之前戏synchronized

对于多线程共享资源的情况需要进行同步,以避免一个线程的改动被另一个线程的改动所覆盖.最普遍的同步方式就是synchronized.把代码声明为synchronized,有两个重要后果,通常是指该代码具有 原子性(atomicity)和 可见性(visibility). 1.原子性强调的是执行,意味着个时刻,只有一个线程能够执行一段代码,这段代码通过一个monitor object保护.从而防止多个线程在更新共享状态时相互冲突. 2.可见性强调的是结果,它要对付内存缓存和编译器优化的各种反常行为.

java socket 多线程网络传输多个文件

http://blog.csdn.net/njchenyi/article/details/9072845 java socket 多线程网络传输多个文件 2013-06-10 21:26 3596人阅读 评论(1) 收藏 举报  分类: JAVA(158)  由于需要研究了下用 java socket 传输文件,由于需要传输多个文件,因此,采用了多线程设计.客户端每个线程创建一个 socket 连接,每个 socket 连接负责传输一个文件,服务端的ServerSocket每次 accept

Java的多线程机制:缓存一致性和CAS

Java的多线程机制:缓存一致性和CAS 一.总线锁定和缓存一致性 这是两个操作系统层面的概念.随着多核时代的到来,并发操作已经成了很正常的现象,操作系统必须要有一些机制和原语,以保证某些基本操作的原子性,比如处理器需要保证读一个字节或写一个字节是原子的,那么它是如何实现的呢?有两种机制:总线锁定和缓存一致性. 我们知道,CPU和物理内存之间的通信速度远慢于CPU的处理速度,所以CPU有自己的内部缓存,根据一些规则将内存中的数据读取到内部缓存中来,以加快频繁读取的速度.我们假设在一台PC上只有一