为什么Future最大只有4个并发线程?
线程池中有多少个线程是由ExecutionContext决定的。如果使用的是默认的global,则只有4个并发线程。
import scala.concurrent.ExecutionContext.Implicits.global
默认的global ExecutionContext在哪里设置的4个并发线程?
global用的是ExecutionContextImpl,其中有这么一段代码:
def createExecutorService: ExecutorService = {
def getInt(name: String, f: String => Int): Int =
try f(System.getProperty(name)) catch { case e: Exception => Runtime.getRuntime.availableProcessors }
def range(floor: Int, desired: Int, ceiling: Int): Int =
if (ceiling < floor) range(ceiling, desired, floor) else scala.math.min(scala.math.max(desired, floor), ceiling)
val desiredParallelism = range(
getInt("scala.concurrent.context.minThreads", _.toInt),
getInt("scala.concurrent.context.numThreads", {
case null | "" => Runtime.getRuntime.availableProcessors
case s if s.charAt(0) == ‘x‘ => (Runtime.getRuntime.availableProcessors * s.substring(1).toDouble).ceil.toInt
case other => other.toInt
}),
getInt("scala.concurrent.context.maxThreads", _.toInt))
val threadFactory = new DefaultThreadFactory(daemonic = true)
try {
new ForkJoinPool(
desiredParallelism,
threadFactory,
uncaughtExceptionHandler,
true) // Async all the way baby
} catch {
庄家ForkJoinPool时设定了desiredParallelism。可以看到desiredParallelism函数得到并行度有多少是根据系统变量来的(注意getInt函数):
scala.concurrent.context.minThreads:最小并发线程数(Int)
scala.concurrent.context.numThreads:并发线程数,如果是Int,则就使用这个值;如果是String,并且以“x”开头,后面跟个Double(如“x1.5”),则其值为1.5 * Runtime.getRuntime.availableProcessors
scala.concurrent.context.maxThreads:最大并发线程数(Int)
如果这三个变量没有设置,则getInt会取Runtime.getRuntime.availableProcessors,即当前CPU的核数。所以,在我的电脑上只有4个并发线程运行Future.
怎么改变Future的并发线程数?
1. 从上面的代码分析可以很容易想到,如果仍使用global ExecutionContext,修改系统变量即可:
System.setProperty("scala.concurrent.context.minThreads", "8")
System.setProperty("scala.concurrent.context.maxThreads", "8")
2. 更好的方法是重写一个自己的ExecutionContext。
import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = new ExecutionContext {
val threadPool = Executors.newFixedThreadPool(1000);
def execute(runnable: Runnable) {
threadPool.submit(runnable)
}
def reportFailure(t: Throwable) {}
}
时间: 2025-01-17 12:55:06