七、curator recipes之阻塞队列SimpleDistributedQueue

简介

Java在单机环境实现了BlockQueue阻塞队列,与之类似的curator实现了分布式场景下的阻塞队列,SimpleDistributedQueue

官方文档:http://curator.apache.org/curator-recipes/simple-distributed-queue.html

javaDoc:http://curator.apache.org/apidocs/org/apache/curator/framework/recipes/queue/SimpleDistributedQueue.html

注意:zookeeper虽然可以实现队列,但是官方并不推荐使用zookeeper来做队列,具体请参考官网

代码示例

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.queue.SimpleDistributedQueue;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class SimpleQueueDemo {
    private static CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181", new ExponentialBackoffRetry(3000, 2));
    private static String path = "/queue/path001";
    public static void main(String[] args) throws InterruptedException {
        client.start();
        System.out.println("started");
        SimpleDistributedQueue queue = new SimpleDistributedQueue(client, path);
        new Thread(() -> {
            try {
                System.out.println("sleeping");
                Thread.sleep(3000);
                System.out.println("sleep end");
                new SimpleDistributedQueue(client, path).offer("lay".getBytes("utf-8"));
                System.out.println("offered");
            } catch (Exception e) {
                System.out.println("exception");
                e.printStackTrace();
            }
        }).start();
        System.out.println("polling");
        String data = null;
        try {
            data = new String(queue.take());
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("data=" + data);
        client.close();
    }
}

输出结果

started
polling
sleeping
sleep end
offered
data=lay

主线程会阻塞直到offer了数据

原文地址:https://www.cnblogs.com/lay2017/p/10264503.html

时间: 2024-10-20 18:53:01

七、curator recipes之阻塞队列SimpleDistributedQueue的相关文章

聊聊并发(七)——Java中的阻塞队列

1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出 插入方法 add(e) offer(e) put

聊聊并发(七)Java中的阻塞队列

什么是阻塞队列 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Queue

并发容器(三)非阻塞队列的并发容器

??本文将介绍除了阻塞队列外的并发容器: ConcurrentHashMap.CopyOnWriteArrayList.CopyOnWriteArraySet.ConcurrentSkipListMap.ConcurrentSkipListSet.ConcurrentLinkedQueue: 1. CopyOnWriteArrayList 是 ArrayList 的线程安全的实现,同时也可用于代替 Vector .底层实现是一个数组,其中所有可变操作(add.set 等等)都是通过对底层数组进行

并发容器(二)阻塞队列详细介绍

1. 什么是阻塞队列? 阻塞队列(BlockingQueue) 是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 2. Java里的阻塞队列 JDK7提供了7个阻塞队列.分别是 ArrayBlockingQueue : 一个由数组结构组成的有界阻塞

JAVA多线程提高十二:阻塞队列应用

一.类相关属性 接口BlockingQueue<E>定义: public interface BlockingQueue<E> extends Queue<E> { boolean add(E e); boolean offer(E e); void put(E e) throws InterruptedException; boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedExcepti

阻塞队列和线程池

一.阻塞队列 1.介绍阻塞队列会对当前线程产生阻塞,比如一个线程从一个空的阻塞队列中取元素,此时线程会被阻塞直到阻塞队列中有了元素.当队列中有元素后,被阻塞的线程会自动被唤醒(不需要我们编写代码去唤醒). 2.实现ArrayBlockingQueue:基于数组实现的一个阻塞队列,在创建ArrayBlockingQueue对象时必须制定容量大小.并且可以指定公平性与非公平性,默认情况下为非公平的,即不保证等待时间最长的队列最优先能够访问队列.LinkedBlockingQueue:基于链表实现的一

BlockingQueu 阻塞队列

java.util.concurrent public interface BlockingQueue<E> extends Queue<E> 简介 当阻塞队列插入数据时: 如果队列已经满了,线程则会阻塞等待队列中元素被取出后在插入. 当从阻塞队列中取数据时,如果队列是空的,则线程会阻塞等待队列中有新元素. 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列: 这两个附加的操作是: 在队列为空时,获取元素的线程会等待队列变为非空. 当队列满时,存储元素的线程会等待队

Callable,阻塞队列,线程池问题

一.说说Java创建多线程的方法 1. 通过继承Thread类实现run方法   2. 通过实现Runnable接口 3. 通过实现Callable接口 4. 通过线程池获取 二. 可以写一个Callable的案例吗?如何调用Callable接口 /*是一个带返回值的多线程类,如果需要有线程返回的结果,就需要使用此类*/ class MyThread implements Callable<Integer> { @Override public Integer call() { return

caffe数据读取的双阻塞队列说明

caffe的datareader类中 class QueuePair { public: explicit QueuePair(int size); ~QueuePair(); BlockingQueue<T*> free_; BlockingQueue<T*> full_; DISABLE_COPY_AND_ASSIGN(QueuePair); }; 这个就是双阻塞队列,先将free队列填充到最大长度,然后按照如下规则: 1,每当生产者push时,先将full队列pop,如果fu