java8新特性——并行流与顺序流

  在我们开发过程中,我们都知道想要提高程序效率,我们可以启用多线程去并行处理,而java8中对数据处理也提供了它得并行方法,今天就来简单学习一下java8中得并行流与顺序流。

  并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

  Java8中将并行流进行了优化,我们可以很容易的对数据进行并行操作。Stream API可以声明性地通过parallel()与scqucntial()在并行流与顺序流之间进行切换。

一、Fork—Join框架

  Fork—Join框架:是java7提供得一个用于执行任务得框架,就是在必要得情况下,将一个大任务,进行拆分(Fork)成若干个小任务(拆分到不能再拆分),再将一个个的小任务运算得结果进行join汇总。

  Fork—Join框架时ExecutorService接口得一种具体实现,目的是为了帮助更好地利用多处理器带来得好处。它是为那些能够被递归地拆分成子任务的工作类型量身设计的。起目的在于能够使用所有有可用的运算能力来提升你的应用的性能。

  Fork—Join框架会将任务分发给线程池中的工作线程。Fork—Join框架的独特之处在与它使用工作窃取(work-stealing)算法。完成自己的工作而处于空闲的工作线程能够从其他仍处于忙碌(busy)状态的工作线程中窃取等待任务执行,每个工作线程都有自己的工作队列,这是使用双端队列(deque)来实现的。当一个任务划分一个新线程时,它将自己推到deque的头部。当线程的任务队列为空,它将尝试从另一个线程的deque的尾部窃取另一个任务。

  下面,我们来写一个简单的实例来演示一下:

 1 /**
 2  * 要想使用Fark—Join,类必须继承RecursiveAction(无返回值) 或者
 3  * RecursiveTask(有返回值)
 4  * @author Wuyouxin
 5  *
 6  */
 7 public class ForkJoin extends RecursiveTask<Long>{
 8
 9     /**
10      * 序列化
11      */
12     private static final long serialVersionUID = -645248615909548422L;
13
14     private long start;
15     private long end;
16
17     public ForkJoin(long start, long end) {
18         this.start = start;
19         this.end = end;
20     }
21
22     private static final long THRESHOLD = 10000L;
23     /**
24      * 重写方法
25      */
26     @Override
27     protected Long compute() {
28         if (end - start <= THRESHOLD) {
29             long sum = 0;
30             for (long i = start; i < end; i++) {
31                 sum += i;
32             }
33             return sum;
34         } else {
35             long middle = (end -start)/2;
36             ForkJoin left = new ForkJoin(start, middle);
37             //拆分子任务,压入线程队列
38             left.fork();
39             ForkJoin right = new ForkJoin(middle, end);
40             right.fork();
41
42             //合并并返回
43             return left.join() + right.join();
44         }
45     }
46
47 }
 1     /**
 2      * 实现数的累加
 3      */
 4     @Test
 5     public void test1() {
 6         //开始时间
 7         Instant start = Instant.now();
 8
 9         //这里需要一个线程池的支持
10         ForkJoinPool pool = new ForkJoinPool();
11
12         ForkJoinTask<Long> task = new ForkJoin(0, 10000000000L);
13
14         long sum = pool.invoke(task);
15
16         //结束时间
17         Instant end = Instant.now();
18         System.out.println(Duration.between(start, end).getSeconds());
19     }

二、java8 并行流

  java8 中 对并行流做了优化,简化了许多,我们继续以累加来写个例子。

 1     /**
 2      * java8 并行流 parallel()
 3      */
 4     @Test
 5     public void test2() {
 6         //开始时间
 7         Instant start = Instant.now();
 8
 9         LongStream.rangeClosed(0, 10000000000L).parallel()
10         .reduce(0, Long :: sum);
11
12         //结束时间
13         Instant end = Instant.now();
14         System.out.println(Duration.between(start, end).getSeconds());
15     }

  java8 中不仅仅对代码进行了优化,而且效率也大大提升。

原文地址:https://www.cnblogs.com/wuyx/p/9098128.html

时间: 2024-10-07 21:31:58

