Java并发和多线程2:3种方式实现数组求和

本篇演示3个数组求和的例子。

例子1:单线程
例子2:多线程,同步求和(如果没有计算完成,会阻塞)
例子3:多线程,异步求和(先累加已经完成的计算结果)

例子1-代码

package cn.fansunion.executorservice;

public class BasicCaculator {

	public static long sum(int[] numbers){
	    long sum = 0;
	    for(int i=0;i<numbers.length;i++){
	    	sum += numbers[i];
	    }
	    return sum;
	}
}

例子2-代码
ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

package cn.fansunion.executorservice;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
//并发计算数组的和,“同步”求和
public class ConcurrentCalculator {

	private ExecutorService exec;
	//这个地方,纯粹是“一厢情愿”,“并行执行”不受咱们控制,取决于操作系统的“态度”
	private int cpuCoreNumber;
	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();

	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0L;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}

	public ConcurrentCalculator() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length)
				end = numbers.length;
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			FutureTask<Long> task = new FutureTask<Long>(subCalc);
			tasks.add(task);
			if (!exec.isShutdown()) {
				exec.submit(task);
			}
		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 */
	public Long getResult() {
		Long result = 0l;
		for (Future<Long> task : tasks) {
			try {
				// 如果计算未完成则阻塞
				Long subSum = task.get();
				result += subSum;
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

例子3-代码
在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞。
如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。
生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果。
CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。
修改刚才的例子2,使用CompletionService:

package cn.fansunion.executorservice;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

//并发计算数组的和,“异步”求和
public class ConcurrentCalculatorAsync {

	private ExecutorService exec;
	private CompletionService<Long> completionService;
	//这个地方,纯粹是“一厢情愿”,“并行执行”不受咱们控制,取决于操作系统的“态度”
	private int cpuCoreNumber;

	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			return sum;
		}
	}

	public ConcurrentCalculatorAsync() {
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		completionService = new ExecutorCompletionService<Long>(exec);
	}

	public Long sum(final int[] numbers) {
		// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length){
				end = numbers.length;
			}
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			if (!exec.isShutdown()) {
				completionService.submit(subCalc);
			}

		}
		return getResult();
	}

	/**
	 * 迭代每个只任务,获得部分和,相加返回
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {
			try {
				Long subSum = completionService.take().get();
				result += subSum;
				System.out.println("subSum="+subSum+",result="+result);
			} catch (InterruptedException e) {
				e.printStackTrace();
			} catch (ExecutionException e) {
				e.printStackTrace();
			}
		}
		return result;
	}

	public void close() {
		exec.shutdown();
	}
}

运行代码

package cn.fansunion.executorservice;

import java.math.BigDecimal;
//数组求和3个Demo
public class ArraySumDemo {
	public static void main(String[] args) {
		int n = 200000000;
		int[] numbers = new int[n];
		for(int i=1;i<=n;i++){
			numbers[i-1]=i;
		}
		basic(numbers);

		long time = System.currentTimeMillis();
		concurrentCaculatorAsync(numbers);
		long endTime=System.currentTimeMillis();
		System.out.println("多核并行计算,异步相加:"+time(time,endTime));

		long time2 = System.currentTimeMillis();
		concurrentCaculator(numbers);
		long endTime2=System.currentTimeMillis();
		System.out.println("多核并行计算,同步相加:"+time(time2,endTime2));
	}

	private static void basic(int[] numbers) {
		long time1 = System.currentTimeMillis();
		long sum=BasicCaculator.sum(numbers);
		long endTime1 = System.currentTimeMillis();
		System.out.println("单线程:"+time(time1,endTime1));
		System.out.println("Sum:"+sum);
	}

	private static double time(long time, long endTime) {
		long costTime = endTime-time;
		BigDecimal bd = new BigDecimal(costTime);
		//本来想着,把毫秒转换成秒的,最后发现计算太快了
		BigDecimal unit = new BigDecimal(1L);
		BigDecimal s= bd.divide(unit,3);
		return s.doubleValue();
	}

	//并行计算,“同步”求和
	private static void concurrentCaculator(int[] numbers) {
		ConcurrentCalculator calc = new ConcurrentCalculator();
		Long sum = calc.sum(numbers);
		System.out.println(sum);
		calc.close();
	}

	//并行计算,“异步”求和
	private static void concurrentCaculatorAsync(int[] numbers) {
		ConcurrentCalculatorAsync calc = new ConcurrentCalculatorAsync();
		Long sum = calc.sum(numbers);
		System.out.println("Sum:"+sum);
		calc.close();
	}
}

控制台输出
单线程:93.0
Sum:20000000100000000
subSum=3750000175000002,result=3750000175000002
subSum=1250000075000001,result=5000000250000003
subSum=6250000275000003,result=11250000525000006
subSum=8749999574999994,result=20000000100000000
Sum:20000000100000000
多核并行计算,异步相加:786.0
20000000100000000
多核并行计算,同步相加:650.0

个人看法:3段代码的时间仅供参考,没有排除干扰因素。
总的来说,单线程执行更快一些,应该是由于“数组求和”本身,并不需要其它额外资源,不会阻塞。
而多线程,反而增加了“线程调度”的时间开销。

还可以看出,CPU计算还是非常快的。“200000000”2亿个整数相加,用了不到0.1秒的时间。

插曲
最开始看代码的时候,误解了。以为“根据CPU核心个数拆分任务”,这个时候的“多线程”就是“并行”了。
实际上,不一定,除了要看CPU的核数,还要看操作系统的分配。
// 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor
for (int i = 0; i < cpuCoreNumber; i++) {

}
最开始,我还在考虑“单线程”、“多核并行+多线程并发”、“单核+多线程并发”,等好几种情况来实现“数组求和”。
最后,感觉自己还是想多了。“并行”应该不受自己控制,只能控制是“单线程”或者“多线程”。

“java并发编程-Executor框架”这篇文章中的“例子:并行计算数组的和。” 这句话,误导了我,根本不能保证是“并行计算”。
友情提示:网络上的文章,仅供参考学习,需要自己的判断。

关于Java-多核-并行-多线程,我初步认为“多线程可以并行执行,但不受我们自己的控制,取决于操作系统”。

网友的一些看法

看法1:
   java线程可以在运行在多个cpu核上吗?
   
   我是一直都以为这个问题的答案是肯定的,也就是说可以运行在多核上。
但是有一天见到这样的一个理论,我就顿时毁三观了。

JVM在操作系统中是作为一个进程的,java所有的线程都运行自这个JVM进程中,
所以说java线程某个时间只可能运行在一个核上。

这个说法对我的打击太大了,我不能接受。于是就开始多方求证。网上搜索 和朋友一起讨论,
最终证实了java线程是可以运行在多核上的,为什么呢?
下面一句话将惊醒梦中人:
现代os都将线程作为最小调度单位,进程作为资源分配的最小单位。 在windows中进程是不活动的,
只是作为线程的容器。

也就是说,java中的所有线程确实在JVM进程中,但是CPU调度的是进程中的线程。

看法2:
   JAVA中的多线程能在多CPU机器上并行执行吗?注意,我说的不是并发执行哦 。
   我们用java写一个多线程程序,就启动了一个JVM进程,所以这些线程都是在这一个JVM进程之中的,我不知道同一时刻,能不能有多个CPU运行同一进程,进而并行执行这同一进程中的不同线程?一直很疑惑

你的思路是对的,CPU就是为了迎合操作系统的多线程从而提高系统的计算效率.但是具体分配任务到各个内核中去执行的并非JAVA与JVM而是操作系统.
也就是说,你所执行的多线程,可能会被分配到同一个CPU内核中运行.也可能非配到不同的cpu中运行.如果可以控制CPU的分配,那也应该是操作系统的api才能实现的了。

我用JAVA创建了一个线程,这时候有主线程和子线程都在运行,那意思双核CPU有可能在同一时刻点并行运行这两个线程咯?
我翻了好多JAVA的有关多线程的章节,似乎都没有说道多核CPU运行JAVA多线程,貌似都是已单核为例讲解的,所以我一直觉得可能都是并发的而不是并行的?

不是,你要将你的软件线程和计算机的CPU处理线程区分开呀.简单说,你是无法控制CPU对于任务的分配的.

更多代码示例
http://git.oschina.net/fansunion/Concurrent(逐步更新中)

参考资料:
java并发编程-Executor框架
http://www.iteye.com/topic/366591

java线程可以在运行在多个cpu核上吗?
http://blog.csdn.net/maosijunzi/article/details/42527553

JAVA中的多线程能在多CPU上并行执行吗?注意,我说的不是并发执行哦
http://zhidao.baidu.com/link?url=e11sEOSNFoLTfVyP-5FfpktIXEgbMQkbLAzvgh8mn4V16n_qQas89voj5gVhOEkho0jRA7fp_vbnElxKgeQCDrOxGkcu6xAWaUniqpcWg33

时间: 2024-10-23 09:56:24

Java并发和多线程2:3种方式实现数组求和的相关文章

JAVA中实现多线程的四种方式

Java中多线程实现方式主要有四种:1<继承Thread类.2<实现Runnable接口.3<实现Callable接口通过FutureTask包装器来创建Thread线程.4<使用ExecutorService.Callable.Future实现有返回结果的多线程. 其中前两种方式线程执行完后都没有返回值,后两种是带返回值的. 1.继承Thread类创建线程 Thread类本质上是实现了Runnable接口的一个实例,代表一个线程的实例.启动线程的唯一方法就是通过Thread类的s

Java中实现多线程的两种方式之间的区别

Java提供了线程类Thread来创建多线程的程序.其实,创建线程与创建普通的类的对象的操作是一样的,而线程就是Thread类或其子类的实例对象.每个Thread对象描述了一个单独的线程.要产生一个线程,有两种方法: ◆需要从Java.lang.Thread类派生一个新的线程类,重载它的run()方法:  ◆实现Runnalbe接口,重载Runnalbe接口中的run()方法. 为什么Java要提供两种方法来创建线程呢?它们都有哪些区别?相比而言,哪一种方法更好呢? 在Java中,类仅支持单继承

Java中实现多线程的两种方式

/**  * 使用Thread类模拟4个售票窗口共同卖100张火车票的程序  *   * 没有共享数据,每个线程各卖100张火车票  *   * @author jiqinlin  * */public class ThreadTest {    public static void main(String[] args){        new MyThread().start();        new MyThread().start();        new MyThread().st

java 实现多线程的两种方式

一.问题引入 说到这两个方法就不得不说多线程,说到多线程就不得不提实现多线程的两种方式继承Thread类和实现Runable接口,下面先看这两种方式的区别. 二. Java中实现多线程的两种方式 1.  继承Thread类 /** * 使用Thread类模拟4个售票窗口共同卖100张火车票的程序,实际上是各卖100张 */ public class ThreadTest { public static void main(String[] args){ new MyThread().start(

java实现多线程的三种方式

java中实现多线程的方法有两种:继承Thread类和实现runnable接口 1.继承Thread类,重写父类run()方法   public class thread1 extends Thread {           public void run() {                 for (int i = 0; i < 10000; i++) {                         System.out.println("我是线程"+this.get

Java实现多线程的两种方式

实现多线程的两种方式: 方式1: 继承Thread类 A: 自定义MyThread类继承Thread类 B: 在MyThread类中重写run() C: 创建MyThread类的对象 D: 启动线程对象. 问题: a. 为什么要重写run方法? run()方法里封装的是被线程执行的代码 b. 启动线程对象用的是哪个方法? start()方法 c. run()和start()方法的区别? 直接调用run方法只是普通的方法调用 调用start方法先会启动线程,再由jvm调用run()方法 方式2:

IOS 创建和使用多线程的6种方式

非原创  文字来自:http://www.cnblogs.com/yuanjianguo2012/p/3725480.html   纯属复制 进程和线程概念: 一个程序包含一个以上的进程,而一个进程又可以包含一个以上的线程,每一个进程都有自己独立的内存空间,相应的一个进程中的所有线程都共享该内存空间. 进程:是一个具有一定独立功能的程序关于某个数据集合的一次运行活动.它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的资 源分配单元,也是基本的执行单元.独立的地址空间使得不同的进程

java基础知识回顾之java Thread类学习(三)--java线程实现常见的两种方式实现好处:

总结:实现Runnable接口比继承Thread类更有优势: 1.因为java只能单继承,实现Runnable接口可以避免单继承的局限性 2.继承Thread类,多个线程不能处理或者共享同一个资源,但是实现Runnable接口可以处理同一个资源. 下面我们做个测试:验证下.车站的售票系统售票的例子,车站的各个售票口相当于各个线程,我们先使用第一种方法几继承Thread类的方式实现: 代码如下: package com.lp.ecjtu.Thread; /** * * @author Admini

线程的状态以及创建多线程的三种方式

首先了解一下线程的五种状态: 新建状态: 新建状态是指new之后,即新创建了一个线程的时候,此时并未运行任何线程方法体内的程序代码. 就绪状态: 简单来说就是指程序调用了start()之后,线程就得到了启动,代表线程进入了就绪状态,但是此时并不代表它会立刻去执行run()方法体内的程序代码,而是随时等待cpu的调度. 运行状态: 获得cpu的时间后,调用run()方法,进入运行状态. 阻塞状态: 由于某种原因放弃了cpu的会用权力,暂时停止运行,等待再次被调用. 死亡状态: 线程正常执行完毕,或