Java Phaser
什么是 Phaser
Phaser 是 JUC 包中提供的一个同步屏障工具。它能让多个线程在到达屏障点后暂停,直至所有线程都达到屏障点,再恢复运行。 与 CyclicBarrier 和 CountDownLatch 相比,Phaser 支持动态配置线程数、可重复使用等特性,有着更佳的灵活性。
正如其名称所示,Phaser 让线程到达一个 Phaser(阶段) 后,暂停并等待所有线程都到达这个阶段。此后所有线程恢复运行,然后又会再下一个阶段暂停并等待所有线程到达。 整个过程不停重复,直到 Phaser 被停止或者没有剩余的任务。
下图展示了 Phaser 的一个基本的使用场景: 三条线程分别处理任务 A, B 和 C,并通过 Phaser 进行同步。 并且要求ABC三个任务做完之后,才能开始下一轮的工作。
在起始阶段0,A、B、C 同时开始工作。假设A先完成工作。由于使用了 Phaser 设置同步屏障,A会暂停并且等待BC结束。当ABC全部完成,同步屏障解除,ABC进入阶段1,继续执行操作。
如何使用 Phaser
下方代码实现了刚才提到的使用案例。
Task 类继承自 Thread,用于模拟实际的工作负载。 Task 启动后会不停地执行任务,停下等待所有工作完成,执行任务,停下等待所有工作完成...
具体细节请看代码注释
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
public class GetStartedWithPhaser {
// Task 类继承 Thread ,用于模拟工作负载
public static class Task extends Thread {
private Phaser phaser;
public Task(Phaser phaser, String name) {
super(name);
this.phaser = phaser;
this.phaser.register(); // 通过调用 register() 方法告知 Phaser 有多少个任务需要等待
}
@Override
public void run() {
try {
// 不停地执行任务
while(true) {
System.out.println("当前阶段" + this.phaser.getPhase() + ": " + this.getName() + " 开始");
Thread.sleep(1000);
System.out.println("当前阶段" + this.phaser.getPhase() + ": " + this.getName() + " 结束并等待");
this.phaser.arriveAndAwaitAdvance(); // 告知 Phaser 自己已完成任务。等待直到可以继续执行任务。
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
Phaser phaser = new Phaser(); // 创建一个 Phaser
ExecutorService executor = Executors.newFixedThreadPool(3);
// 启动 A、B 和 C 三个任务
executor.submit(new Task(phaser, "Task A"));
executor.submit(new Task(phaser, "Task B"));
executor.submit(new Task(phaser, "Task C"));
}
}
日志输出:
当前阶段0: Task A 开始
当前阶段0: Task B 开始
当前阶段0: Task C 开始
当前阶段0: Task A 结束并等待
当前阶段0: Task C 结束并等待
当前阶段0: Task B 结束并等待
当前阶段1: Task B 开始
当前阶段1: Task C 开始
当前阶段1: Task A 开始
当前阶段1: Task B 结束并等待
当前阶段1: Task A 结束并等待
当前阶段1: Task C 结束并等待
当前阶段2: Task B 开始
当前阶段2: Task C 开始
当前阶段2: Task A 开始
当前阶段2: Task B 结束并等待
当前阶段2: Task A 结束并等待
当前阶段2: Task C 结束并等待
当前阶段3: Task C 开始
当前阶段3: Task A 开始
当前阶段3: Task B 开始
当前阶段3: Task A 结束并等待
当前阶段3: Task B 结束并等待
当前阶段3: Task C 结束并等待
Phaser 的详细介绍
至此,你应该对 Phaser 有个大概的了解。现在可以真正开始了解 Phaser。首先让我们来了解一下 Phaser 的设计思路。
先介绍一下有关的概念:
Phase Phaser 可以重复进入同步屏障,线程执行到在屏障前等待的这个过程,成为一个阶段(Phase)。通过屏障后,就进入了下一个阶段(phase 计数加一)。
Party 可以理解为参与者、任务或工作线程。这是一个抽象的概念,实际中多为工作线程,但也可以是子Phaser,又或者单纯只是一个逻辑上的概念。
Register/Deregister Register 注册,即告知 Phaser 有新的 Party 加入。Deregister 注销,则告知 Phaser 要移除一个 Party。
Arrive 告知 Phaser 有一个任务已经完成。
Phaser 的工作流程可以简单地描述为:
- 新创建一个 Phaser
- 向 Phaser 注册 Party
- Party 执行工作,完成后通知 Phaser 工作完成,即 Arrive。然后进入等待状态。
- 当所有 Party 报告 Arrive,Phaser 同步屏障解除,所有 Party 恢复运行。Phaser 进入新的阶段。
理解了概念和流程后,Phaser 类提供的方法就无需具体解释了:
int register()
: 注册一个新的 Party
int bulkRegister(int parties)
: 注册一批 Parties
int arrive()
: 通知 Phaser 有 Party 到达,此方法不会等待所有 Party 到达。
int arriveAndAwaitAdvance()
: 通知 Phaser 有 Party 到达,并且等待所有 Party 到达。
int arriveAndDeregister()
: 通知 Phaser 有 Party 到达,并且注销该 Party,不会参与下一阶段的执行。次方法也不会等待。
int awaitAdvance(int phase)
:等待 Phaser 结束指定阶段。如果指定的 phase 不是当前阶段或 Phase 已停止,本方法会立刻返回。
剩下的 getters —— getPhase()
、getRegisteredParties()
、getArrivedParties()
、getUnarrivedParties()
就见名知意了。
与 CyclicBarrier 或 CountDownLatch 相比,Phaser 的同步线程数(也就是 party 数),是可以动态配置的。
开发者既可以通过 Phaser(int parties)
在构造对象时指定 Party 数目,也可以在新建 Phaser 后,
通过 register()
,bulkRegister(int)
增加或通过 arriveAndDeregister()
减少 Party 数。
与 CyclicBarrier 操作对象必须是线程不同,Phaser 维护的 Party 只是一个抽象的概念,或者说,只是一个数字。 Party 实际中可以是一个工作线程、一个代码逻辑,或者是一个子Phase(该概念会在后面介绍,请先忽略)。
这里提供一个用 Phaser 实现 CountDownLatch 的实现,帮助读者理解。
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class PhaserBasedCountDownLatch {
public static void main(String[] args) {
final int TOTAL = 10;
PhaserBasedCountDownLatch latch = new PhaserBasedCountDownLatch(TOTAL);
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
// 每秒钟计数一次
scheduler.scheduleAtFixedRate(() -> {
latch.countDown();
System.out.println(String.format("Counted: %s/%s", latch.getCount(), TOTAL));
}, 1, 1, TimeUnit.SECONDS);
// 等待计数器数到10
latch.await();
scheduler.shutdownNow();
System.out.println("Tasks completed");
}
private Phaser phaser;
public PhaserBasedCountDownLatch(int count) {
this.phaser = new Phaser(count);
}
public void countDown() {
this.phaser.arrive();
}
public int getCount() {
// 数到 10 后,Phaser phase 加一,ArrivedParties 重置为0。
// 这里为了日志符合直观,特殊处理了一下
if(this.phaser.getPhase() == 1) {
return 10;
}
return this.phaser.getArrivedParties();
}
public void await() {
this.phaser.awaitAdvance(this.phaser.getPhase());
this.phaser.forceTermination();
}
}
日志输出:
Counted: 1/10
Counted: 2/10
Counted: 3/10
Counted: 4/10
Counted: 5/10
Counted: 6/10
Counted: 7/10
Counted: 8/10
Counted: 9/10
Counted: 10/10
Tasks completed
深入 Phaser
终止 Phaser (Termination )
Phaser 是可以被终止的。在两种情况下,Phaser 会进入终止状态:
onAdvance()
接口返回 true。如果用户不重写onAdvance()
, Phaser 的 默认实现 为,当 registeredParties 为0时,返回 true, 停止 Phaser。- 用户调用
phaser.forceTermination()
。
当 Phaser 进入终止状态,所有同步方法调用会立刻返回,并且返回一个负数作为标志。所有注册操作也不会生效。
分层 Tiering
当需要管理的 Party 太多,线程竞争的成本会变得不可接受。Phaser 提供了一个分层机制,其内部维护了一个树状结构的 Phaser 关系。 只有所有子 Phaser 的 parties 都 arrive 了,作为父节点的 Phaser 才会结束当前阶段。
子 phaser 对于父 phaser 来说,也被抽象成一个 party。
根 Phaser
├── 任务 1 (arrived)
├── 任务 2 (unarrived)
└── 子 Phaser (unarrived)
├── 任务 3 (unarrived)
└── 任务 4 (arrived)
以上图为例, 对于根 phaser 来说,它有 3 个 parties。其中一个已到达(任务1),两个未到达(任务2和子 Phaser)。 对于子 phaser 来说,它有 2 个 parties。其中一个已到达,一个未到达。所以,不言自明,子 phaser 对根 phaser 来说,就是一个未到达的 party。
Phaser 通过构造方法定义其上级 Phaser。
public Phaser(Phaser parent) {
this(parent, 0);
}
而状态信息如何在上下级 phaser 中传递,逻辑过于简单,就不在详细分析。只简单举一个例子。
下方截取一部分 doArrive()
的代码:
public class Phaser {
private int doArrive(int adjust) {
final Phaser root = this.root;
for (;;) {
long s = (root == this) ? state : reconcileState(); // 如果自己不是根 phaser,则尝试整理所有子节点的状态(避免父phaser已经进入了下一阶段,子phaser还停留在上一阶段)
//...
if (STATE.compareAndSet(this, s, s-=adjust)) {
if (unarrived == 1) {
long n = s & PARTIES_MASK; // base of next state
int nextUnarrived = (int)n >>> PARTIES_SHIFT;
if (root == this) {
// 如果是根 phaser,直接执行操作
//...
}
else if (nextUnarrived == 0) { // propagate deregistration
// 如果不是根 phaser,向上级节点汇报注销操作
phase = parent.doArrive(ONE_DEREGISTER);
STATE.compareAndSet(this, s, s | EMPTY);
}
else
// 如果不是根 phaser,向上级节点汇报到达状态。
phase = parent.doArrive(ONE_ARRIVAL);
}
return phase;
}
}
}
}
构造 Phaser 时,用户可以指定其上级 Phaser。
代码实现
如何实现线程等待 - internalAwaitAdvance()
Phaser 的 awaitAdvance()
和 arriveAndAwaitAdvance()
,最后都会调用 internalAwaitAdvance()
方法。
下方代码来自 Java 11,代码解释请看注释。
public class Phaser {
/**
* Possibly blocks and waits for phase to advance unless aborted.
* Call only on root phaser.
*
* @param phase 当前 phase
* @param node node 记录了等待是否可中断和超时时间。如果 node 非空,则进行可中断或有超时地等待。如果 node 为空,则进行不可中断的等待
* @return current phase
*/
private int internalAwaitAdvance(int phase, QNode node) {
// assert root == this;
releaseWaiters(phase-1); // ensure old queue clean
boolean queued = false; // true when node is enqueued
int lastUnarrived = 0; // to increase spins upon change
int spins = SPINS_PER_ARRIVAL;
long s;
int p;
// while 循环配置 Thread.onSpinWait() 实现忙等(busy-waiting)
// (p = (int)((s = state) >>> PHASE_SHIFT)) 通过位操作取得当前 phase
// 如果当前 phase 与入参 phase 一直,说明 Phaser 还未进入下一阶段,则进行等待。
// 如果不一致,说明 Phaser 进入了下一阶段(所有 Party 已经 arrive,继而 phase 递增一),结束等待。
while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
if (node == null) { // spinning in noninterruptible mode 以不可中断的方式进行忙等
int unarrived = (int)s & UNARRIVED_MASK;
if (unarrived != lastUnarrived &&
(lastUnarrived = unarrived) < NCPU)
spins += SPINS_PER_ARRIVAL;
boolean interrupted = Thread.interrupted();
if (interrupted || --spins < 0) { // need node to record intr
// 如果当前线程被中断或超出自旋次数,赋值 node 变量,改为可中断等待。
node = new QNode(this, phase, false, false, 0L);
node.wasInterrupted = interrupted;
}
else
Thread.onSpinWait();
}
else if (node.isReleasable()) // done or aborted
break;
else if (!queued) { // push onto queue
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
QNode q = node.next = head.get();
if ((q == null || q.phase == phase) &&
(int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
queued = head.compareAndSet(q, node);
}
else {
try {
// 由于 QNode 实现了 ForkJoinPool.ManagedBlocker 接口
// 可调用 ForkJoinPool.managedBlock(node) 接口进行线程挂起操作
ForkJoinPool.managedBlock(node);
} catch (InterruptedException cantHappen) {
node.wasInterrupted = true;
}
}
}
if (node != null) {
if (node.thread != null)
node.thread = null; // avoid need for unpark()
if (node.wasInterrupted && !node.interruptible)
Thread.currentThread().interrupt();
if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
// 如果代码进行到这里,当时实际却没有进入到下一个 phase
// 说明等待操作失败(被打断或超时),中断等待并恢复所有线程
return abortWait(phase); // possibly clean up on abort
}
releaseWaiters(phase);
return p;
}
}
如何恢复线程运行 - releaseWaiters()
releaseWaiters()
非常简单,遍历等待队列并且调用 LockSupport.unpark()
唤醒线程。
与普通的遍历队列的代码相比,这里的队列阶段使用了 AtomicReference 作为节点,通过 compareAndSet()
赋值子阶段引用。
这个在多线程编程中是基础操作了。
public class Phaser {
private void releaseWaiters(int phase) {
QNode q; // first element of queue
Thread t; // its thread
AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
while ((q = head.get()) != null &&
q.phase != (int)(root.state >>> PHASE_SHIFT)) {
if (head.compareAndSet(q, q.next) &&
(t = q.thread) != null) {
q.thread = null;
LockSupport.unpark(t);
}
}
}
}
参考文献
#Java