1.前言
最近被问到一个问题,"我用java写了一个用到多线程的功能,但是线程数应该多少个比较好呢?"。这个问题以前听的版本有:"CPU核心数的2倍","和CPU核心数一样","CPU核心数加1"。但是因为一个“懒”字将这个问号埋在了心底。为了给这个故事画上一个完美的句号,所以就有了这篇博文。
2.线程定义
线程(英语:thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。线程是独立调度和分派的基本单位。
在多核或多CPU,或支持Hyper-threading的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单CPU单核的计算机上,使用多线程技术,也可以把进程中负责IO处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的workhorse线程执行密集计算,从而提高了程序的执行效率。
3.线程与处理器
我们讨论下用一个处理器(单核)的情况下多线程的作用。图1中假设这样一个场景:一个任务要处理一个数据块集合,其中这个任务只有一个线程A,1表示处理第一个数据块,2表示处理第二个数据块。在线程A处理每个数据块的过程中都有一段阻塞的时间。只有线程A工作部分处理器才被使用,也就是说没处理一个数据块,都有一部分计算资源被浪费了。
(图1)
为了充分利用处理器以及加快处理效率,我们引入多线程,如图2。其中共有三个线程A,B,C。当线程A处理第一个数据块遇到阻塞的时候,线程B得到处理器的使用权开始执行。线程B也遇到了阻塞,让出处理器。线程A运气比较好又一次得到处理器后继续执行...线程C在线程A第二次进入阻塞的时候得到处理器使用权执行自己的代码。
(图2)
对比图1和图2中在单处理器单核心的情况下使用多线程可以充分利用系统计算资源,缩短任务整体计算时间。
4.最优线程数计算的理论依据
在上面的图中也许你会有疑问,我这个只是任务中有阻塞部分的场景,阻塞部分和线程数的关系是什么?
Venkat Subramaniam 博士在《Programming Concurrency on the JVM》中提到关于最优线程数的计算,下面是原文:
The minimum number of threads is equal to the number of available cores. If all tasks are computation intensive, then this is all we need. Having more threads will actually hurt in this case because cores would be context switching between threads when there is still work to do. If tasks are IO intensive, then we should have more threads. We can compute the total number of threads we’d need as follows:
Number of threads = Number of Available Cores / (1 - Blocking Coefficient)
To determine the number of threads, we need to know two things:
? The number of available cores
? The blocking coefficient of tasks
The first one is easy to determine; we can look up that information, even at
runtime, as we saw earlier. It takes a bit of effort to determine the blocking
coefficient. We can try to guess it, or we can use profiling tools or the java.
lang.management API to determine the amount of time a thread spends on
system/IO operations vs. on CPU-intensive tasks.
Blocking Coefficient(阻塞系数) = 阻塞时间/(阻塞时间+使用CPU的时间)
5.实践出真知
对知识的吸收不应只停留在书本上,下面通过下面的示例代码来验证一下这个公式。下面先是代码,然后是实验结果的统计。
Stage 类
package com.bob.testjava.thread; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * Created by zhangmingbo on 1/7/17. */ public class Stage { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); ExecutorService threadPool = null; try { threadPool = Executors.newFixedThreadPool(Constants.THREAD_SIZE); CountDownLatch countDownLatch = new CountDownLatch(Constants.SIZE); for (int i = 0; i < Constants.THREAD_SIZE; i++) { threadPool.execute(new Worker(countDownLatch)); } countDownLatch.await(); } catch (Exception e) { e.printStackTrace(); } finally { long endTime = System.currentTimeMillis(); int cpuCoreNum = Runtime.getRuntime().availableProcessors(); System.out.println("Number of CPU cores :" + cpuCoreNum); System.out.println("Work time (millisecond) :" + Constants.WORK_TIME_MILLISECOND); System.out.println("Block time (millisecond) :" + Constants.BLOCK_TIME_MILLISECOND); Double blockCoefficient = (Constants.BLOCK_TIME_MILLISECOND * 1d) / (Constants.WORK_TIME_MILLISECOND + Constants.BLOCK_TIME_MILLISECOND); System.out.println("Block coefficient :" + blockCoefficient); int optimalThreadNum = new Double(cpuCoreNum / (1 - blockCoefficient)).intValue(); System.out.println("Optimal thread number in theory:" + optimalThreadNum); System.out.println("Number of Task :" + Constants.SIZE); System.out.println("Number of Thread :" + Constants.THREAD_SIZE); System.out.println("Cost Time:" + (endTime - startTime)); try { threadPool.shutdownNow(); } catch (Exception e) { } } } }
Worker 类
package com.bob.testjava.thread; import java.util.concurrent.CountDownLatch; /** * Created by zhangmingbo on 1/7/17. */ public class Worker implements Runnable { private CountDownLatch countDownLatch; public Worker(CountDownLatch countDownLatch) { this.countDownLatch = countDownLatch; } public void run() { try { while (countDownLatch.getCount() > 0) { //do work TestUtil.doWorkByTime(Constants.WORK_TIME_MILLISECOND); //do block operation TestUtil.doBlockOperation(Constants.BLOCK_TIME_MILLISECOND); countDownLatch.countDown(); } } catch (InterruptedException e) { } } }
TestUtil 类
package com.bob.testjava.thread; import java.util.concurrent.TimeUnit; /** * Created by zhangmingbo on 1/10/17. */ public class TestUtil { public static void doWorkByTime(long millisecond) { long startTime = System.currentTimeMillis(); while (true) { long endTime = System.currentTimeMillis(); long costTime = endTime - startTime; if (costTime >= millisecond) { return; } } } public static void doBlockOperation(long millisecond) throws InterruptedException { TimeUnit.MILLISECONDS.sleep(millisecond); } }
Constants 类
package com.bob.testjava.thread; /** * Created by zhangmingbo on 1/7/17. */ public class Constants { public static final int SIZE = 100; public static final int THREAD_SIZE = 8; public static final long BLOCK_TIME_MILLISECOND = 5; public static final int WORK_TIME_MILLISECOND = 5; }
上面是本次实验使用的代码,其中 Stage类作为场景类 main方法从这里执行,使用CountDownLatch来充当待处理数据的作用,它的大小决定有多少任务量,当任务做完后会打印出一些信息,如:CPU逻辑核心数、阻塞系数、任务量、任务总耗时等。这里创建了TestUtil类,分别提供类为了模拟占用CPU的操作以及阻塞的部分的方法。可以通过调节Constants类中的参数来统计本次执行的数据。Worker类是本次实验的线程类。
----------------------------------------------------------------------- 分割线 -------------------------------------------------------------------------------
实验结果数据:
实验结果和预期的不太一样。根据公式来说最优线程数应该是8,但是实验结果显示的是8的2倍甚至更多。也许是本身我电脑上还运行着其它的进程,或者我写的程序哪里不合理,这个如果哪位朋友能帮忙指正,不胜感激。
6.总结
寻找最优线程数的过程是快乐的,这快乐来自不断获得新知的喜悦。这篇博文并未完结,尚有谜团没有解开,先记录之:
谜团一:一台服务器中运行着多个进程,计算其中一个进程某个任务的最优线程数肯定要受到干扰,怎么办?
谜团二:如何准确得计算任务的阻塞系数?
7.参考资料
《Programming Concurrency on the JVM》
维基百科