本文基于 openjdk17+35 (https://github.com/openjdk/jdk/tree/jdk-17+35) 分支编写。
我们可以直接通过构造方法 new ThreadPoolExecutor(...)
。
其核心参数主要有: corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory 和handler。如果不了解的朋友直接看 javadoc。
刚创建的 ThreadPoolExecutor 实例虽然已经处在 RUNNING 状态,但其实什么也没干。
只有你提交任务的时候,线程才会真正被创建和执行。
虽然 ThreadPoolExecutor 提供了多种提交任务的方法—— submit()、execute()、invokeAll() 或者 invokeAny(),但是最后都会由 ThreadPoolExecutor.execute() 方法来创建并启动线程。
execute() 具体做了什么,openjdk 的源代码注释已经做了比较的说明,建议直接查阅:https://github.com/openjdk/jdk/blob/jdk-17%2B35/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1331-L1350
我这里做个简单翻译:
如果当前运行线程数小于 corePoolSize,ThreadPoolExecutor 会尝试新建一条线程,并把当前的任务指定为 firstTask。调用 addWorker() 时会自动检查 runState 和 workerCount,以避免在错误的状态下创建 Worker。
如果任务被成功添加到队列里,ThreadPoolExecutor 仍需复查是否应该创建一条新线程(因为现有的线程可能已经停止)或者线程池已经被终止。所以我们需要复查线程池状态,如果线程池已经停止,我们需要把任务从队列里撤回;又或者线程池内没有任何线程,则需要启动一条新线程。
如果任务无法被添加到队列,ThreadPoolExecutor 会尝试启动一条新线程。如果此操作失败,说明线程池要么已被停止,要么已经无法处理更多任务(队列已满),此时我们需要拒绝该任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 如果当前 worker 数小于 corePoolSize, 创建新 worker(同时也创建了新线程)执行任务
// 线程池的状态检查由 addWorker() 方法完成。
if (addWorker(command, true))
return;
c = ctl.get();
}
// workQueue.offer(command) 尝试把任务添加到 workQueue 队列里
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果任务无法被插入到队列里,就尝试添加 worker 来增加线程
// 如果增加线程失败,调用 reject() 方法触发 RejectedExecutionHandler。
else if (!addWorker(command, false))
reject(command);
}
这里的代码揭露了一个线程池调优经常被提及的点。
如果工作队列 workQueue 太长,workQueue.offer(command)
无法失败,线程池就没有机会扩充非核心线程。
因此,针对具体业务,我们需要考量核心线程数和队列长度。
如果队列长度太长,任务不得不长期排队,影响延迟。
如果队列长度过短,遇到突发高峰就容易造成任务被拒绝。
线程池提供了两个方法停止线程运行:
shutdown() 会把线程池状态置为 SHUTDOWN, 然后停止所有闲置线程。还在运行的线程会继续运行。
shutdownNow() 将线程池池状态置为STOP, 然后停止所有线程。同时返回所有还在队列里的任务。
只要调用这两个关闭方法中的任意一个, isShutDown() 返回true.
当所有任务都成功关闭了, isTerminated()返回true。
比较两个方法的源代码,可以看到区别主要就两点,已通过注释标注出来:
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN); // 状态置为 SHUTDOWN
interruptIdleWorkers(); // 中断 Idle 线程
onShutdown();
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP); // 状态置为 STOP
interruptWorkers(); // 中断所有线程
tasks = drainQueue(); // 返回所有 task
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
Worker 实现了 Runnable 接口,当 Worker 被加入到 ExecutorService 之后,Worker 随即被启动。
Worker 的主要工作都在 runWorker() 方法内进行。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// while 循环不停地用 getTask() 获得 task
while (task != null || (task = getTask()) != null) {
w.lock();
// 如果线程池正在停止,确保本线程也跟随终止
// 否则,确保线程没有被终止
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
// 执行任务
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 当用户进程执行出错,Exception 向上抛出到此
// Worker 会被终止并重现创建
processWorkerExit(w, completedAbruptly);
}
}
当用户提交新的任务时,检查当前的 worker 数和 corePoolSize 。
如果当前 worker 数小于 corePoolSize。直接创建新的线程处理任务。
public void execute(Runnable command) {
...
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
...
}
当调用 addWorker() 创建新线程时,如果发现当前 worker 数已经到达 maximumPoolSize,直接返回 false 表示添加 worker 失败。
private boolean addWorker(Runnable firstTask, boolean core) {
...
for (int c = ctl.get();;) {
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
...
}
在 Worker.runWorker() 中,一个 while 循环会不停地尝试 getTask()。如果 getTask() 返回 null,则循环结束。最后在 finally 块中调用 processWorkerExit() 来清理当前 worker 。
final void runWorker(Worker w) {
...
try {
while (task != null || (task = getTask()) != null) {
...
}
} finally {
processWorkerExit(w, completedAbruptly);
}
}
实现 keepAliveTime 的关键就藏在这个 getTask() 里。详情看注释
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
...
int wc = workerCountOf(c);
// 当 allowCoreThreadTimeOut (允许核心线程过期) 或者当前 worker 数大于 corePoolSize
// timed 被置为 true,表示需要计时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 在第一次循环中,timed=true 而 timedOut=false,不执行任何操作 (先跳过下一行的说明,继续往下看)
// 在第二轮循环中,由于上一次循环 timedOut 已被置为 true,所以 if 成立,worker数减一。并且返回 null
// 而又基于上方对 runWorker() 的分析,getTask() 返回 null,worker 线程会退出循环,运行结束。
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果 timed=true,调用 workQueue.poll() ,在限时之内取得任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
// 如果 r 不为 null,说明非核心线程在处理任务,直接返回拿到的任务。
return r;
// 如果 r 为 null,说明至少有一条非核心线程已经空闲,并且闲置了 keepAliveTime 纳秒。因而 timedOut 被置为 true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
依旧是 java.util.concurrent.ThreadPoolExecutor#execute()
方法:
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
当 addWorker() 失败后,调用 reject() 触发 RejectedExecutionHandler。
// TODO:
#Java