Java_并发线程_CompletionService

1.CompletionService源码分析

CompletionService内部实现还是维护了一个可阻塞的队列,通过代理设计模式,从而操作队列。

    /**
     * Creates an ExecutorCompletionService using the supplied
     * executor for base task execution and a
     * {@link LinkedBlockingQueue} as a completion queue.
     *
     * @param executor the executor to use
     * @throws NullPointerException if executor is {@code null}
     */
    public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>(); //新建一个完成队列
    }
	//通过submit提交Callable任务对象
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));//线程池执行task对象
        return f;
    }
    /**
     * FutureTask extension to enqueue upon completion
     */
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() {
        	completionQueue.add(task); //执行玩后将task返回对象放置于完成队列
        }
        private final Future<V> task;
    }
	//通过take方法取得Future对象
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

2.实例

	public static void main(String[] args) {

		ExecutorService threadPool = Executors.newFixedThreadPool(3);

		CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool);
		//将任务添加至threadPool池中,但是只分配3个Thread对象
		for (int i = 1; i <= 10; i++) {
			final int seq = i;
			completionService.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					Thread.sleep(new Random().nextInt(5000));
					return seq;
				}
			});
		}

		for (int i = 0; i < 10; i++) {
			try {
				//completionService.take(), 至于call方法执行完成,take阻塞采用数据
				//future.get() 阻塞, 只有当call执行完成,
				System.out.println(completionService.take().get());
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
	}
时间: 2024-10-10 00:46:33

Java_并发线程_CompletionService的相关文章

Java_并发线程_Condition

1.概述 使用Condition应在Lock的前提下,请先参见Java_并发线程_Lock.ReadWriteLock一文.在synchronized同步代码块中使用了obj的锁对象,然后通过obj.notify()和obj.wait()来配合处理多线程的问题.然而,同样lock和condition配合使用同样可以完成同样的功能,condition只有配合lock使用才有意义,只不过lock更加的灵活,使用的格式如下. //lock 与 Condition private static Reen

Java_并发线程_Semaphore、CountDownLatch、CyclicBarrier、Exchanger

1.Semaphore 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用.Java并发库Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可.比如在Windo

Java_并发线程_Futrue、FutureTask、Callable

1.Futrue public interface Future<V> //Future 表示异步计算的结果 ExecutorService threadPool = Executors.newSingleThreadExecutor(); Future<String> future = threadPool.submit(new Callable<String>() { public String call() throws Exception { Thread.sl

c++11 条件变量 生产者-消费者 并发线程

http://baptiste-wicht.com/posts/2012/04/c11-concurrency-tutorial-advanced-locking-and-condition-variables.html ? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 struc

Java 并发 线程同步

Java 并发 线程同步 @author ixenos 同步 1.异步线程本身包含了执行时需要的数据和方法,不需要外部提供的资源和方法,在执行时也不关心与其并发执行的其他线程的状态和行为 2.然而,大多数实际的多线程应用中,两个或两个以上的线程需要共享对同一数据的存取,这将产生同步问题(可见性和同步性的丢失) 比如两个线程同时执行指令account[to] += amount,这不是原子操作,可能被处理如下: a)将account[to]加载到寄存器 b)增加amount c)将结果写回acco

Java 并发 线程属性

Java 并发 线程属性 @author ixenos 线程优先级 1.每当线程调度器有机会选择新线程时,首先选择具有较高优先级的线程 2.默认情况下,一个线程继承它的父线程的优先级 当在一个运行的线程A里,创建另一个线程B的时候,那么A是父线程,B是子线程.当在一个运行的线程A里,创建线程B,然后又创建了线程C,这时候虽然B比C创建早,可是B并不是C的父线程,而A是B和C的父线程. 3.线程的优先级高度依赖于系统,当虚拟机依赖于宿主机平台的线程实现机制时,Java线程的优先级被映射到宿主机平台

Java 并发 线程的优先级

Java 并发 线程的优先级 @author ixenos 低优先级线程的执行时刻 1.在任意时刻,当有多个线程处于可运行状态时,运行系统总是挑选一个优先级最高的线程执行,只有当线程停止.退出或者由于某些原因不执行的时候,低优先级的线程才可能被执行 2.两个优先级相同的线程同时等待执行时,那么运行系统会以round-robin的方式选择一个线程执行(即轮询调度,以该算法所定的)(Java的优先级策略是抢占式调度!) 3.被选中的线程可因为一下原因退出,而给其他线程执行的机会: 1) 一个更高优先

控制每次线程池的并发线程的最大个数

[本人原创],欢迎交流和分形技术,转载请附上如下内容: 作者:itshare [转自]http://www.cnblogs.com/itshare/ 1. 实验目的:       使用线程池的时候,有时候需要考虑服务器的最大线程数目和程序最快执行所有业务逻辑的取舍.并非逻辑线程越多也好,而且新的逻辑线程必须会在线程池的等待队列中等待 ,直到线程池中工作的线程执行完毕,才会有系统线程取出等待队列中的逻辑线程,进行CPU运算. 2.  解决问题:     <a>如果不考虑服务器实际可支持的最大并行

使用CountDownLatch和CyclicBarrier处理并发线程

闲话不说,首先看一段代码: { IValueCallback remoteCallback = new IValueCallback.Stub() { <strong><span style="color:#ff0000;">(B)</span></strong> public void onReceiveValue(final Bundle value) throws RemoteException { synchronized (sy