并发队列之:BlockingQueue和ConcurrentLinkedQueue

一.并行和并发区别:

  • 并行:是指两者同时执行一件事。比如赛跑,两个人都在不停的往前跑;

  • 并发:是指资源有限的情况下,两者交替轮流使用资源。比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。

二.什么叫线程安全

  • 线程安全就是说多线程访问同一代码,不会产生不确定的结果。

  • 反之线程不安全就是,多线程在访问同一代码时,会发生不确定因素,例如死锁,数据不一致性等。

三.LinkedBlockingQueue

LinkedBlockingQueue是一个线程安全的阻塞队列,它实现了BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法,这两个方法正是队列操作的阻塞版本。

由于LinkedBlockingQueue实现是线程安全的,实现了先进先出等特性,是作为生产者消费者的首选;LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE;其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

示例代码:

package com.lky.test;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.junit.Test;

/**
 * @Title: testBlockQueue.java
 * @Package com.lky.test
 * @Description:多线程模拟实现生产者消费者模型(阻塞队列)
 * @author lky
 * @date 2015年10月24日 下午5:08:01
 * @version V1.0
 */
public class testBlockQueue {

    private Log log = LogFactory.getLog(testBlockQueue.class);

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定义阻塞队列
     * @author lky
     * @date 2015年10月24日 下午5:07:28
     * @version V1.0
     */
    public class Basket {
        // 队列的最大容量为3
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 如果队列不满,则放入,否则阻塞等待
        public void produce(String apple) throws InterruptedException {
            basket.put(apple);
        }

        // 如果队列不为空,则取出,否则阻塞等待
        public String consumer() throws InterruptedException {
            return basket.take();
        }
    }

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定义生产者
     * @author lky
     * @date 2015年10月24日 下午5:18:17
     * @version V1.0
     */
    public class Produce implements Runnable {
        private Basket basket;
        private String fruit;

        public Produce(String fruit, Basket basket) {
            this.basket = basket;
            this.fruit = fruit;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    log.info("[" + Thread.currentThread().getName() + "]" + "开始生产apple----->" + this.fruit);
                    basket.produce(fruit);
                    log.info("apple生产完毕!!!!");
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                log.error("生产苹果异常!!!!!");
            }
        }
    }

    /**
     * @Title: testBlockQueue.java
     * @Package com.lky.test
     * @Description: 定义消费者
     * @author lky
     * @date 2015年10月24日 下午5:24:31
     * @version V1.0
     */
    public class Consumer implements Runnable {
        private Basket basket;

        public Consumer(Basket basket) {
            this.basket = basket;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String fruit = basket.consumer();
                    log.info("[" + Thread.currentThread().getName() + "]" + "取到一个水果: " + fruit);
                    Thread.sleep(1000);
                }

            } catch (Exception e) {
                log.error("消费者取苹果异常!!!!");
            }
        }
    }

    @Test
    public void test() {
        System.out.println(Runtime.getRuntime().availableProcessors());//获取当前系统的cpu数目

        Basket basket = new Basket();
        Produce produce1 = new Produce("apple", basket);
        Produce produce2 = new Produce("banna", basket);
        Consumer consumer = new Consumer(basket);

        // 新建一个线程池
        ExecutorService service = Executors.newCachedThreadPool();

        service.submit(produce1);
        service.submit(produce2);
        service.submit(consumer);

        try {
            Thread.sleep(20000);
        } catch (Exception e) {
            log.error("程序异常错误!!!!");
        }
        service.shutdown();
    }
}

四 .ConcurrentLinkedQueue

ConcurrentLinkedQueue是Queue的一个安全实现.Queue中元素按FIFO原则进行排序.采用CAS操作,来保证元素的一致性。

示例代码:

package com.lky.test;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* @Title: testNBlockQueue.java
* @Package com.lky.test
* @Description: 多线程模拟实现生产者消费者模型(非阻塞式队列)
* @author lky
* @date 2015年10月24日 下午8:02:14
* @version V1.0
 */
