Java ScheduledExecutorService 浅析
实现
ScheduledExecutorService 基于 ThreadPoolExecutor 实现。 建议先移步 https://www.imlc.me/java-ThreadPoolExecutor 了解其实现。
在 ThreadPoolExecutor 的基础上,ScheduledExecutorService 通过一下三点,实现了 Scheduler 的功能:
- 在 ThreadPoolExecutor 执行完一个任务后,把任务自动重新放回执行队列里。这样就周期性执行的功能。
- 替换 BlockingQueue 实现为 DelayedWorkQueue,做到了只有过了指定时间后,才能从队列里拿到任务。这样就实现了定时执行的功能。
- 用 ScheduledFutureTask 包装了任务,记录下时间间隔。这样就可以在把任务重新放回队列时,计算下次任务的触发时间。
从 scheduleAtFixedRate() 开始
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
...
// 把任务封装成 ScheduledFutureTask
// 其内部保存了 triggerTime 触发时间
// 和 period 周期时间
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period),
sequencer.getAndIncrement());
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// 放进延迟队列里等待执行
delayedExecute(t);
return t;
}
跟踪进 delayedExecute()
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 把任务放进延迟队列
super.getQueue().add(task);
if (!canRunInCurrentRunState(task) && remove(task))
task.cancel(false);
else
ensurePrestart(); // 如果需要,创建并启动一条任务线程
}
}
到了这里,任务已经在队列里排队了。 与此同时,在工作线程这边(java.util.concurrent.ThreadPoolExecutor#runWorker)
final void runWorker(Worker w) {
...
try {
// getTask() 从 DelayedWorkQueue 里拉取任务。
// 因为是延迟队列,所以只有任务到了预定时间,才能被工作线程获取
// 这样就实现了ScheduledExecutorService 核心的定期执行任务的功能
while (task != null || (task = getTask()) != null) {
...
task.run();
...
}
...
} finally {
...
}
}
那么这里的 task.run() 具体执行了什么呢? 上方已经看到,在任务放进队列前,我们把任务包装成了 ScheduledFutureTask。 所以 task.run() 实际执行的,会是 java.util.concurrent.ScheduledThreadPoolExecutor.ScheduledFutureTask.run() 方法
public void run() {
// 检查当前线程池状态是否可以执行任务
if (!canRunInCurrentRunState(this))
cancel(false);
// 如果不是周期性任务,直接运行任务
else if (!isPeriodic())
super.run();
// 如果是周期性任务,调用 runAndReset() 执行任务。
// runAndReset() 会在任务结束后重置状态
else if (super.runAndReset()) {
// 设定下一次这行任务的时间
setNextRunTime();
// 重新放入任务队列
reExecutePeriodic(outerTask);
}
}
这次, ScheduledExecutorService 的核心逻辑就完成了。
scheduleWithFixedDelay() 和 scheduleAtFixedRate()
//TODO:
ScheduledFutureTask
//TODO:
DelayedWorkQueue
//TODO:
#Java