java 可伸缩阻塞队列实现

最近一年多写的最虐心的代码。必须好好复习java并发了。搞了一晚上终于测试都跑通过了,特此纪念,以资鼓励!

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 实现可调整大小的阻塞队列,支持数据迁移平衡reader,writer读取吸入速度,达到最大吞吐
 * @author hanjie
 *
 */
public class RecordBuffer {

    public static final Record CLOSE_RECORD = new Record() {

        @Override
        public Object getColumnValue(String columnName) {
            // TODO Auto-generated method stub
            return null;
        }
    };

    public static final Record SWITCH_QUEUE_RECORD = new Record() {

        @Override
        public Object getColumnValue(String columnName) {
            // TODO Auto-generated method stub
            return null;
        }
    };

    public Lock switchingQueueLock = new ReentrantLock();
    public Condition readerSwitched = switchingQueueLock.newCondition();
    public Condition writerSwitched = switchingQueueLock.newCondition();
    public Condition switchFinished = switchingQueueLock.newCondition();

    public volatile boolean readerSwitchSuccess = true;
    public volatile boolean writerSwitchSuccess = true;
    public volatile boolean switchingQueue = false;
    public volatile boolean closed = false;
    private volatile ArrayBlockingQueue<Record> queue;
    private TaskCounter taskCounter;

    public RecordBuffer(TaskCounter taskCounter, int size) {
        this.queue = new ArrayBlockingQueue<Record>(size);
        this.taskCounter = taskCounter;
    }

