并发编程 - 线程池 Thread Pool

本文介绍了Java中的几种线程池,及Executors构造线程池的用法。

Java 线程池 Thread Pool

构建新线程的开销较大。如果程序中需要创建大量很短生命期的线程,应该使用线程池,而不是将每个任务映射到一个单独线程。

线程池中包含很多准备运行的线程,每为线程池提供一个Runnable实例,就会有一个池中的线程调用run方法。当run方法退出时,这个线程不会死亡,而是留在池中准备为下一个请求 (Runnable实例) 提供服务。

执行器 Executors

执行器有许多静态工厂方法,用来构造线程池。
静态工厂方法返回类型为 ExecutorService

表格 Executors工厂方法
方法 描述
newCachedThreadPool 必要时创建新线程;空闲线程会保留60 s
newFixedThreadPool 池中包含固定数目的线程;空闲线程会一直保留
newWorkStealingPool 一种适合“fork-join”任务的线程池,其中复杂的任务会分解为更简单的任务,空闲线程会“密取”较简单的任务
newSingleThreadExecutor 只有一个线程的“池”,会顺序地执行所提交的任务
newScheduledThreadPool 用于调度执行的固定线程池
newSingleThreadScheduledExecutor 用于调度执行的单线程池

缓存线程池 newCachedThreadPool

构造一个线程池,会立即执行各个任务。如果有空闲线程可用,就使用现有空闲线程执行任务;如果没有可用的空闲线程,则创建一个新线程。

固定线程池 newFixedThreadPool

构造一个固定大小的线程池。如果提交的任务数多于空闲线程数,就把未得到服务的任务放到队列中。当其他任务完成以后再运行这些排队的任务。

单线程池 newSingleThreadExecutor

是一个退化了的大小为1的线程池:由一个线程顺序地执行所提交的任务(顺序执行)。

参考上面的 FixedThreadPool

使用线程池的小结

如果线程生存时间很短,或者大量时间都在阻塞,业务是轻量负载的,可以使用缓存线程池

为了获得最优的运行速度,业务是高负载的,可以令并发线程数等于处理内核数(查看CPU内核数方法见下文)。在这种情况下,就应该使用固定线程池,即并发线程总数有一个上限。

单线程池对于性能分析很有帮助。如临时使用一个单线程池替换其他线程池,就能测量不适用并发的情况下应用的运行速度会慢多少。

实际使用的时候推荐自己创建ThreadPoolExecutor (避免OOM)

Executors 返回线程池对象的弊端如下:

  • FixedThreadPoolSingleThreadExecutor: 允许请求的队列长度为 > Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  • CachedThreadPool ScheduledThreadPool : 允许创建的线程数量为 > Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

综合来讲,因为队列上限和创建线程上限,导致OOM发生。

除了避免 OOM 的原因之外,不推荐使用 Executors 提供的两种快捷的线程池的原因还有:

  1. 实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。
  2. 我们应该显示地给我们的线程池命名,这样有助于我们定位问题。
如何查看CPU内核数量

Linux: cat /proc/cpuinfo| grep "cpu cores"| uni

Mac: 查看物理内核 sysctl hw.physicalcpu

逻辑内核 sysctl hw.logicalcpu


ExecutorService分析

上文中的几种线程池都属于ThreadPoolExecutor类,实现了ExecutorService的接口。

图 ExecutorService 继承关系
executorService
1
2
3
4
5
Future<T> submit(Callable<T> task);
Future<?> submit(Runnable task);
Future<T> submit(Runnable task, T result);
void shutdown();
List<Runnable> shutdownNow();

调用submit时,会得到Future对象,可用来得到结果(get)或者取消任务(cancel)。

第二个submit返回的Future对象可以来调用isDone, cancel, isCancelled。但是get方法在完成的时候只是简单返回null。

第三个submit也生成一个Future,它的get方法在完成的时候返回指定的result对象。

shutdown 用于关闭线程池,被关闭的线程池不再接受新任务,当所有任务完成时,线程池中的线程死亡。

shutdownNow 会取消所有尚未开始的任务。


fork-join框架

有一些应用可能对处理器内核分别使用一个线程,以完成计算密集型任务,如图像或视频处理的应用。Java 7 中新引入了fork-join框架,专门用来支持这一类应用。

假设有一个处理任务,它可以很自然地分解为子任务,如下所示

1
2
3
4
5
6
7
8
if problemSize < threshold
solve problem directly
else
{
break problem into subproblems
recursively solve each subproblem
conbine the results
}

为了完成这种递归计算,需要提供扩展RecursiveTask<T>的类或者提供扩展RecursiveAction<T>的类。再覆盖compute方法来生成并调用子任务。

例如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Counter extends RecursiveTask<Integer> {

private static final int THRESHOLD = 10;
int to, from;
private DoublePredicate filter;
int[] values;

public Counter(int[] numbers, int from, int to, DoublePredicate filter) {
this.values = numbers;
this.from = from;
this.to = to;
this.filter = filter;
}

@Override
protected Integer compute() {
if (to - from < THRESHOLD) {
int count = 0;
for (int i = from; i < to; i++) {
if (filter.test(values[i])) count++;
}
return count;
} else {
int mid = (to + from) / 2;
Counter first = new Counter(values, from, mid, filter);
Counter second = new Counter(values, mid, to, filter);
invokeAll(first, second);
return first.join() + second.join();
}
}
}

在后台,fork-join采用了一种有效的智能方法来平衡可用线程的工作负载,这种方法称为工作密取(work-stealing)。一个工作线程将子任务压入双端队列的队头(只有一个线程可以访问队头,故可以不加锁),另一个工作线程空闲时,它会从双端队列的队尾“密取”一个任务。

⚠️警告: fork-join池是针对非阻塞工作负载优化的。如果向一个fork-join池增加很多阻塞任务,会使其无法有效工作。

推荐

解析ThreadPoolExecutor

参考

如何设置线程池参数?美团给出了一个让面试官虎躯一震的回答 作者: why技术


并发编程 - 线程池 Thread Pool
http://fuheihei.github.io/java-concurrency-programming/thread-pool/
作者
Haha monster
发布于
2022年5月25日
许可协议