IMLC.ME

Java ScheduledExecutorService 浅析

实现

ScheduledExecutorService 基于 ThreadPoolExecutor 实现。 建议先移步 https://www.imlc.me/java-ThreadPoolExecutor 了解其实现。

在 ThreadPoolExecutor 的基础上,ScheduledExecutorService 通过一下三点,实现了 Scheduler 的功能:

  1. 在 ThreadPoolExecutor 执行完一个任务后,把任务自动重新放回执行队列里。这样就周期性执行的功能。
  2. 替换 BlockingQueue 实现为 DelayedWorkQueue,做到了只有过了指定时间后,才能从队列里拿到任务。这样就实现了定时执行的功能。
  3. 用 ScheduledFutureTask 包装了任务,记录下时间间隔。这样就可以在把任务重新放回队列时,计算下次任务的触发时间。

从 scheduleAtFixedRate() 开始

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    ...
    // 把任务封装成 ScheduledFutureTask
    // 其内部保存了 triggerTime 触发时间
    // 和 period 周期时间
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period),
                                      sequencer.getAndIncrement());
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    // 放进延迟队列里等待执行
    delayedExecute(t);
    return t;
}

跟踪进 delayedExecute()

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        // 把任务放进延迟队列
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(task) && remove(task))
            task.cancel(false);
        else
            ensurePrestart(); // 如果需要,创建并启动一条任务线程
    }
}

到了这里,任务已经在队列里排队了。 与此同时,在工作线程这边(java.util.concurrent.ThreadPoolExecutor#runWorker)

final void runWorker(Worker w) {
    ...
    try {
        // getTask() 从 DelayedWorkQueue 里拉取任务。
        // 因为是延迟队列,所以只有任务到了预定时间,才能被工作线程获取
        // 这样就实现了ScheduledExecutorService 核心的定期执行任务的功能
        while (task != null || (task = getTask()) != null) {
          ...
          task.run();
          ...
        }
        ...
    } finally {
        ...
    }
}

那么这里的 task.run() 具体执行了什么呢? 上方已经看到,在任务放进队列前,我们把任务包装成了 ScheduledFutureTask。 所以 task.run() 实际执行的,会是 java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask.run() 方法

public void run() {
    // 检查当前线程池状态是否可以执行任务
    if (!canRunInCurrentRunState(this))
        cancel(false);
    // 如果不是周期性任务,直接运行任务
    else if (!isPeriodic())
        super.run();
    // 如果是周期性任务,调用 runAndReset() 执行任务。
    // runAndReset() 会在任务结束后重置状态
    else if (super.runAndReset()) {
        // 设定下一次这行任务的时间
        setNextRunTime();
        // 重新放入任务队列
        reExecutePeriodic(outerTask);
    }
}

这次, ScheduledExecutorService 的核心逻辑就完成了。

scheduleWithFixedDelay() 和 scheduleAtFixedRate()

//TODO:

ScheduledFutureTask

//TODO:

DelayedWorkQueue

//TODO:


#Java