public class testNBlockQueue {
    private static Log log = LogFactory.getLog(testNBlockQueue.class);
    private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();

    private static int count = 2;
    private static CountDownLatch latch = new CountDownLatch(count);

    private static class Poll implements Runnable {
        @Override
        public void run() {
                while (!queue.isEmpty()) {
                    log.info(Thread.currentThread().getName() + "消费一个商品: " + queue.poll());
                }
                latch.countDown();
        }
    }

    public static void main(String args[]) throws InterruptedException {
        long timeStart = System.currentTimeMillis();
        ExecutorService eService = Executors.newFixedThreadPool(4);

        // 生产商品
        for (int i = 0; i < 100000; ++i) {
            queue.offer(i);
        }

        // 消费者
        for (int i = 0; i < count; ++i) {
            eService.submit(new Poll());
        }

        latch.await();// 使得主线程阻塞,直到latch.getCount()为0
        System.out.println("Cost time: " + (System.currentTimeMillis() - timeStart));
        eService.shutdown();
    }
}

总结:在Java多线程应用中,队列的使用率很高,多数生产消费模型的首选数据结构就是队列(先进先出)。Java提供的线程安全的Queue可以分为阻塞队列和非阻塞队列,其中阻塞队列的典型例子是BlockingQueue,非阻塞队列的典型例子是ConcurrentLinkedQueue,在实际应用中要根据实际需要选用阻塞队列或者非阻塞队列。

时间: 2024-10-08 23:14:57

并发队列之:BlockingQueue和ConcurrentLinkedQueue的相关文章

Java并发包——Blockingqueue,ConcurrentLinkedQueue,Executors

