ThreadPoolExecutor详解

最近在准备秋招,复习线程池相关知识点的时候看到了几个比较有意思的面试题:

  • 线程池执行任务的时候,不显示的进行异常捕获,任务执行遇到异常会怎么样?线程会崩溃吗?
  • 为什么队列满了再进行创建普通线程执行任务?

看到这两个问题我看到的时候就发现这块是自己知识盲区,不读源码我觉得是搞不明白的🤨,因此本文以此问题为切入点来读读ThreadPoolExecutor的核心源代码

1 Executor 体系

在阅读源码之前,最好先熟悉一下整个相关体系,可以让我们有一个全局观,不容易被带偏(个人的一点心得)

ExecutorService提供了线程池的顶级抽象,以下是它的核心方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Executor {

void execute(Runnable command);
}

public interface ExecutorService extends Executor {

void shutdown();

List<Runnable> shutdownNow();

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Callable<T> task);

Future<?> submit(Runnable task);

...
}

2 ThreadPoolExecutor源码

2.1 Execute()

ThreadPoolExecutor对ExecutorService的方法做了实现,使用execute()方法提交一个任务。

我们从源码分析一下线程池的任务执行流程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 这个变量很巧妙,同时存放了线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
return c & CAPACITY;
}

//任务队列
private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
// 如果任务为null,则抛出异常。
if (command == null) throw new NullPointerException();
int c = ctl.get();

// 1. 当前线程数量 < corePoolSize
// 通过addWorker(command, true)新建一个线程,将任务添加到该线程中并执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2. 当前线程数量 > corePoolSize
// 尝试将任务加入队列,如果工作队列未满则加入成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次获取线程池状态,如果不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果当前工作线程数量为0,新创建一个线程并执行。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}

// 3.队列已满后尝试添加worker
// 注意这里是先执行addWorker再判断,如果addWorker返回false(意味着当前线程数超过max)才会走拒绝策略
else if (!addWorker(command, false))
// 4.队列已满且无法添加线程将会拒绝
reject(command);
}

根据上面的源码,线程池大致提交任务的流程如下图所示:

总结一下就是:

  1. 如果当前核心线程有空闲,就将任务分配给核心线程执行;
  2. 如果核心线程满了,就将任务暂时放在工作队列中;
  3. 如果工作队列也满了,创建普通线程来执行任务,核心+普通线程的数量不超过maxPool;
  4. 如果都不行,就执行拒绝策略

2.2 addWorker()

在上面的源码中,我们看到当需要创建线程时会调用addWorker()方法。addWorker()能够添加新的工作线程到线程池,该方法涉及到线程池的一些重要资源:

  • largestPoolSize(线程池最大大小)和workers(工作线程集合)都属于临界资源,会有多线程竞争问题,因此通过全局锁来进行互斥
  • ctl(工作线程数量)则是通过CAS去保证线程安全的。

以下是内部实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
// 全局锁,并发操作必备 
private final ReentrantLock mainLock = new ReentrantLock();
// 跟踪线程池的最大大小 ,只有在持有全局锁mainLock的前提下才能访问此集合
private int largestPoolSize;
// 工作线程集合 ,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合
private final HashSet<Worker> workers = new HashSet<>();
// 获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 判断线程池的状态是否为 Running
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* 添加新的工作线程到线程池
* @param firstTask 执行的任务
* @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小
* @return 添加成功就返回true否则返回false
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
//这两句用来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
//获取线程池中工作的线程的数量
int wc = workerCountOf(c);
// core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//原子操作将workcount的数量加1
if (compareAndIncrementWorkerCount(c))
break retry;
// 如果线程的状态改变了就再次执行上述操作
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
// else CAS失败时因为workerCount改变了,继续内层循环尝试CAS对worker数量+1
}
}
// 标记工作线程是否启动成功
boolean workerStarted = false;
// 标记工作线程是否创建成功
boolean workerAdded = false;
Worker w = null;
try {

w = new Worker(firstTask);
final Thread t = w.thread; // 这里的thread是worker本身
if (t != null) {
// 加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//获取线程池状态
int rs = runStateOf(ctl.get());
// 线程池是否是可添加worker状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//更新当前工作线程的最大容量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// 工作线程是否启动成功
workerAdded = true;
}
} finally {
// 释放锁
mainLock.unlock();
}
//// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例
if (workerAdded) {
t.start(); // worker.run()调用runWorker()
/// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程启动失败,需要从工作线程中移除对应的Worker
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
} ThreadPoolExecutor.java

放一个大佬总结的流程图

2.3 Worker内部实现

Worker****是基于AQS实现的,内部实现了一个不可重入锁,这个锁的作用是什么呢?

Worker的锁主要的目的是为了控制中断。这也是为什么不直接执行execute(command)提交的command,而要在外面包一层Worker的原因

Tips:worker的锁是用来控制中断的?居然还有这种用法??我直接惊了😮

如何控制呢?

用AQS锁,当运行时上锁,中断必须获取锁,这样就保证了worker不会在运行的时候被突然噶掉

TreadPoolExecutor在调用shutdown()方法中断worker前都要获取worker锁

已启动的worker只有在处于等待状态,即尝试从workQueue中获取任务getTask()时才能被中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

final Thread thread; //利用ThreadFactory和Worker这个Runnable创建的线程对象

Runnable firstTask;

volatile long completedTasks;

Worker(Runnable firstTask) {
// 设置AQS的同步状态state=-1,state是AQS的一个计数器,大于0代表锁已经被获取
// 在调用runWorker()前,禁止interrupt中断,在interruptIfStarted()方法中会判断 getState()>=0
// state=-1表示当前worker未启动,线程还没运行,因此不需要中断
setState(-1);
this.firstTask = firstTask;
// 根据当前worker创建一个线程对象
// 当前worker本身就是一个runnable任务,也就是不会用参数的firstTask创建线程,而是调用当前worker.run()时调用firstTask.run()
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this); //runWorker()是ThreadPoolExecutor的方法,用于启动worker
}

// 0代表“没被锁定”状态
// 1代表“锁定”状态
protected boolean isHeldExclusively() {
return getState() != 0;
}

/**
* 尝试获取锁
* 重写AQS的tryAcquire()
*/
protected boolean tryAcquire(int unused) {
//尝试一次将state从0设置为1,即“锁定”状态,但由于每次都是state 0->1,而不是+1,那么说明不可重入
//且state==-1时也不会获取到锁
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread()); //设置exclusiveOwnerThread=当前线程
return true;
}
return false;
}

/**
* 尝试释放锁
* 不是state-1,而是置为0
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

/**
* 中断(如果运行)
* shutdownNow时会循环对worker线程执行
* 且不需要获取worker锁,即使在worker运行时也可以中断
*/
void interruptIfStarted() {
Thread t;
//如果state>=0、t!=null、且t没有被中断
//new Worker()时state==-1,说明不能中断
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

注意,worker内部的锁是不可重入的互斥锁,而不是用ReentrantLock可重入锁

这是因为我们不想让在调用比如setCorePoolSize()这种线程池控制方法时可以再次获取锁(重入),因为setCorePoolSize()时可能会interruptIdleWorkers(),在对一个线程interrupt时会要w.tryLock()。如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:

  • setMaximumPoolSize()
  • setKeppAliveTime()
  • allowCoreThreadTimeOut()
  • shutdown()

还有一个细节,在new Worker()时初始state=-1,这是为了让线程真正开始后才可以中断,初始化state=-1,在开始runWorker()时将state置为0,而state>=0才可以被中断

2.4 runWorker()

worker的启动方法,一旦启动后,会一直循环从队列中获取任务执行,执行任务时会锁住worker,只有在getTask()方法阶段才会释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

流程图

2.5 getTask()

下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用以及空闲worker的销毁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private Runnable getTask() {
boolean timedOut = false; // 是否超时

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut=false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
  • 如果线程允许空闲并且当前线程数大于核心线程数,则执行workQueue.take:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
  • 如果线程不允许空闲或者当前线程数小于核心线程数****, 则执行workQueue.poll: 如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null并删除一个worker;

只有在线程数小于coreSize并且没有任务的情况下才会删除worker

3 ThreadPoolExecutor关闭

线程池有两个关闭的方法:

1
2
3
4
5
6
// 阻止提交任务,在所有线程执行完后结束
void shutdown();

// 使用shutdownNow()后触发,线程池不再接受新的任务,尝试终止所有正在执行的任务
List<Runnable> shutdownNow();

那么有以下几个问题:

  • 线程池关闭的流程是怎么样的呢?
  • 线程池的探活机制?(快手面试原题)

3.1 shutdown

shutdown()方法的步骤:

  1. CAS更新线程池状态为shutdown
  2. interruptIdleWorkers()中断所有空闲线程
  3. tryTerminated()尝试终止线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); //上锁

try {
//判断调用者是否有权限shutdown线程池
checkShutdownAccess();

//CAS循环设置线程池状态为shutdown
advanceRunState(SHUTDOWN);

//中断所有空闲线程
interruptIdleWorkers();

onShutdown(); // hook for ScheduledThreadPoolExecutor
}
finally {
mainLock.unlock(); //解锁
}

//尝试终止线程池
tryTerminate();
}

