[并发编程]并发编程第二篇:利用并发编程,实现计算大量数据的和

利用并发编程,实现计算大量数据的和

实现代码:

package tj.pojo.generate.main;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class ConcurrentCalculator {

    private ExecutorService exec;
    private int cpuCoreNumber;
    private List<Future<Long>> tasks = new ArrayList<Future<Long>>();

    // 内部类
    class SumCalculator implements Callable<Long> {
        private int[] numbers;
        private int start;
        private int end;

        public SumCalculator(final int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }
        @Override
        public Long call() throws Exception {
            Long sum = 0L;
            for (int i = start; i < end; i++) {
                sum += numbers[i];
            }
            System.out.println(String.format("%s~%s的和为%s", start, end, sum));
            return sum;
        }

    }

    public ConcurrentCalculator() {
        cpuCoreNumber = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU核心数:" + cpuCoreNumber);
        exec = Executors.newFixedThreadPool(cpuCoreNumber);

    }

    public Long sum(final int[] numbers) {
        for (int i = 0; i < cpuCoreNumber; i++) {
            int increment = numbers.length / cpuCoreNumber + 1;
            int start = increment * i;
            int end = start + increment;
            if (end > numbers.length) {
                end = numbers.length;
            }
            SumCalculator task = new SumCalculator(numbers, start, end);

            FutureTask<Long> future = new FutureTask<Long>(task);
            tasks.add(future);
            System.out.println("添加一个任务,总任务数为:" + tasks.size());
            if (!exec.isShutdown()) {
                exec.submit(future);
                // ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。
                // exec.submit(task);
            }
        }
        System.out.println("任务分配完成,总任务数为:" + tasks.size());
        return getResult();
    }

    public Long getResult() {
        Long sums = 0L;
        for (Future<Long> task : tasks) {
            try {
                Long sum = task.get();
                sums += sum;
                System.out.println("当前总合计:" + sums);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return sums;
    }

    public void close() {
        exec.shutdown();
    }
}

其中,在代码的第62行~第64行,由于不了解ExecutoreService.submit(Runnable task)方法的功能。

同时FutureTask<Long> future和SumCalculator task都实现了Runnable接口,造成代码调用时,进程一直不结束。

传递了FutureTask<Long> future才正确执行。

                exec.submit(future);
                // ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。
                // exec.submit(task);

测试方法:

public static void test() {
        int[] numbers = new int[100];
        for (int i = 0; i < 100; i++) {
            numbers[i] = i + 1;
        }
        tj.pojo.generate.main.ConcurrentCalculator cc = new tj.pojo.generate.main.ConcurrentCalculator();
        Long sum = cc.sum(numbers);
        System.out.println("1~100的和为" + sum);
        cc.close();
    }

FutureTask的实现代码:

public class FutureTask<V> implements RunnableFuture<V>

FutureTask类实现了RunnableFuture接口,RunnableFuture接口的实现代码:

public interface RunnableFuture<V> extends Runnable, Future<V> {

void run();

}

可以看出RunnableFuture继承了Runnable接口和Future接口,而FutureTask实现了RunnableFuture接口。

所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。

事实上,FutureTask是Future接口的一个唯一实现类。

并发编程的两种实现形式:

1):使用Callable+Future获取执行结果

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");

        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}

2):使用Callable+FutureTask获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();

        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }

        System.out.println("主线程在执行任务");

        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
时间: 2024-10-11 22:20:01

[并发编程]并发编程第二篇:利用并发编程,实现计算大量数据的和的相关文章

Java并发包下锁学习第二篇Java并发基础框架-队列同步器介绍

Java并发包下锁学习第二篇队列同步器 还记得在第一篇文章中,讲到的locks包下的类结果图吗?如下图: ? 从图中,我们可以看到AbstractQueuedSynchronizer这个类很重要(在本文中,凯哥就用AQS来代替这个类).我们先来了解这个类.对这个类了解之后,学习后面的会更容易了. 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<Lock系列>教程的第一篇:<Java并发包下锁学习第二篇:队列同步器>. 本文主要内容:同步器介绍:同步器

【Windows编程】系列第二篇:Windows SDK创建基本控件

在Win32 SDK环境下,怎么来创建常用的那些基本控件呢?我们知道如果用MFC,简单的拖放即可完成大多数控件的创建,但是我们既然是用Windows SDK API编程,当然是从根上解决这个问题,实际上MFC的下层也是通过这些API完成的. 实际上控件也是窗口,只不过是被微软预先创建好的一种窗口类,比如button就是一个类名为“BUTTON”的类.既然控件实际上是窗口,当然也是用CreateWindow或者CreateWindowEx这个函数来完成的,第二个函数除了多一个扩展风格之外,其他完全

