Java并发编程(6)- J.U.C组件拓展

J.U.C-FutureTask

在Java中一般通过继承Thread类或者实现Runnable接口这两种方式来创建线程,但是这两种方式都有个缺陷,就是不能在执行完成后获取执行的结果,因此Java 1.5之后提供了Callable和Future接口,通过它们就可以在任务执行完毕之后得到任务的执行结果。

而FutureTask则是J.U.C中的类,但不是AQS的子类,FutureTask是一个可删除的异步计算类。这个类提供了Future接口的的基本实现,使用相关方法启动和取消计算,查询计算是否完成,并检索计算结果。只有在计算完成时才能使用get方法检索结果;如果计算尚未完成,get方法将会阻塞。一旦计算完成,计算就不能重新启动或取消(除非使用runAndReset方法调用计算)。

Runnable与Callable以及Future接口对比:

Runnable是一个接口,在它里面只声明了一个run()方法。由于run()方法返回值为void类型,所以在执行完任务之后无法返回任何结果:

public interface Runnable {
    public abstract void run();
}

Callable接口也只声明了一个方法,这个方法叫做call()。Callable接口定义如下:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

可以看到Callable是个泛型接口,泛型V就是要call()方法返回的类型。Callable接口和Runnable接口很像,都可以被另外一个线程执行,但是正如前面所说的,Runnable不会返回数据也不能抛出异常。

Future也是一个接口,Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。说白了Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成以及获取执行结果。其中执行结果通过get方法获取,该方法会阻塞直到任务返回结果。Future接口的定义如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

cancel()方法用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。

isCanceled()方法用于判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。

isDone()方法用于判断任务是否已经完成,如果完成则返回true,否则返回false。需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。

get()方法用于获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常,如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。

get(long timeout,Timeunit unit)是带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。

综上,Future主要提供了三种功能:

  1. 判断任务是否完成;
  2. 能够中断任务;
  3. 能够获取任务执行结果。


因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。FutureTask的父类是RunnableFuture,而RunnableFuture则继承了Runnable和Future这两个接口。所以由此可知,FutureTask最终也属于是Callable类型的任务。如果往FutureTask的构造函数传入Runnable的话,也会被转换成Callable类型。

FutureTask继承图如下:

可以看到,FutureTask实现了RunnableFuture接口,则RunnableFuture接口继承了Runnable接口和Future接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算结果。

使用场景:

假设有一个很费时的逻辑需要计算,并且需要返回计算的结果,但这个结果又不是马上需要的。那么这时就可以使用FutureTask,用另外一个线程去进行计算,而当前线程在得到这个计算结果之前,就可以去执行其他的操作,等到需要这个结果时再通过Future得到即可。

FutureTask有两个构造器,支持传入Callable和Runnable类型,在使用 Runnable 时,需要多指定一个返回结果类型:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

使用示例

1.Future基本使用示例:

@Slf4j
public class FutureExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 使用lambda创建callable任务,使用Future接收任务执行的结果
        Future<String> future = executorService.submit(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);

            return "Done";
        });

        log.info("do something in main");
        Thread.sleep(1000);
        // 获取执行结果
        String result = future.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

2.FutureTask基本使用示例:

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        // 构建FutureTask实例,使用lambda创建callable任务
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);

            return "Done";
        });

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(futureTask);

        log.info("do something in main");
        Thread.sleep(1000);
        // 获取执行结果
        String result = futureTask.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

从以上两个示例可以看到,Future和FutureTask的使用方式是很相似的,毕竟FutureTask就是Future的一个实现。


J.U.C-ForkJoin

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,其思想和map-reduce非常类似。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:

工作窃取算法:

Fork/Join框架主要采用的是工作窃取(work-stealing)算法,该算法是指某个线程从其他队列里窃取任务来执行。工作窃取的运行流程图如下:

那么为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

所以对于Fork/Join框架而言,当一个任务正在等待它使用join操作创建的子任务的结束时,执行这个任务的线程(工作线程)查找其他未被执行的任务并开始它的执行。通过这种方式,线程充分利用它们的运行时间,从而提高了应用程序的性能。

为实现这个目标,Fork/Join框架执行的任务有以下局限性:

  • 任务只能使用fork()join()操作,作为同步机制。如果使用其他同步机制,工作线程不能执行其他任务,当它们在同步操作时。比如,在Fork/Join框架中,你使任务进入睡眠,那么在这睡眠期间内,正在执行这个任务的工作线程将不会执行其他任务。
  • 任务不应该执行I/O操作,如读或写数据文件。
  • 任务不能抛出检查异常,它必须包括必要的代码来处理它们。