    public void resize(int newSize) {
        try {

            if(closed){
                return;
            }

            switchingQueueLock.lock();
            try {
                //double check下,要不可能writer收到CLOSED_record已经 退出了。writerSwitched.await() 会hang住
                if(closed){
                    return;
                }
                this.switchingQueue = true;

                ArrayBlockingQueue<Record> oldQueue = queue;
                queue = new ArrayBlockingQueue<Record>(newSize);
                this.readerSwitchSuccess = false;
                this.writerSwitchSuccess = false;

                //先拯救下writer,可能writer刚好阻塞到take上,失败也没关系,说明老队列不空,writer不会阻塞到take
                oldQueue.offer(SWITCH_QUEUE_RECORD);

                while (!writerSwitchSuccess) {
                    writerSwitched.await();
                }
                //writer先切换队列,然后reader可能阻塞在最后一个put上,清空下老队列拯救reader,让它顺利醒来
                transferOldQueueRecordsToNewQueue(oldQueue);

                while (!readerSwitchSuccess) {
                    readerSwitched.await();
                }
                //前面的清空,刚好碰到reader要put最后一个,非阻塞式清空动作就有残留最后一个put
                transferOldQueueRecordsToNewQueue(oldQueue);

                this.switchingQueue = false;
                this.switchFinished.signalAll();

            } finally {
                switchingQueueLock.unlock();
            }

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void transferOldQueueRecordsToNewQueue(ArrayBlockingQueue<Record> oldQueue)
            throws InterruptedException {
        List<Record> oldRecords = new ArrayList<Record>(oldQueue.size());
        Record record = null;
        while ((record = oldQueue.poll()) != null) {
            oldRecords.add(record);
        }
        // 转移老队列剩下的记录到新队列
        for (int i = 0; i < oldRecords.size(); i++) {
            queue.put(oldRecords.get(i));
        }
    }

    public void close() {
        this.closed = true;
        switchingQueueLock.lock();
        try {
            //如果正在切换队列, 等切换做完才能,发送最后一个CLOSE
            while (switchingQueue) {
                switchFinished.await();
            }

            this.queue.put(CLOSE_RECORD);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        finally{
            switchingQueueLock.unlock();
        }
    }

    public void put(Record record) {
        try {

            if (!queue.offer(record)) {
                taskCounter.incrBufferFullCount();
                if (!readerSwitchSuccess) {
                    notifyReaderSwitchSuccess();
                }
                queue.put(record);
            }
            taskCounter.incrReadCount();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void notifyReaderSwitchSuccess() {
        System.out.println("reader switch");
        switchingQueueLock.lock();
        try {
            readerSwitchSuccess = true;
            readerSwitched.signalAll();
        } finally {
            switchingQueueLock.unlock();
        }
    }

    public Record take() {
        try {

            Record record = queue.poll();
            //如果拿到了切换记录,则切换队列重试
            if(record == SWITCH_QUEUE_RECORD){
                if (!writerSwitchSuccess) {
                    notifyWriterSwitchSuccess();
                }
                record = queue.poll();
            }

            if (record == null) {
                taskCounter.incrBufferEmptyCount();

                //调用take先检查是否正在切换,保证拿到新的队列
                if (!writerSwitchSuccess) {
                    notifyWriterSwitchSuccess();
                }
                record = queue.take();
                //如果很不幸刚好在take阻塞时候,切换,只能发送一个切换记录将其唤醒
                if(record == SWITCH_QUEUE_RECORD){
                    if (!writerSwitchSuccess) {
                        notifyWriterSwitchSuccess();
                    }
                    record = queue.take();
                }
            }
            if (record == CLOSE_RECORD) {
                if (!writerSwitchSuccess) {
                    notifyWriterSwitchSuccess();
                }
                return null;
            }
            taskCounter.incrWriteCount();
            return record;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    private void notifyWriterSwitchSuccess() {

        System.out.println("writer switch");
        switchingQueueLock.lock();
        try {
            writerSwitchSuccess = true;
            writerSwitched.signalAll();
        } finally {
            switchingQueueLock.unlock();
        }

    }

}
时间: 2024-10-06 22:40:31

java 可伸缩阻塞队列实现的相关文章

Java多线程 阻塞队列和并发集合

转载:大关的博客 Java多线程 阻塞队列和并发集合 本章主要探讨在多线程程序中与集合相关的内容.在多线程程序中,如果使用普通集合往往会造成数据错误,甚至造成程序崩溃.Java为多线程专门提供了特有的线程安全的集合类,通过下面的学习,您需要掌握这些集合的特点是什么,底层实现如何.在何时使用等问题. 3.1 BlockingQueue接口 java阻塞队列应用于生产者消费者模式.消息传递.并行任务执行和相关并发设计的大多数常见使用上下文. BlockingQueue在Queue接口基础上提供了额外

Java多线程——阻塞队列

现在,通过前几篇的总结,我们对Java多线程已经有所了解了,但是它们都是一些Java并发程序设计基础的底层构建块.对于实际编程来说,我们应该尽可能的远离底层结构.使用那些由并发处理的专业人士实现的较高层次的结构要方便的多,安全的多. 阻塞队列 对于许多线程问题.可以通过使用一个或多个队列以优雅且安全的方式将其形式化.生产者线程向队列插入元素,消费者线程则取出他们.使用队列,可以安全地从一个线程向另一个线程传递数据. 阻塞队列的方法 方法 正常动作 特殊情况下动作 add 添加一个元素 如果队列满

Java中阻塞队列的使用

http://blog.csdn.net/qq_35101189/article/details/56008342 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所

java 多线程阻塞队列 与 阻塞方法与和非阻塞方法

Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾.队列都是线程安全的,内部已经实现安全措施,不用我们担心 Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDK API就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如Link

29、java中阻塞队列

阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞.试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素.同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,如从队列中移除一个或者多个元素,或者完全清空队列,下图展示了如何通过阻塞队列来合作: 线程1往阻塞队列中添加元素,而线程2从阻塞队列中移除元素 使用BlockingQueue的关键技术点如

Java实现阻塞队列的两种方式

方式一:/** * 使用非阻塞队列PriorityQueue及wait/notify方法实现一个阻塞队列**/class MyBlockingQueue {    public final static int queueSize = 10;    public static final PriorityQueue<Integer> queue = new PriorityQueue();}    class Producer extends Thread {    public void r

JAVA实现阻塞队列

package 多线程并发; import java.util.Stack; /** * Created by z84102272 on 2018/7/17. */ public class BlockQueueImpl { private final static Object pushLock = new Object(); //push的锁 private final static Object popLock = new Object(); //pop的锁 private Stack<O

java 阻塞队列 LinkedBlockingQueue ArrayBlockingQueue 分析

BlockingQueue是阻塞队列接口类,该接口继承了Queue接口 BlockingQueue实现类常见的有以下几种. ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里.有界也就意味着,它不能够存储无限多数量的元素.它有一个同一时间能够存储元素数量的上限.你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改). D

java并发之阻塞队列LinkedBlockingQueue与ArrayBlockingQueue

Java中阻塞队列接口BlockingQueue继承自Queue接口,并提供put.take阻塞方法.两个主要的阻塞类实现是ArrayBlockingQueue和LinkedBlockingQueue.阻塞队列的主要方法 public interface BlockingQueue<E> extends Queue<E> { //将指定的元素插入到此队列的尾部(如果立即可行且不会超过该队列的容量) //在成功时返回 true,如果此队列已满,则抛IllegalStateExcept