JDK7中TransferQueue的使用以及TransferQueue与SynchronousQueue的差别

JDK7对JDK5中的J.U.C并发工具进行了增强,其中之一就是新增了TransferQueue。java并发相关的JSR规范,可以查看Doug Lea维护的blog。现在简单介绍下这个类的使用方式。

public interface TransferQueue<E> extends BlockingQueue<E>
{
    /**
     * Transfers the element to a waiting consumer immediately, if possible.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * otherwise returning {@code false} without enqueuing the element.
     *
     * @param e the element to transfer
     * @return {@code true} if the element was transferred, else
     *         {@code false}
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e);

    /**
     * Transfers the element to a consumer, waiting if necessary to do so.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer.
     *
     * @param e the element to transfer
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    void transfer(E e) throws InterruptedException;

    /**
     * Transfers the element to a consumer if it is possible to do so
     * before the timeout elapses.
     *
     * <p>More precisely, transfers the specified element immediately
     * if there exists a consumer already waiting to receive it (in
     * {@link #take} or timed {@link #poll(long,TimeUnit) poll}),
     * else waits until the element is received by a consumer,
     * returning {@code false} if the specified wait time elapses
     * before the element can be transferred.
     *
     * @param e the element to transfer
     * @param timeout how long to wait before giving up, in units of
     *        {@code unit}
     * @param unit a {@code TimeUnit} determining how to interpret the
     *        {@code timeout} parameter
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before completion,
     *         in which case the element is not left enqueued
     * @throws InterruptedException if interrupted while waiting,
     *         in which case the element is not left enqueued
     * @throws ClassCastException if the class of the specified element
     *         prevents it from being added to this queue
     * @throws NullPointerException if the specified element is null
     * @throws IllegalArgumentException if some property of the specified
     *         element prevents it from being added to this queue
     */
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * Returns {@code true} if there is at least one consumer waiting
     * to receive an element via {@link #take} or
     * timed {@link #poll(long,TimeUnit) poll}.
     * The return value represents a momentary state of affairs.
     *
     * @return {@code true} if there is at least one waiting consumer
     */
    boolean hasWaitingConsumer();

    /**
     * Returns an estimate of the number of consumers waiting to
     * receive elements via {@link #take} or timed
     * {@link #poll(long,TimeUnit) poll}.  The return value is an
     * approximation of a momentary state of affairs, that may be
     * inaccurate if consumers have completed or given up waiting.
     * The value may be useful for monitoring and heuristics, but
     * not for synchronization control.  Implementations of this
     * method are likely to be noticeably slower than those for
     * {@link #hasWaitingConsumer}.
     *
     * @return the number of consumers waiting to receive elements
     */
    int getWaitingConsumerCount();
}

可以看到TransferQueue同时也是一个阻塞队列,它具备阻塞队列的所有特性,主要介绍下上面5个新增API的作用。

1.transfer(E e)若当前存在一个正在等待获取的消费者线程,即立刻将e移交之;否则将元素e插入到队列尾部,并且当前线程进入阻塞状态,直到有消费者线程取走该元素。

public class TransferQueueDemo {

	private static TransferQueue<String> queue = new LinkedTransferQueue<String>();

	public static void main(String[] args) throws Exception {

		new Productor(1).start();

		Thread.sleep(100);

		System.out.println("over.size=" + queue.size());
	}

	static class Productor extends Thread {
		private int id;

		public Productor(int id) {
			this.id = id;
		}

		@Override
		public void run() {
			try {
				String result = "id=" + this.id;
				System.out.println("begin to produce." + result);
				queue.transfer(result);
				System.out.println("success to produce." + result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}

可以看到生产者线程会阻塞,因为调用transfer()的时候并没有消费者在等待获取数据。队列长度变成了1,说明元素e没有移交成功的时候,会被插入到阻塞队列的尾部。

2.tryTransfer(E e)若当前存在一个正在等待获取的消费者线程,则该方法会即刻转移e,并返回true;若不存在则返回false,但是并不会将e插入到队列中。这个方法不会阻塞当前线程,要么快速返回true,要么快速返回false。

3.hasWaitingConsumer()和getWaitingConsumerCount()用来判断当前正在等待消费的消费者线程个数。

4.tryTransfer(E e, long timeout, TimeUnit unit) 若当前存在一个正在等待获取的消费者线程,会立即传输给它; 否则将元素e插入到队列尾部,并且等待被消费者线程获取消费掉。若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素从队列中移除。

public class TransferQueueDemo {

	private static TransferQueue<String> queue = new LinkedTransferQueue<String>();

	public static void main(String[] args) throws Exception {

		new Productor(1).start();

		Thread.sleep(100);

		System.out.println("over.size=" + queue.size());//1

		Thread.sleep(1500);

		System.out.println("over.size=" + queue.size());//0
	}

	static class Productor extends Thread {
		private int id;

		public Productor(int id) {
			this.id = id;
		}

		@Override
		public void run() {
			try {
				String result = "id=" + this.id;
				System.out.println("begin to produce." + result);
				queue.tryTransfer(result, 1, TimeUnit.SECONDS);
				System.out.println("success to produce." + result);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

第一次还没到指定的时间,元素被插入到队列中了,所有队列长度是1;第二次指定的时间片耗尽,元素从队列中移除了,所以队列长度是0。

这篇文章中讲了SynchronousQueue的使用方式,可以看到TransferQueue也具有SynchronousQueue的所有功能,但是TransferQueue的功能更强大。这篇文章中提到了这2个API的区别:

TransferQueue is more generic and useful than SynchronousQueue however as it allows you to flexibly decide whether to use normal BlockingQueue
semantics or a guaranteed hand-off. In the case where items are already in the queue, calling transfer will guarantee that all existing queue
items will be processed before the transferred item.

SynchronousQueue implementation uses dual queues (for waiting producers and waiting consumers) and protects both queues with a single lock. The
 LinkedTransferQueue implementation uses CAS operations to form a nonblocking implementation and that is at the heart of avoiding serialization
 bottlenecks.
时间: 2024-10-02 06:54:24

JDK7中TransferQueue的使用以及TransferQueue与SynchronousQueue的差别的相关文章

菜鸟译文(三)——JDK6和JDK7中substring()方法的对比

substring(int beginIndex, int endIndex)方法在JDK6和JDK7中是不同的.了解他们的区别可以让我们更好的使用这个方法.方便起见,以下用substring() 代替 substring(int beginIndex, int endIndex). 1. substring()做了什么? substring(int beginIndex, int endIndex)方法返回一个以beginIndex开头,以endIndex-1结尾的String对象. Stri

jdk7 中Collections.sort 异常

Collections.sort 异常 java.lang.IllegalArgumentException: Comparison method violates its general contract! at java.util.TimSort.mergeHi(TimSort.java:868) at java.util.TimSort.mergeAt(TimSort.java:485) at java.util.TimSort.mergeForceCollapse(TimSort.jav

CSS优先级问题以及jQuery中的.eq()遍历方法和:eq()选择器的差别

在写一个TAB选项卡的时候遇到几个有意思的问题,记录下来 先把代码贴出来 <!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml" xml:lang="

jQuery中的append()和prepend(),after()和before()的差别

jQuery中的append()和preappend(),after()和before()的差别 append()和prepend() 如果 <div class='a'> //<---you want div c to append in this <div class='b'>b</div> </div> 使用 $('.a').append($('.c')); 则会这样: <div class='a'> //<---you wan

jQuery中focusin()和focus()、find()和children()的差别

jQuery中focus()和focusin().focus()和children()的差别 focus()和focusin() focus()和focusin()的差别在于focusin()支持事件的冒泡.以下举例说明: <!doctype html> <html lang="en"> <head> <meta charset="utf-8"> <title>focusin demo</title&

jquery中选择器input:hidden和input[type=hidden]的差别

jquery中选择器input:hidden和input[type=hidden]的差别 关于选择器:hidden的申明, 在jquery申明文档中是如许说的:匹配所有不成见元素,或者type为hidden的元素.而[type=hidden]是查找所有type属性便是hidden的元素.两者是有雷同之处和不合之处的. :hidden匹配所有不成见元素,或者type为hidden的元素,所有样式display便是none的元素和子元素以及type="hidden"的表单位素都在查找的成果

JDK7中匿名内部类中使用局部变量要加final,JDK8中不需要,但jdk会默认加上final

今天看书的时候看到了局部内部类,书上说局部内部类可以访问局部变量,但是必须是final的.因为局部变量在方法调用之后就消失了,使用final声明的话该局部变量会存入堆中,和内部类有一样的声明周期.但是我写了一个局部内部类,竟然可以访问非final的局部变量,请问这是什么回事呢.ps:我的jdk是8 难道和这个有关系? public class jubuneibulei { public void p(int a, int b){ class te{ void print(){ System.ou

JDK6和JDK7中String类下的substring方法的代码对比(仅贴代码,未详述)

返回主页 回到顶端 jdk1.6版本String.java文件中关于substring的代码及描述 1 /** 2 * Returns a new string that is a substring of this string. The 3 * substring begins with the character at the specified index and 4 * extends to the end of this string. <p> 5 * Examples: 6 *

借助JDK7中WatchService实现文件变更监听

import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.support.PropertiesLoaderUtils; import java.io.IOException; import java.nio.file.*; import java.util.Objects; import java.util.Properties; /** * @author Created