Fork/Join框架的核心主要是以下两个类:

  • ForkJoinPool:它实现ExecutorService接口和work-stealing算法。它管理工作线程和提供关于任务的状态和它们执行的信息。
  • ForkJoinTask: 它是将在ForkJoinPool中执行的任务的基类。它提供在任务中执行fork()join()操作的机制,并且这两个方法控制任务的状态。通常, 为了实现你的Fork/Join任务,你将实现两个子类的子类的类:RecursiveAction对于没有返回结果的任务和RecursiveTask 对于返回结果的任务。

Fork/Join使用示例,完成1+2+3+4...+n的计算,代码如下:

package org.zero.concurrency.demo.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

/**
 * @program: concurrency-demo
 * @description: ForkJoin 使用示例
 * @author: 01
 * @create: 2018-10-19 20:12
 **/
@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    private ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任务足够小就直接计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4...+100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //执行一个任务
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

J.U.C-BlockingQueue

在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题,从名字也可以知道它是线程安全的。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。

首先,最基本的来说, BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。所以 BlockingQueue 主要应用于生产者消费者场景。

BlockingQueue 是一个接口,继承自 Queue,所以其实现类也可以作为 Queue 的实现来使用,而 Queue 又继承自 Collection 接口。

BlockingQueue 对插入操作、移除操作、获取元素操作提供了四种不同的方法用于不同的场景中使用,总结如下表:

- Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Insert remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

说明:

1、Throws Exceptions :如果不能立即执行就抛出异常
2、Special Value:如果不能立即执行就返回一个特殊的值(null 或 true/false,取决于具体的操作)
3、Blocks:如果不能立即执行就阻塞等待此操作,直到这个操作成功
4、Times Out:如果不能立即执行就阻塞一段时间,直到成功或者超时指定时间

BlockingQueue 的实现类:

ArrayBlockingQueue:它是一个有界的阻塞队列,内部实现是数组,需在初始化时指定容量大小,一旦指定大小就不能再变。采用FIFO方式存储元素:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;
    ...
}            

DelayQueue:阻塞内部元素,DelayQueue内部元素必须实现Delayed接口,Delayed接口又继承了Comparable接口,原因在于DelayQueue内部元素需要排序,一般情况下按元素过期时间优先级排序:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

DalayQueue内部采用PriorityQueue与ReentrantLock实现:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    ...
}

LinkedBlockingQueue:使用独占锁实现的阻塞队列,大小配置可选,如果初始化时指定了大小,那么它就是有边界的。不指定就无边界(最大整型值)。内部实现是链表,采用FIFO形式保存数据。

public LinkedBlockingQueue() {
    // 不指定大小,无边界采用默认值,最大整型值
    this(Integer.MAX_VALUE);
}

PriorityBlockingQueue:带优先级的×××阻塞队列,无边界队列,允许插入null。插入的对象必须实现Comparator接口,队列优先级的排序规则就是按照我们对Comparable接口的实现来指定的。我们可以从PriorityBlockingQueue中获取一个迭代器,但这个迭代器并不保证能按照优先级的顺序进行迭代:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    ...

    public boolean add(E e) {
        return offer(e);
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] es;
        while ((n = size) >= (cap = (es = queue).length))
            tryGrow(es, cap);
        try {
            //必须实现Comparator接口
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftUpComparable(n, e, es);
            else
                siftUpUsingComparator(n, e, es, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    ...
}        

SynchronousQueue:同步阻塞队列,只能插入一个元素,×××非缓存队列,不存储元素。其内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,当然遍历这个队列的操作也是不允许的:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    ...
}    

参考:

http://www.importnew.com/28053.html

原文地址:http://blog.51cto.com/zero01/2306673

时间: 2024-11-08 22:14:18

Java并发编程(6)- J.U.C组件拓展的相关文章

JAVA并发编程J.U.C学习总结

前言 学习了一段时间J.U.C,打算做个小结,个人感觉总结还是非常重要,要不然总感觉知识点零零散散的. 有错误也欢迎指正,大家共同进步: 另外,转载请注明链接,写篇文章不容易啊,http://www.cnblogs.com/chenpi/p/5614290.html 本文目录如下,基本上涵盖了J.U.C的主要内容: JSR 166及J.U.C Executor框架(线程池. Callable .Future) AbstractQueuedSynchronizer(AQS框架) Locks & C

【Java并发编程实战】—–“J.U.C”:CountDownlatch

上篇博文([Java并发编程实战]-–"J.U.C":CyclicBarrier)LZ介绍了CyclicBarrier.CyclicBarrier所描述的是"允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务".而CountDownlatch和它也有一点点相似之处:CountDownlatch所描述的是"在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待".在JDK API中是这样阐述的: 用给定的计数 初始化 Co

【Java并发编程实战】—–“J.U.C”:ReentrantReadWriteLock