《Java并发编程实战》第二章 线程安全性 读书笔记

一.什么是线程安全性 编写线程安全的代码 核心在于要对状态访问操作进行管理. 共享,可变的状态的访问 - 前者表示多个线程访问, 后者声明周期内发生改变. 线程安全性 核心概念是正确性.某个类的行为与其规范完全一致. 多个线程同时操作共享的变量,造成线程安全性问题. * 编写线程安全性代码的三种方法: 不在线程之间共享该状态变量 将状态变量修改为不可变的变量 在访问状态变量时使用同步 Java同步机制工具: synchronized volatile类型变量 显示锁(Explicit Lock

高性能网络编程(一):单台服务器并发TCP连接数到底可以有多少

阅读(81374) | 评论(9)收藏16 淘帖1 赞3 JackJiang Lv.9    1 年前 | 前言 曾几何时我们还在寻求网络编程中C10K问题(有关C10K问题请见文章<The C10K problem(英文在线阅读.英文PDF版下载.中文译文)>)的解决方案,但是现在从硬件和操作系统支持来看单台服务器支持上万并发连接已经没有多少挑战性了. 我们先假设单台服务器最多只能支持万级并发连接,其实对绝大多数应用来说已经远远足够了,但是对于一些拥有很大用户基数的互联网公司,往往面临的并发

[书籍翻译] 《JavaScript并发编程》第七章 抽取并发逻辑

本文是我翻译<JavaScript Concurrency>书籍的第七章 抽取并发逻辑,该书主要以Promises.Generator.Web workers等技术来讲解JavaScript并发编程方面的实践. 完整书籍翻译地址:https://github.com/yzsunlei/javascript_concurrency_translation .由于能力有限,肯定存在翻译不清楚甚至翻译错误的地方,欢迎朋友们提issue指出,感谢. 到本书这里,我们已经在代码中明确地模拟了并发问题.使

Python编程实战:运用设计模式、并发和程序库创建高质量程序 阅读笔记

Python编程实战:运用设计模式.并发和程序库创建高质量程序 目录 1 创建型设计模式 2 结构型设计模式 3 行为型设计模式 4 高级并发 5 扩充Python 6 高级网络编程 7 Tkinter 8 OpenGL 创建型设计模式 抽象工厂 @classmethod def make_xxx(Class, ...) Builder with open(filename, "w", encoding='utf-8') as f: f.write(x) 多一层映射封装好吗? 序列与m

并发编程之J.U.C的第二篇

并发编程之J.U.C的第二篇 3.2 StampedLock 4. Semaphore Semaphore原理 5. CountdownLatch 6. CyclicBarrier 7.线程安全集合类概述 8. ConcurrentHashMap 3. JDK7 ConcurrentHashMap 3. 性能比较 10. ConcurrentLinkedQueue 11. CopyOnWriteArrayList 3.2 StampedLock 该类自JDK8加入,是为了进一步优化读性能,它的特

【Java并发编程二】同步容器和并发容器

一.同步容器 在Java中,同步容器包括两个部分,一个是vector和HashTable,查看vector.HashTable的实现代码,可以看到这些容器实现线程安全的方式就是将它们的状态封装起来,并在需要同步的方法上加上关键字synchornized. 另一个是Collections类中提供的静态工厂方法创建的同步包装类. 同步容器都是线程安全的.但是对于复合操作(迭代.缺少即加入.导航:根据一定的顺序寻找下一个元素),有时可能需要使用额外的客户端加锁进行保护.在一个同步容器中,复合操作是安全

【转】Java多线程编程(十)-并发编程原理(分布式环境中并发问题)

转载地址:http://blog.csdn.net/leicool_518/article/details/42268947 在分布式环境中,处理并发问题就没办法通过操作系统和JVM的工具来解决,那么在分布式环境中,可以采取一下策略和方式来处理: 避免并发 时间戳 串行化 数据库 行锁 统一触发途径 避免并发 在分布式环境中,如果存在并发问题,那么很难通过技术去解决,或者解决的代价很大,所以我们首先要想想是不是可以通过某些策略和业务设计来避免并发.比如通过合理的时间调度,避开共享资源的存取冲突.