java8新特性——并行流与顺序流的相关文章

Java8新特性 - 并行流与串行流

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流. Java8中将并行进行了优化,我们可以很容易的对数据进行并行操作.Stream API可以声明性地通过parallel()和sequential()在并行流和顺序流之间进行切换. 在了解并行流之前,我们首先需要了解Fork/Join框架 Fork/Join框架 Fork/Join框架:在必要的情况下,将一个大任务进行拆分(fork)成若干个小任务(拆到不可在拆时),在将一个个的小任务运算的结果进行汇总(join). Fo

Java8 新特性之流式数据处理

一. 流式处理简介 在我接触到java8流式处理的时候,我的第一感觉是流式处理让集合操作变得简洁了许多,通常我们需要多行代码才能完成的操作,借助于流式处理可以在一行中实现. 比如我们希望对一个包含整数的集合中筛选出所有的偶数,并将其封装成为一个新的List返回,那么在java8之前,我们需要通过如下代码实现: List<Integer> evens = new ArrayList<>(); for (final Integer num : nums) { if (num % 2 =

乐字节-Java8新特性-接口默认方法之Stream流(下)

接上一篇:<Java8新特性之stream>,下面继续接着讲Stream 5.流的中间操作 常见的流的中间操作,归为以下三大类:筛选和切片流操作.元素映射操作.元素排序操作: 操作 描述 筛选和切片 filter(T -> boolean):保留 boolean 为 true 的元素 limit(long n):返回前 n 个元素 skip(long n):去除前 n 个元素 distinct():去除重复元素,这个方法是通过类的 equals 方法来判断两个元素是否相等的 映射 map

RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流(转)

什么是Fork/Join框架        Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任

JDK9新特性 Reactive Stream 响应式流

JDK9新特性 Reactive Stream 响应式流  本篇主要讲解 JDK9特性 Reactive Stream 响应式流,介绍 Reactive Stream是什么 背压是什么,以及JDK9中提供的关于Reactive Stream的接口和 2个使用案例包括如何使用Processor.  1.Reactive Stream 概念  Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范.响应式流从2013年开始,作为提供非阻

Java8新特性浅析

欢迎阅读我编写的Java 8介绍.本教程将带领你一步一步地认识这门语言的新特性.通过简单明了的代码示例,你将会学习到如何使用默认接口方法,Lambda表达式,方法引用和重复注解.看完这篇教程后,你还将对最新推出的API有一定的了解,例如:流控制,函数式接口,map扩展和新的时间日期API等等. 允许在接口中有默认方法实现 Java 8 允许我们使用default关键字,为接口声明添加非抽象的方法实现.这个特性又被称为扩展方法.下面是我们的第一个例子: 1 2 3 4 5 6 7 interfac

JAVA8新特性之 Stream API

重要的 Stream API  : java.util.Stream.* Stream提供了一种高效且简易处理数据的方式 注意:1.Stream自己不会存储元素 2.对Stream进行操作不会改变数据源,相反,会产生一个执有结果的新Stream 3.Stream操作是延迟执行的.只有进行了终止操作才会产生结果 (并行流就是把内容分成多个数据块,并用不同的线程分别处理每个数据块.在Java8 后,Stream API可以声明性的通过parallel()与sequential()在并行流与串行流(顺

JAVA8 新特性简介

特点: 速度更快(HashMap加哈希表,ConcurrentHashMap使用CAS,内存结构无永久区.新增元数据区使用物理内存)代码更少(增加了新的语法 Lambda 表达式)强大的 Stream API便于并行(优化ForkJoin)//JDK8之前 需要自己实现计算过程,下面的省略部分代码public class ForkJoinCalculate extends RecursiveTask<Long> { private long start; private long end; @

Java8新特性 --Stream

一. 创建Stream 创建Stream方式一: 集合类的stream()或 parallelStream() java List<String> list = new ArrayList<>(); Stream<String> stream = list.stream(); 创建Stream方式二: 通过Arrays中的静态方法stream()获取 String[] strings = new String[10]; Stream<String> stre