并行流处理数据

一个接受数字n作为参数,并返回从1到n的所有数字之和。

    public static int intSum(int n) {
        return Stream.iterate(1, i -> i + 1)
                .limit(n)
                .reduce(0, Integer::sum);
    }

这是一个简单的顺序流,如果n极大,那么显然单线程是有问题的,所以引入了并行概念。

    public static int parallelIntSum(int n) {
        return Stream.iterate(1, i -> i + 1)
                .limit(n)
                .parallel()
                .reduce(0, Integer::sum);
    }

图示

我们可以混合使用sequence和parallel吗?NO! 举一个错误的例子。

    public static int complexSum(int n) {
        return IntStream.iterate(0, i -> i + 1)
                .parallel()
                .filter(i -> i % 2 == 0)
                .sequential()
                .map(x -> x * 2)
                .parallel()
                .reduce(0, Integer::sum);
    }

其实这个例子只会执行最后一个parallel。

这种并发的资源如何分配呢?

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().available- Processors()得到的。

但是你可以通过java.util.concurrent.ForkJoinPool.common. parallelism来改变线程大小,如所:

  System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

这是一个全局设置,因此它将代码中所有的并行流。让ForkJoinPool的大小等于处理器数量是个不错的默认值, 非你有很好的理由,则我们建你不要修改它。

实际上前面举得并行例子真的快吗?不,因为有两个问题导致它很慢:

  iterate实际生成的是boxed对象,进行运算需要拆箱

  我们很难把iterate分成多个块来并行

使用更加针对的方法

    public static int parallelIntSum(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .reduce(0, Integer::sum);
    }

其实这个不需要刻意记忆,就是我们日常的静态编程,我们对流也进行一次类型化就好了,实际上我觉得这个工作可以交给底层处理,但是可能是为了区分动态编程还是把控制交给使用者。

实际上,并行化是有开销的,首先是流的分割,再就是子流的线程分配,再就是数据的合并...最好是进行一定的测试确保性能,不建议直接使用。

对于并行流的使用要确保使用不能改变共享状态。

    public static class Accumulator {
        private long total = 0;
        public void add(long value) {
            total += value;
        }
    }
    public static long sideEffectParallelSum(long n) {
        Accumulator accumulator = new Accumulator();
        LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);
        return accumulator.total;
    }

这是完全错误的,因为total是所有操作Accumulator线程的共享属性,不是原子操作。

虽然顺序流到并行流现在看起来很简单,但是要确保是否有必要。比如需要考虑的一个因素,背后的数据结构支持流拆分的性能如何

fork/join框架


import static lambdasinaction.chap7.ParallelStreamsHarness.FORK_JOIN_POOL;
public class ForkJoinSumCalculator extends RecursiveTask<Long> {

    public static final long THRESHOLD = 10_000;

    private final long[] numbers;
    private final int start;
    private final int end;

    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinSumCalculator(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork();
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        Long rightResult = rightTask.compute();
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
        return FORK_JOIN_POOL.invoke(task);
    }
}

这是自行实现的一个框架,实际上来讲这个Pool我们是单例的,因为不适合使用者随意更改。这其实就是一个分治的线程级别的实现。

使用的注意地方,最关键的就是join会阻塞,所以你要保证所有的子任务都ok后再调用join,否则阻塞会导致性能问题。

实际上的线程是多个的,但是从算法图解上看,并不是一个理想的并行方案,因此有了工行窃取。

end

原文地址:https://www.cnblogs.com/CherryTab/p/12129378.html

时间: 2024-10-08 00:29:09

并行流处理数据的相关文章

java8新特性——并行流与顺序流

在我们开发过程中,我们都知道想要提高程序效率,我们可以启用多线程去并行处理,而java8中对数据处理也提供了它得并行方法,今天就来简单学习一下java8中得并行流与顺序流. 并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流. Java8中将并行流进行了优化,我们可以很容易的对数据进行并行操作.Stream API可以声明性地通过parallel()与scqucntial()在并行流与顺序流之间进行切换. 一.Fork-Join框架 Fork-Join框架:是java7提供

【Java8实战】使用并行流

除了顺序流外,Java 8中也可以对集合对象调用parallelStream方法或者对顺序流调用parallel方法来生成并行流.并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流.这样在使用流处理数据规模较大的集合对象时可以充分的利用多核CPU来提高处理效率.不过在一些情况下,并行流未必会比顺序流快,甚至会慢很多,所以了解如何高效的使用并行流也至关重要.此外,我们也可以调用流的sequential方法,将并行流转换为顺序流. 测试并行流的性能 举个例子,对1到1000的整

Java8新特性 - 并行流与串行流

并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流. Java8中将并行进行了优化,我们可以很容易的对数据进行并行操作.Stream API可以声明性地通过parallel()和sequential()在并行流和顺序流之间进行切换. 在了解并行流之前,我们首先需要了解Fork/Join框架 Fork/Join框架 Fork/Join框架:在必要的情况下,将一个大任务进行拆分(fork)成若干个小任务(拆到不可在拆时),在将一个个的小任务运算的结果进行汇总(join). Fo

RecursiveTask和RecursiveAction的使用 以及java 8 并行流和顺序流(转)

什么是Fork/Join框架        Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架. 我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果.比如计算1+2+..+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任

Stream parallel并行流的思考

1.并行流并不一定能提高效率,就和多线程并不能提高线程的效率一样 因为引入并行流会引起额外的开销,就像线程的频繁上下文切换会导致额外的性能开销一样,当数据在多个cpu中的处理时间小于内核之间的传输时间,使用并行流也就没有什么意义了. 这边用代码演示一下 public static long iterativeSum(long n) { long result = 0; for (long i = 1L; i <=n; i++) { result += i; } return result; }

for循环、并行流、串行流效率比较

User类 import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor; @Builder@Data@AllArgsConstructor@NoArgsConstructorpublic class User { int id; String name; int sex; int age; } 测试类 import org.springframe

使用对象流将数据以对象形式进行读写

1 import java.io.*; 2 3 public class Test { 4 5 public static void main(String[] args) { 6 7 File f = new File("a.txt"); 8 try { 9 if(!f.exists()) 10 f.createNewFile(); 11 } catch (IOException e1) { 12 e1.printStackTrace(); 13 } 14 15 try { 16 O

流式数据中的数学统计量计算

在科技飞速发展的今天,每天都会产生大量新数据,例如银行交易记录,卫星飞行记录,网页点击信息,用户日志等.为了充分利用这些数据,我们需要对数据进行分析.在数据分析领域,很重要的一块内容是流式数据分析.流式数据,也即数据是实时到达的,无法一次性获得所有数据.通常情况下我们需要对其进行分批处理或者以滑动窗口的形式进行处理.分批处理也即每次处理的数据之间没有交集,此时需要考虑的问题是吞吐量和批处理的大小.滑动窗口计算表示处理的数据每次向前移N个单位,N小于要处理数据的长度.例如,在语音识别中,每个包处理

IO流--字符流写数据

IO流是用来处理设备之间的数据传输的,诸如:文件的复制,上传下载文件 Java中的流可以从不同的角度进行分类: - 按照流的方向不同:分为输入流和输出流. - 按照处理数据单位的不同:分为字节流和字符流. - 按照功能不同:分为节点流和处理流 要区分字符流和字节流,我们可以从类名来区分 类名中包含Reader:字符流  输入流 类名中包含Writer:字符流  输出流 类名中包含Input:字节流  输入流 类名中包含Output:字节流  输出流 包含Stream:字节流 今天着重来看下字符流