ReentrantLock实现了标准的互斥操作,也就是说在某一时刻只有有一个线程持有锁.ReentrantLock采用这种独占的保守锁直接,在一定程度上减低了吞吐量.在这种情况下任何的"读/读"."读/写"."写/写"操作都不能同时发生.然而在实际的场景中我们就会遇到这种情况:有些资源并发的访问中,它大部分时间都是执行读操作,写操作比较少,但是读操作并不影响数据的一致性,如果在进行读操作时采用独占的锁机制,这样势必会大大降低吞吐量.所以如果能够做

【Java并发编程实战】—–“J.U.C”:Semaphore

信号量Semaphore是一个控制访问多个共享资源的计数器,它本质上是一个"共享锁". Java并发提供了两种加锁模式:共享锁和独占锁.前面LZ介绍的ReentrantLock就是独占锁.对于独占锁而言,它每次只能有一个线程持有,而共享锁则不同,它允许多个线程并行持有锁,并发访问共享资源. 独占锁它所采用的是一种悲观的加锁策略,  对于写而言为了避免冲突独占是必须的,但是对于读就没有必要了,因为它不会影响数据的一致性.如果某个只读线程获取独占锁,则其他读线程都只能等待了,这种情况下就限

【Java并发编程实战】—–“J.U.C”:CyclicBarrier

在上篇博客([Java并发编程实战]-–"J.U.C":Semaphore)中,LZ介绍了Semaphore,下面LZ介绍CyclicBarrier.在JDK API中是这么介绍的: 一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点 (common barrier point).在涉及一组固定大小的线程的程序中,这些线程必须不时地互相等待,此时 CyclicBarrier 很有用.因为该 barrier 在释放等待线程后可以重用,所以称它为循环 的 barrier. Cy

【Java并发编程实战】-----“J.U.C”:CLH队列锁

在前面介绍的几篇博客中总是提到CLH队列,在AQS中CLH队列是维护一组线程的严格按照FIFO的队列.他能够确保无饥饿,严格的先来先服务的公平性.下图是CLH队列节点的示意图: 在CLH队列的节点QNode中包含有一个locked的字段,该字段表示该节点是否需要获取锁,为true表示需要获取,为false表示不需要获取.在CLH队列中,节点与节点之间并不是通过next指针来连接的而是通过myPred所指向节点的变化情况来影响的myNode的行为. 假设有两个线程(线程A.线程B).开始线程A需要

Java并发编程与高并发解决方案

第1章 课程准备   1-1 课程导学    1-2 并发编程初体验   1-3 并发与高并发基本概念第2章 并发基础   2-1 CPU多级缓存-缓存一致性   2-2 CPU多级缓存-乱序执行优化   2-3 JAVA内存模型    2-4 并发的优势与风险第3章 项目准备   3-1 案例环境初始化   3-2 案例准备工作    3-3 并发模拟-工具   3-4 并发模拟-代码 第4章 线程安全性   4-1 线程安全性-原子性-atomic-1    4-2 线程安全性-原子性-at

Java并发编程入门与高并发面试

第1章 课程准备(入门课程)课程目标:Java并发编程入门,适合没有并发编程经验的同学,本章首先从课程重点.特点.适合人群及学习收获几个方面对课程进行整体的介绍,然后会从一个实际的计数场景实现开始,给大家展示多线程并发时的线程不安全问题,让大家能够初体验到并发编程,之后会讲解并发和高并发的概念,并通过对比让大家明白到底什么是并发和...1-1 课前必读(不看会错过一个亿)1-2 课程导学1-3 并发编程初体验1-4 并发与高并发基本概念(选看)1-5 JAVA内存模型1-6 并发的优势与风险(选

6、Java并发编程:volatile关键字解析

Java并发编程:volatile关键字解析 volatile这个关键字可能很多朋友都听说过,或许也都用过.在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果.在Java 5之后,volatile关键字才得以重获生机. volatile关键字虽然从字面上理解起来比较简单,但是要用好不是一件容易的事情.由于volatile关键字是与Java的内存模型有关的,因此在讲述volatile关键之前,我们先来了解一下与内存模型相关的概念和知识,然后分析了volatil

Java并发编程学习路线

一年前由于工作需要从微软技术栈入坑Java,并陆陆续续做了一个Java后台项目,目前在搞Scala+Java混合的后台开发,一直觉得并发编程是所有后台工程师的基本功,所以也学习了小一年Java的并发工具,对整体的并发理解乃至分布式都有一定的提高,所以想和大家分享一下. 我的学习路线 首先说说学习路线,我一开始是直接上手JCIP(Java Concurrency in Practice),发现不是很好懂,把握不了那本书的主线,所以思索着从国内的作者开始先,所以便读了下方腾飞的<Java并发编程的艺