ThreadPoolExecutor详解
最近在准备秋招,复习线程池相关知识点的时候看到了几个比较有意思的面试题:
- 线程池执行任务的时候,不显示的进行异常捕获,任务执行遇到异常会怎么样?线程会崩溃吗?
- 为什么队列满了再进行创建普通线程执行任务?
看到这两个问题我看到的时候就发现这块是自己知识盲区,不读源码我觉得是搞不明白的🤨,因此本文以此问题为切入点来读读ThreadPoolExecutor的核心源代码
1 Executor 体系
在阅读源码之前,最好先熟悉一下整个相关体系,可以让我们有一个全局观,不容易被带偏(个人的一点心得)

ExecutorService提供了线程池的顶级抽象,以下是它的核心方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public interface Executor {
void execute(Runnable command); }
public interface ExecutorService extends Executor {
void shutdown(); List<Runnable> shutdownNow(); <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Callable<T> task); Future<?> submit(Runnable task);
... }
|
2 ThreadPoolExecutor源码
2.1 Execute()
ThreadPoolExecutor对ExecutorService的方法做了实现,使用execute()
方法提交一个任务。
我们从源码分析一下线程池的任务执行流程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int workerCountOf(int c) { return c & CAPACITY; }
private final BlockingQueue<Runnable> workQueue;
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (!isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
|
根据上面的源码,线程池大致提交任务的流程如下图所示:

总结一下就是:
- 如果当前核心线程有空闲,就将任务分配给核心线程执行;
- 如果核心线程满了,就将任务暂时放在工作队列中;
- 如果工作队列也满了,创建普通线程来执行任务,核心+普通线程的数量不超过maxPool;
- 如果都不行,就执行拒绝策略
2.2 addWorker()
在上面的源码中,我们看到当需要创建线程时会调用addWorker()
方法。addWorker()
能够添加新的工作线程到线程池,该方法涉及到线程池的一些重要资源:
largestPoolSize
(线程池最大大小)和workers
(工作线程集合)都属于临界资源,会有多线程竞争问题,因此通过全局锁来进行互斥
- 而
ctl
(工作线程数量)则是通过CAS去保证线程安全的。
以下是内部实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
| private final ReentrantLock mainLock = new ReentrantLock(); private int largestPoolSize; private final HashSet<Worker> workers = new HashSet<>();
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static boolean isRunning(int c) { return c < SHUTDOWN; }
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); if (runStateOf(c) != rs) continue retry; } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try {
w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; } ThreadPoolExecutor.java
|
放一个大佬总结的流程图

2.3 Worker内部实现
Worker****是基于AQS实现的,内部实现了一个不可重入锁,这个锁的作用是什么呢?
Worker的锁主要的目的是为了控制中断。这也是为什么不直接执行execute(command)
提交的command,而要在外面包一层Worker的原因
Tips:worker的锁是用来控制中断的?居然还有这种用法??我直接惊了😮
如何控制呢?
用AQS锁,当运行时上锁,中断必须获取锁,这样就保证了worker不会在运行的时候被突然噶掉
TreadPoolExecutor在调用shutdown()
方法中断worker前都要获取worker锁
已启动的worker只有在处于等待状态,即尝试从workQueue中获取任务getTask()
时才能被中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; Runnable firstTask;
volatile long completedTasks; Worker(Runnable firstTask) { setState(-1); this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; }
protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
|
注意,worker内部的锁是不可重入的互斥锁,而不是用ReentrantLock可重入锁
这是因为我们不想让在调用比如setCorePoolSize()
这种线程池控制方法时可以再次获取锁(重入),因为setCorePoolSize()
时可能会interruptIdleWorkers()
,在对一个线程interrupt时会要w.tryLock()
。如果可重入,就可能会在对线程池操作的方法中中断线程,类似方法还有:
setMaximumPoolSize()
setKeppAliveTime()
allowCoreThreadTimeOut()
shutdown()
还有一个细节,在new Worker()
时初始state=-1,这是为了让线程真正开始后才可以中断,初始化state=-1,在开始runWorker()
时将state置为0,而state>=0才可以被中断
2.4 runWorker()
worker的启动方法,一旦启动后,会一直循环从队列中获取任务执行,执行任务时会锁住worker,只有在getTask()
方法阶段才会释放锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); try { task.run(); afterExecute(task, null); } catch (Throwable ex) { afterExecute(task, ex); throw ex; } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
|
流程图