interruptIdleWorkers()会中断所有可能正在等待任务的线程,即未被锁定的worker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

3.2 shutdownNow

shutdown()方法的步骤:

  1. CAS更新线程池状态为STOP
  2. interruptWorkers()中断所有空闲线程
  3. tryTerminated()尝试终止线程池
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
 public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}

interruptWorkers()会中断所有已启动的worker,即使worker处于活跃状态

1
2
3
4
5
private void interruptWorkers() {
// assert mainLock.isHeldByCurrentThread();
for (Worker w : workers)
w.interruptIfStarted();
}

4 拓展面试题

了解了上面的核心原理,现在就可以回答最开始的面试题了:

4.1 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

📌主要分三点回答:

  1. 是否会抛出堆栈异常?
  2. 是否会影响其他线程任务?
  3. 异常线程被丢弃还是放回?

4.1.1 是否会抛出堆栈异常?

是否会抛出堆栈异常跟任务提交方式有关。ExecutorService提供了两种任务提交的方式:execute()submit()。上面已经介绍过了execute(),我们再来看看submit()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}

// FutureTask
public class FutureTask<V> implements RunnableFuture<V> {
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 1、状态检查
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 2、调用包装前的实际方法执行并获取返回值
result = c.call();
ran = true;
} catch (Throwable ex) {
// 3、如果执行失败,则保存异常信息
result = null;
ran = false;
setException(ex);
}
// 4、如果执行成功,则将返回值保存起来
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
}
AbstractExecutorService.java

可以看到submit()将任务封装成了RunnableFuture(无论是Runnable还是Callable),但本质上还是执行了execute()

因此两者最根本的区别在RunnableFuture.run()方法里,FutureTask会捕获异常并保存堆栈信息。因此如果遇到线程执行异常,执行***submit()任务的线程不会崩溃退出,并能通过get()获取到异常信息,如果不get就啥也不会发生,而execute()***内部是没有进行异常捕获的,因此会让线程崩溃

4.1.2 是否会影响其他线程任务?

答案是不会的,自己实验一下就知道了,这里不多赘述

4.1.3 异常线程被丢弃还是放回?

除了报错信息外,线程池会怎么处理异常线程呢?

其实线程池是可以设置异常处理器的,也就是uncaughtException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// new Thread()
Thread t = new Thread();
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
// TODO
}
});

// 线程池
ExecutorService threadPool = Executors.newFixedThreadPool(1, thread -> {
Thread t = new Thread(thread);
t.setUncaughtExceptionHandler((t1, e) -> {
// TODO
});
return t;
});

注意,这里任务执行异常后,异常会继续向上抛出,processWorkerExit()会被执行。而processWorkerExit()的核心操作就是移除异常的worker,并创建一个新的worker

这也就会造成一个很有意思的现象:

4.2 为什么队列满了再进行创建普通线程执行任务?

普通线程有一个别名,叫急救线程,意在面临工作队列溢出的情况下帮助处理任务。他们就相当于厂里的临时工,只有在正式员工(核心线程)忙不过来且仓库(工作队列)没空间的时候才会用到,因此这样设计的好处是节约系统的资源,创建线程需要获取全局锁,开销较大;而放入队列开销相对较低

但是像 Tomcat 线程池的做法是,遇到超量的任务,先创建普通线程,普通线程不够了再往队列里丢 ,这样做的原因是Tomcat 面临大量网络I/O请求、短连接,放入队列意味着用户需要一直等待,用户肯定不乐意,本着物尽其用的道理,谁也别想闲着。

4.3 Tomcat线程池是如何做的?

简单的说:

  1. 新线程池继承JDK线程池接口,重写了execute()方法:当抛出拒绝策略了再给一次机会,尝试往阻塞队列里插任务,尽最大努力的去执行任务 (意思是不抛弃不放弃,免得用户请求失败)
  2. 新阻塞队列继承了LinkedBlockingQueue,重写了offer()方法:每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败,插入失败后就会尝试创建新的线程,以此来逼迫线程池创建新线程来处理任务 (意思你别想偷懒,把任务丢到队列就不管了)

4.4 shutDown的探活机制?

探活机制指shutDown如何发现正在执行任务的worker并且不直接中断他们

worker在执行任务时,会锁定内部的AQS锁,并且是不可重入的,shutDown关闭worker时,会调用interruptIdleWorkers()方法,该方法会先尝试获取内部锁,获取到后再执行中断


参考: