IMLC.ME

Java ThreadPoolExecutor 实现浅析

本文基于 openjdk17+35 (https://github.com/openjdk/jdk/tree/jdk-17+35) 分支编写。

ThreadPoolExecutor

创建

我们可以直接通过构造方法 new ThreadPoolExecutor(...) 。 其核心参数主要有: corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory 和handler。如果不了解的朋友直接看 javadoc

刚创建的 ThreadPoolExecutor 实例虽然已经处在 RUNNING 状态,但其实什么也没干。 只有你提交任务的时候,线程才会真正被创建和执行。

提交任务

虽然 ThreadPoolExecutor 提供了多种提交任务的方法—— submit()、execute()、invokeAll() 或者 invokeAny(),但是最后都会由 ThreadPoolExecutor.execute() 方法来创建并启动线程。

execute() 具体做了什么,openjdk 的源代码注释已经做了比较的说明,建议直接查阅:https://github.com/openjdk/jdk/blob/jdk-17%2B35/src/java.base/share/classes/java/util/concurrent/ThreadPoolExecutor.java#L1331-L1350

我这里做个简单翻译:

  1. 如果当前运行线程数小于 corePoolSize,ThreadPoolExecutor 会尝试新建一条线程,并把当前的任务指定为 firstTask。调用 addWorker() 时会自动检查 runState 和 workerCount,以避免在错误的状态下创建 Worker。

  2. 如果任务被成功添加到队列里,ThreadPoolExecutor 仍需复查是否应该创建一条新线程(因为现有的线程可能已经停止)或者线程池已经被终止。所以我们需要复查线程池状态,如果线程池已经停止,我们需要把任务从队列里撤回;又或者线程池内没有任何线程,则需要启动一条新线程。

  3. 如果任务无法被添加到队列,ThreadPoolExecutor 会尝试启动一条新线程。如果此操作失败,说明线程池要么已被停止,要么已经无法处理更多任务(队列已满),此时我们需要拒绝该任务。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        // 如果当前 worker 数小于 corePoolSize, 创建新 worker(同时也创建了新线程)执行任务
        // 线程池的状态检查由 addWorker() 方法完成。
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // workQueue.offer(command) 尝试把任务添加到 workQueue 队列里
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 如果任务无法被插入到队列里,就尝试添加 worker 来增加线程
    // 如果增加线程失败,调用 reject() 方法触发 RejectedExecutionHandler。
    else if (!addWorker(command, false))
        reject(command);
}

这里的代码揭露了一个线程池调优经常被提及的点。 如果工作队列 workQueue 太长,workQueue.offer(command) 无法失败,线程池就没有机会扩充非核心线程。 因此,针对具体业务,我们需要考量核心线程数和队列长度。 如果队列长度太长,任务不得不长期排队,影响延迟。 如果队列长度过短,遇到突发高峰就容易造成任务被拒绝。

停止

线程池提供了两个方法停止线程运行:

  • shutdown() 会把线程池状态置为 SHUTDOWN, 然后停止所有闲置线程。还在运行的线程会继续运行

  • shutdownNow() 将线程池池状态置为STOP, 然后停止所有线程。同时返回所有还在队列里的任务。

只要调用这两个关闭方法中的任意一个, isShutDown() 返回true. 当所有任务都成功关闭了, isTerminated()返回true。

比较两个方法的源代码,可以看到区别主要就两点,已通过注释标注出来:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(SHUTDOWN); // 状态置为 SHUTDOWN
        interruptIdleWorkers(); // 中断 Idle 线程
        onShutdown();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
}
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP); // 状态置为 STOP
        interruptWorkers(); // 中断所有线程
        tasks = drainQueue(); // 返回所有 task
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}

ThreadPoolExecutor.Worker

Worker 实现了 Runnable 接口,当 Worker 被加入到 ExecutorService 之后,Worker 随即被启动。

Worker 的主要工作都在 runWorker() 方法内进行。

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // while 循环不停地用 getTask() 获得 task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // 如果线程池正在停止,确保本线程也跟随终止
                // 否则,确保线程没有被终止
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    try {
                        // 执行任务
                        task.run();
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 当用户进程执行出错,Exception 向上抛出到此
            // Worker 会被终止并重现创建
            processWorkerExit(w, completedAbruptly);
        }
    }

核心参数是如何实现的

corePoolSize 和 maximumPoolSize 是如何实现的?

当用户提交新的任务时,检查当前的 worker 数和 corePoolSize 。 如果当前 worker 数小于 corePoolSize。直接创建新的线程处理任务。

public void execute(Runnable command) {
        ...
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        ...
}

当调用 addWorker() 创建新线程时,如果发现当前 worker 数已经到达 maximumPoolSize,直接返回 false 表示添加 worker 失败。

private boolean addWorker(Runnable firstTask, boolean core) {
    ...
    for (int c = ctl.get();;) {
        // Check if queue empty only if necessary.
        if (runStateAtLeast(c, SHUTDOWN)
            && (runStateAtLeast(c, STOP)
                || firstTask != null
                || workQueue.isEmpty()))
            return false;

        for (;;) {
            if (workerCountOf(c)
                >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateAtLeast(c, SHUTDOWN))
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    ...
}
keepAliveTime 是如何实现的?

在 Worker.runWorker() 中,一个 while 循环会不停地尝试 getTask()。如果 getTask() 返回 null,则循环结束。最后在 finally 块中调用 processWorkerExit() 来清理当前 worker 。

final void runWorker(Worker w) {
    ...
    try {
        while (task != null || (task = getTask()) != null) {
        ...
        }
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

实现 keepAliveTime 的关键就藏在这个 getTask() 里。详情看注释

private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        ...

        int wc = workerCountOf(c);

        // 当 allowCoreThreadTimeOut (允许核心线程过期) 或者当前 worker 数大于 corePoolSize
        // timed 被置为 true,表示需要计时
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 在第一次循环中,timed=true 而 timedOut=false,不执行任何操作 (先跳过下一行的说明,继续往下看)
        // 在第二轮循环中,由于上一次循环 timedOut 已被置为 true,所以 if 成立,worker数减一。并且返回 null
        // 而又基于上方对 runWorker() 的分析,getTask() 返回 null,worker 线程会退出循环,运行结束。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {

            // 如果 timed=true,调用 workQueue.poll() ,在限时之内取得任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                // 如果 r 不为 null,说明非核心线程在处理任务,直接返回拿到的任务。
                return r;
            // 如果 r 为 null,说明至少有一条非核心线程已经空闲,并且闲置了 keepAliveTime 纳秒。因而 timedOut 被置为 true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
ExceptionHandler 是如何实现的?

依旧是 java.util.concurrent.ThreadPoolExecutor#execute() 方法:

if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
else if (!addWorker(command, false))
    reject(command);

当 addWorker() 失败后,调用 reject() 触发 RejectedExecutionHandler。

How does invokeAll and invokeAny work?

// TODO:


#Java