同时开10个线程存入和取出100万的数据,结论如下:
按总耗时从小到大排序(耗时越小速度越快): DoubleBufferedQueue < ConcurrentLinkedQueue < ArrayBlockingQueue < LinkedBlockingQueue
执行结果如下(时间单位为毫秒,数值为10个线程各自耗时的累加):
100万 DoubleBufferedQueue入队时间:9510 出队时间:10771
100万 DoubleBufferedQueue入队时间:8169 出队时间:9789
1000万 DoubleBufferedQueue入队时间:98285 出队时间:101088
1000万 DoubleBufferedQueue入队时间:101859 出队时间:105964
100万 ConcurrentLinkedQueue入队时间:10557 出队时间:13716
100万 ConcurrentLinkedQueue入队时间:25298 出队时间:25332
1000万 ConcurrentLinkedQueue入队时间:121868 出队时间:136116
1000万 ConcurrentLinkedQueue入队时间:134306 出队时间:147893
100万 ArrayBlockingQueue入队时间:21080 出队时间:22025
100万 ArrayBlockingQueue入队时间:17689 出队时间:19654
1000万 ArrayBlockingQueue入队时间:194400 出队时间:205968
1000万 ArrayBlockingQueue入队时间:192268 出队时间:197982
100万 LinkedBlockingQueue入队时间:38236 出队时间:52555
100万 LinkedBlockingQueue入队时间:30646 出队时间:38573
1000万 LinkedBlockingQueue入队时间:375669 出队时间:391976
1000万 LinkedBlockingQueue入队时间:701363 出队时间:711217
doubleBufferedQueue:
package test.MoreThread.d; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import test.MoreThread.l.linkedBlockingQueue; import comrt.util.DoubleBufferedQueue; //DoubleBufferedQueue入队时间:9510 出队时间:10771 //DoubleBufferedQueue入队时间:8169 出队时间:9789 public class doubleBufferedQueue { private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); public final static int size1 = 1000000; public static DoubleBufferedQueue<Object> queue = new DoubleBufferedQueue<Object>( size1); public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { // long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService .submit(new ExecDoubleBufferedQueue()); results.add(future); } long allTime = 0; for (Future<Long> fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } doubleBufferedQueue.isOver = true; log.info("入队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results_out = 
new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService2 .submit(new ExecDoubleBufferedQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future<Long> fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); } } class ExecDoubleBufferedQueue implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < doubleBufferedQueue.size1; i++) { doubleBufferedQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } } class ExecDoubleBufferedQueue_Out implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(doubleBufferedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!doubleBufferedQueue.isOver) { doubleBufferedQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } }
concurrentLinkedQueue:
package test.MoreThread.c; import java.util.ArrayList; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //ConcurrentLinkedQueue入队时间:10557 出队时间:13716 //ConcurrentLinkedQueue入队时间:25298 出队时间:25332 public class concurrentLinkedQueue { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); public static ConcurrentLinkedQueue<Object> queue = new ConcurrentLinkedQueue<Object>(); public final static int size1 = 1000000; public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { // long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService.submit(new Exec()); results.add(future); } long allTime = 0; for (Future<Long> fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } concurrentLinkedQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService2 .submit(new Exec_Out()); results_out.add(future); } 
long allTime_out = 0; for (Future<Long> fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); } } class Exec implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < concurrentLinkedQueue.size1; i++) { concurrentLinkedQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } } class Exec_Out implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(concurrentLinkedQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!concurrentLinkedQueue.isOver) { concurrentLinkedQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } }
arrayBlockingQueue:
package test.MoreThread.a; import java.util.ArrayList; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //ArrayBlockingQueue入队时间:21080 出队时间:22025 //ArrayBlockingQueue入队时间:17689 出队时间:19654 public class arrayBlockingQueue { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); public final static int size1 = 1000000; public static ArrayBlockingQueue<Object> queue = new ArrayBlockingQueue<Object>( size1); public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { // long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService .submit(new ExecArrayBlockingQueue()); results.add(future); } long allTime = 0; for (Future<Long> fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } arrayBlockingQueue.isOver = true; log.info("队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = 
executorService2 .submit(new ExecArrayBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future<Long> fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); } } class ExecArrayBlockingQueue implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < arrayBlockingQueue.size1; i++) { arrayBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } } class ExecArrayBlockingQueue_Out implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(arrayBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!arrayBlockingQueue.isOver) { arrayBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } }
linkedBlockingQueue:
package test.MoreThread.l; import java.util.ArrayList; import java.util.Vector; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //LinkedBlockingQueue入队时间:38236 出队时间:52555 //LinkedBlockingQueue入队时间:30646 出队时间:38573 public class linkedBlockingQueue { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); public final static int size1 = 1000000; public static LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>( size1); public final static int threadNumber = 10; public static boolean isOver = false; public static void main(String[] args) throws InterruptedException, ExecutionException { long timestart = System.currentTimeMillis(); Thread thread1 = new Thread(new Runnable() { public void run() { ExecutorService executorService = Executors .newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService .submit(new ExecLinkedBlockingQueue()); results.add(future); } long allTime = 0; for (Future<Long> fs : results) { try { allTime += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService.shutdown(); } } linkedBlockingQueue.isOver = true; log.info("入队列总共执行时间:" + allTime); } }); thread1.start(); // log.info("主线程执行时间:" + (System.currentTimeMillis() - timestart)); // System.out.println(linkedBlockingQueue.queue.size()); // ------------------------------ Thread thread2 = new Thread(new Runnable() { public void run() { ExecutorService executorService2 = Executors 
.newFixedThreadPool(threadNumber); ArrayList<Future<Long>> results_out = new ArrayList<Future<Long>>(); for (int i = 0; i < threadNumber; i++) { Future<Long> future = executorService2 .submit(new ExecLinkedBlockingQueue_Out()); results_out.add(future); } long allTime_out = 0; for (Future<Long> fs : results_out) { try { allTime_out += fs.get(); // log.info("" + fs.get()); } catch (InterruptedException e) { log.info("" + e); return; } catch (ExecutionException e) { log.info("" + e); } finally { executorService2.shutdown(); } } log.info("出队列总共执行时间:" + allTime_out); } }); thread2.start(); } } class ExecLinkedBlockingQueue implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); for (int i = 0; i < linkedBlockingQueue.size1; i++) { linkedBlockingQueue.queue.offer(i); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } } class ExecLinkedBlockingQueue_Out implements Callable<Long> { private static final Logger log = LoggerFactory .getLogger(linkedBlockingQueue.class); @Override public Long call() throws Exception { long time = System.currentTimeMillis(); while (!linkedBlockingQueue.isOver) { linkedBlockingQueue.queue.poll(); } long time2 = System.currentTimeMillis() - time; // log.info("执行时间:" + time2); return time2; } }
DoubleBufferedQueue双缓冲队列
package comrt.util; import java.util.AbstractQueue; import java.util.Collection; import java.util.Iterator; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; //双缓冲队列,线程安全 public class DoubleBufferedQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = 1011398447523020L; public static final int DEFAULT_QUEUE_CAPACITY = 5000000; public static final long DEFAULT_MAX_TIMEOUT = 0; public static final long DEFAULT_MAX_COUNT = 10; private Logger logger = LoggerFactory.getLogger(DoubleBufferedQueue.class.getName()); /** The queued items */ private ReentrantLock readLock; // 写锁 private ReentrantLock writeLock; // 是否满 private Condition notFull; private Condition awake; // 读写数组 private transient E[] writeArray; private transient E[] readArray; // 读写计数 private volatile int writeCount; private volatile int readCount; // 写数组下标指针 private int writeArrayTP; private int writeArrayHP; // 读数组下标指针 private int readArrayTP; private int readArrayHP; private int capacity; public DoubleBufferedQueue(int capacity) { // 默认 this.capacity = DEFAULT_QUEUE_CAPACITY; if (capacity > 0) { this.capacity = capacity; } readArray = (E[]) new Object[capacity]; writeArray = (E[]) new Object[capacity]; readLock = new ReentrantLock(); writeLock = new ReentrantLock(); notFull = writeLock.newCondition(); awake = writeLock.newCondition(); } private void insert(E e) { writeArray[writeArrayTP] = e; ++writeArrayTP; ++writeCount; } private E extract() { E e = readArray[readArrayHP]; readArray[readArrayHP] = null; ++readArrayHP; --readCount; return e; } /** * switch condition: read queue is empty && write queue is not empty * * Notice:This function can only be invoked after readLock is grabbed,or may * cause dead lock * * @param timeout * @param isInfinite * : 
whether need to wait forever until some other thread awake * it * @return * @throws InterruptedException */ private long queueSwap(long timeout, boolean isInfinite) throws InterruptedException { writeLock.lock(); try { if (writeCount <= 0) { // logger.debug("Write Count:" + writeCount // + ", Write Queue is empty, do not switch!"); try { // logger.debug("Queue is empty, need wait...."); if (isInfinite && timeout <= 0) { awake.await(); return -1; } else if (timeout > 0) { return awake.awaitNanos(timeout); } else { return 0; } } catch (InterruptedException ie) { awake.signal(); throw ie; } } else { E[] tmpArray = readArray; readArray = writeArray; writeArray = tmpArray; readCount = writeCount; readArrayHP = 0; readArrayTP = writeArrayTP; writeCount = 0; writeArrayHP = readArrayHP; writeArrayTP = 0; notFull.signal(); // logger.debug("Queue switch successfully!"); return 0; } } finally { writeLock.unlock(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null) { throw new NullPointerException(); } long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } writeLock.lockInterruptibly(); try { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (writeCount < writeArray.length) { insert(e); if (writeCount == 1) { awake.signal(); } return true; } // Time out if (nanoTime <= 0) { // logger.debug("offer wait time out!"); return false; } // keep waiting try { // logger.debug("Queue is full, need wait...."); nanoTime = notFull.awaitNanos(nanoTime); } catch (InterruptedException ie) { notFull.signal(); throw ie; } } } finally { writeLock.unlock(); } return false; } // 取 @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanoTime = 0; if (timeout > 0) { nanoTime = unit.toNanos(timeout); } readLock.lockInterruptibly(); try { if (nanoTime > 0) { for (int i = 0; i < DEFAULT_MAX_COUNT; i++) { if (readCount > 0) { return extract(); } if (nanoTime <= 0) { // logger.debug("poll 
time out!"); return null; } nanoTime = queueSwap(nanoTime, false); } } else { if (readCount > 0) { return extract(); } queueSwap(nanoTime, false); if (readCount > 0) { return extract(); } } } finally { readLock.unlock(); } return null; } // 等待500毫秒 @Override public E poll() { E ret = null; try { ret = poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e) { ret = null; } return ret; } // 查看 @Override public E peek() { E e = null; readLock.lock(); try { if (readCount > 0) { e = readArray[readArrayHP]; } } finally { readLock.unlock(); } return e; } // 默认500毫秒 @Override public boolean offer(E e) { boolean ret = false; try { ret = offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } catch (Exception e2) { ret = false; } return ret; } @Override public void put(E e) throws InterruptedException { // never need to // block offer(e, DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public E take() throws InterruptedException { return poll(DEFAULT_MAX_TIMEOUT, TimeUnit.MILLISECONDS); } @Override public int remainingCapacity() { return this.capacity; } @Override public int drainTo(Collection<? super E> c) { return 0; } @Override public int drainTo(Collection<? 
super E> c, int maxElements) { return 0; } @Override public Iterator<E> iterator() { return null; } // 当前读队列中还有多少个 @Override public int size() { int size = 0; readLock.lock(); try { size = readCount; } finally { readLock.unlock(); } return size; } /** * 当前已写入的队列大小 * */ public int WriteSize() { int size = 0; writeLock.lock(); try { size = writeCount; } finally { writeLock.unlock(); } return size; } public int unsafeReadSize() { return readCount; } public int unsafeWriteSize() { return writeCount; } public int capacity() { return capacity; } public String toMemString() { return "--read: " + readCount + "/" + capacity + "--write: " + writeCount + "/" + capacity; } // 清理 /* * public void clear() { readLock.lock(); writeLock.lock(); try { readCount * = 0; readArrayHP = 0; writeCount = 0; writeArrayTP = 0; * //logger.debug("Queue clear successfully!"); } finally { * writeLock.unlock(); readLock.unlock(); } } */ }