背景 通过做以下一个小的接口系统gate,了解一下mina和java并发包里的东西.A系统为javaweb项目,B为C语言项目,gate是本篇须要完毕的系统. 需求 1. A为集群系统,并发较高,会批量发送给gate消息,而且接受gate返回的消息. 2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B.须要保证性能: 3. B负责处理消息,并返回处理结果,B为gate提供提供六个port,一个port可有三个长连接(须由gate发送心跳保持长

项目积累——Blockingqueue,ConcurrentLinkedQueue,Executors

背景 通过做下面一个小的接口系统gate,了解一下mina和java并发包里的东西.A系统为javaweb项目,B为C语言项目,gate是本篇需要完成的系统. 需求 1. A为集群系统,并发较高,会批量发送给gate消息,并且接受gate返回的消息: 2. gate独立部署,将从A接受到的消息压入队列,与B建立连接后,将每条消息验证签名等工作后,发送给B,需要保证性能: 3. B负责处理消息,并返回处理结果,B为gate提供提供六个端口,一个端口可有三个长连接(须由gate发送心跳保持长连接,否

阻塞队列和并发队列

在并发编程中我们有时候需要使用线程安全的队列.要实现一个线程安全的队列有两种实现方式:一种是使用阻塞算法,另一种是使用非阻塞算法. 使用阻塞算法的队列可以用一个锁(入队和出队用同一把锁)或两个锁(入队和出队用不同的锁)等方式来实现,其中阻塞队列的典型是:BlockingQueue; 非阻塞的实现方式则可以使用循环CAS的方式来实现,非阻塞队列的典型例子是ConcurrentLinkedQueue. 注:并行与并发的区别: 1.并行是指两者同时执行一件事,比如赛跑,两个人都在不停的往前跑: 2.并

多线程编程:阻塞、并发队列的使用总结

最近,一直在跟设计的任务调度模块周旋,目前终于完成了第一阶段的调试.今天,我想借助博客园平台把最近在设计过程中,使用队列和集合的一些基础知识给大家总结一下,方便大家以后直接copy.本文都是一些没有技术含量的东西,只是做个总结,牛哥还请绕路. 老习惯,还是先跟各位纸上谈会儿兵,首先说说队列,他主要分为并发队列和阻塞队列,在多线程业务场景中使用最为普遍,我就主要结合我所做过的业务谈谈我对它们的看法,关于它们的API和官方解释就不提了. 并发队列 并发队列:最常见的业务场景就是多个线程共享同一个队列

并发队列

Java队列中存在有界和无界2种,区别在于有界就是有限制的,无界是无限的. 在并发中存在阻塞和非阻塞. 阻塞:生产者写入队列慢的时候就会进入阻塞,等待消费者消费.当一个线程试图对一个空队列进行出队列操作时,它将会被阻塞,除非有另一个线程进行了入队列操作. 第一:ConcurrentLinkedQueue  ConcurrentLinkedQueue :是一个适用于高并发场景下的队列,通过无锁的方式,实现 了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于Blockin

java并发包提供的三种常用并发队列实现

java并发包中提供了三个常用的并发队列实现,分别是:ConcurrentLinkedQueue.LinkedBlockingQueue和ArrayBlockingQueue. ConcurrentLinkedQueue使用的是CAS原语无锁队列实现,是一个异步队列,入队速度很快,出队进行了加锁,性能稍慢: LinkedBlockingQueue也是阻塞队列,入队和出队都用了加锁,当队空的时候线程会暂时阻塞:当队空的时候线程会暂时阻塞: ArrayBlockingQueue是初始容器固定的阻塞队

看秒杀系统的时候看到的关于并发队列的介绍,摘抄如下

并发队列的选择 Java的并发包提供了三个常用的并发队列实现,分别是:ArrayBlockingQueue.ConcurrentLinkedQueue 和 LinkedBlockingQueue  . ArrayBlockingQueue是初始容量固定的阻塞队列,我们可以用来作为数据库模块成功竞拍的队列,比如有10个商品,那么我们就设定一个10大小的数组队列. ConcurrentLinkedQueue使用的是CAS原语无锁队列实现,是一个异步队列,入队的速度很快,出队进行了加锁,性能稍慢. L

秒杀系统:并发队列 接口设计 并发请求数据安全处理

看秒杀系统的时候看到的关于并发队列的介绍,摘抄如下 并发队列的选择 Java的并发包提供了三个常用的并发队列实现,分别是:ArrayBlockingQueue.ConcurrentLinkedQueue 和 LinkedBlockingQueue  . ArrayBlockingQueue是初始容量固定的阻塞队列,我们可以用来作为数据库模块成功竞拍的队列,比如有10个商品,那么我们就设定一个10大小的数组队列. ConcurrentLinkedQueue使用的是CAS原语无锁队列实现,是一个异步

19.并发容器之BlockingQueue

1. BlockingQueue简介 在实际编程中,会经常使用到JDK中Collection集合框架中的各种容器类如实现List,Map,Queue接口的容器类,但是这些容器类基本上不是线程安全的,除了使用Collections可以将其转换为线程安全的容器,Doug Lea大师为我们都准备了对应的线程安全的容器,如实现List接口的CopyOnWriteArrayList(关于CopyOnWriteArrayList可以看这篇文章),实现Map接口的ConcurrentHashMap(关于Con

同步,异步,串行队列,并发队列,全局队列,主队列等概念的总结

同步,异步,串行队列,并发队列,全局队列,主队列等概念的总结 在GCD函数中, 我们常常碰到同步,异步,串行队列,并发队列,全局队列,主队列等概念,而这些概念又常常组合在一起, 十分头疼, 这篇文章就来梳理一下这些烦人的概念. 不想看长篇大论的, 直接看文章末尾的表格即可! 在此之前, GCD中还涉及到两个十分重要的概念, 就是任务和队列 任务(Task): 你需要执行的操作 队列(Queue): 存放任务的容器 GCD中两个重要的函数, 一个同步执行, 一个异步执行 dispatch_asyn