2.5 getTask()
下面来看一下getTask()
方法,这里面涉及到keepAliveTime
的使用以及空闲worker的销毁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| private Runnable getTask() { boolean timedOut = false;
for (;;) { int c = ctl.get(); int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
|
- 如果线程允许空闲并且当前线程数大于核心线程数,则执行
workQueue.take
:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
- 如果线程不允许空闲或者当前线程数小于核心线程数****, 则执行
workQueue.poll
: 如果在keepAliveTime
时间内,阻塞队列还是没有任务,则返回null并删除一个worker;
只有在线程数小于coreSize并且没有任务的情况下才会删除worker
3 ThreadPoolExecutor关闭
线程池有两个关闭的方法:
1 2 3 4 5 6
| void shutdown();
List<Runnable> shutdownNow();
|
那么有以下几个问题:
- 线程池关闭的流程是怎么样的呢?
- 线程池的探活机制?(快手面试原题)
3.1 shutdown
shutdown()
方法的步骤:
- CAS更新线程池状态为shutdown
interruptIdleWorkers()
中断所有空闲线程
tryTerminated()
尝试终止线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } tryTerminate(); }
|
interruptIdleWorkers()
会中断所有可能正在等待任务的线程,即未被锁定的worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); } }
|
3.2 shutdownNow
shutdown()
方法的步骤:
- CAS更新线程池状态为STOP
interruptWorkers()
中断所有空闲线程
tryTerminated()
尝试终止线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
|
interruptWorkers()
会中断所有已启动的worker,即使worker处于活跃状态
1 2 3 4 5
| private void interruptWorkers() { for (Worker w : workers) w.interruptIfStarted(); }
|
4 拓展面试题
了解了上面的核心原理,现在就可以回答最开始的面试题了:
4.1 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?
📌主要分三点回答:
- 是否会抛出堆栈异常?
- 是否会影响其他线程任务?
- 异常线程被丢弃还是放回?
4.1.1 是否会抛出堆栈异常?
是否会抛出堆栈异常跟任务提交方式有关。ExecutorService提供了两种任务提交的方式:execute()
和submit()
。上面已经介绍过了execute()
,我们再来看看submit()
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public Future<?> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFuture<Void> ftask = newTaskFor(task, null); execute(ftask); return ftask; }
public class FutureTask<V> implements RunnableFuture<V> { public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } } AbstractExecutorService.java
|
可以看到submit()
将任务封装成了RunnableFuture
(无论是Runnable还是Callable),但本质上还是执行了execute()
因此两者最根本的区别在RunnableFuture.run()
方法里,FutureTask会捕获异常并保存堆栈信息。因此如果遇到线程执行异常,执行***submit()
任务的线程不会崩溃退出,并能通过get()
获取到异常信息,如果不get就啥也不会发生,而execute()
***内部是没有进行异常捕获的,因此会让线程崩溃
4.1.2 是否会影响其他线程任务?
答案是不会的,自己实验一下就知道了,这里不多赘述
4.1.3 异常线程被丢弃还是放回?
除了报错信息外,线程池会怎么处理异常线程呢?
其实线程池是可以设置异常处理器的,也就是uncaughtException
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Thread t = new Thread(); t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { } });
ExecutorService threadPool = Executors.newFixedThreadPool(1, thread -> { Thread t = new Thread(thread); t.setUncaughtExceptionHandler((t1, e) -> { }); return t; });
|

注意,这里任务执行异常后,异常会继续向上抛出,processWorkerExit()
会被执行。而processWorkerExit()
的核心操作就是移除异常的worker,并创建一个新的worker:

这也就会造成一个很有意思的现象:

4.2 为什么队列满了再进行创建普通线程执行任务?
普通线程有一个别名,叫急救线程,意在面临工作队列溢出的情况下帮助处理任务。他们就相当于厂里的临时工,只有在正式员工(核心线程)忙不过来且仓库(工作队列)没空间的时候才会用到,因此这样设计的好处是节约系统的资源,创建线程需要获取全局锁,开销较大;而放入队列开销相对较低
但是像 Tomcat 线程池的做法是,遇到超量的任务,先创建普通线程,普通线程不够了再往队列里丢 ,这样做的原因是Tomcat 面临大量网络I/O请求、短连接,放入队列意味着用户需要一直等待,用户肯定不乐意,本着物尽其用的道理,谁也别想闲着。
4.3 Tomcat线程池是如何做的?
简单的说:
- 新线程池继承JDK线程池接口,重写了
execute()
方法:当抛出拒绝策略了再给一次机会,尝试往阻塞队列里插任务,尽最大努力的去执行任务 (意思是不抛弃不放弃,免得用户请求失败)
- 新阻塞队列继承了LinkedBlockingQueue,重写了
offer()
方法:每次向队列插入任务,判断如果当前线程数小于最大线程数则插入失败,插入失败后就会尝试创建新的线程,以此来逼迫线程池创建新线程来处理任务 (意思你别想偷懒,把任务丢到队列就不管了)
4.4 shutDown的探活机制?
探活机制指shutDown如何发现正在执行任务的worker并且不直接中断他们
worker在执行任务时,会锁定内部的AQS锁,并且是不可重入的,shutDown关闭worker时,会调用interruptIdleWorkers()
方法,该方法会先尝试获取内部锁,获取到后再执行中断
参考: