IMLC.ME

Java Phaser

什么是 Phaser

Phaser 是 JUC 包中提供的一个同步屏障工具。它能让多个线程在到达屏障点后暂停,直至所有线程都达到屏障点,再恢复运行。 与 CyclicBarrier 和 CountDownLatch 相比,Phaser 支持动态配置线程数、可重复使用等特性,有着更佳的灵活性。

正如其名称所示,Phaser 让线程到达一个 Phaser(阶段) 后,暂停并等待所有线程都到达这个阶段。此后所有线程恢复运行,然后又会再下一个阶段暂停并等待所有线程到达。 整个过程不停重复,直到 Phaser 被停止或者没有剩余的任务。

下图展示了 Phaser 的一个基本的使用场景: 三条线程分别处理任务 A, B 和 C,并通过 Phaser 进行同步。 并且要求ABC三个任务做完之后,才能开始下一轮的工作。

在起始阶段0,A、B、C 同时开始工作。假设A先完成工作。由于使用了 Phaser 设置同步屏障,A会暂停并且等待BC结束。当ABC全部完成,同步屏障解除,ABC进入阶段1,继续执行操作。

Preview

如何使用 Phaser

下方代码实现了刚才提到的使用案例。
Task 类继承自 Thread,用于模拟实际的工作负载。 Task 启动后会不停地执行任务,停下等待所有工作完成,执行任务,停下等待所有工作完成...
具体细节请看代码注释

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class GetStartedWithPhaser {

  // Task 类继承 Thread ,用于模拟工作负载
  public static class Task extends Thread {

    private Phaser phaser;

    public Task(Phaser phaser, String name) {
      super(name);
      this.phaser = phaser;
      this.phaser.register(); // 通过调用 register() 方法告知 Phaser 有多少个任务需要等待
    }

    @Override
    public void run() {
      try {
        // 不停地执行任务
        while(true) {
          System.out.println("当前阶段" + this.phaser.getPhase() + ": " + this.getName() + " 开始");
          Thread.sleep(1000);
          System.out.println("当前阶段" + this.phaser.getPhase() + ": " + this.getName() + " 结束并等待");
          this.phaser.arriveAndAwaitAdvance(); // 告知 Phaser 自己已完成任务。等待直到可以继续执行任务。
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

  }

  public static void main(String[] args) {

    Phaser phaser = new Phaser(); // 创建一个 Phaser
    ExecutorService executor = Executors.newFixedThreadPool(3);

    // 启动 A、B 和 C 三个任务
    executor.submit(new Task(phaser, "Task A"));
    executor.submit(new Task(phaser, "Task B"));
    executor.submit(new Task(phaser, "Task C"));

  }

}

日志输出:

当前阶段0: Task A 开始
当前阶段0: Task B 开始
当前阶段0: Task C 开始
当前阶段0: Task A 结束并等待
当前阶段0: Task C 结束并等待
当前阶段0: Task B 结束并等待
当前阶段1: Task B 开始
当前阶段1: Task C 开始
当前阶段1: Task A 开始
当前阶段1: Task B 结束并等待
当前阶段1: Task A 结束并等待
当前阶段1: Task C 结束并等待
当前阶段2: Task B 开始
当前阶段2: Task C 开始
当前阶段2: Task A 开始
当前阶段2: Task B 结束并等待
当前阶段2: Task A 结束并等待
当前阶段2: Task C 结束并等待
当前阶段3: Task C 开始
当前阶段3: Task A 开始
当前阶段3: Task B 开始
当前阶段3: Task A 结束并等待
当前阶段3: Task B 结束并等待
当前阶段3: Task C 结束并等待

Phaser 的详细介绍

至此,你应该对 Phaser 有个大概的了解。现在可以真正开始了解 Phaser。首先让我们来了解一下 Phaser 的设计思路。

先介绍一下有关的概念:

Phase Phaser 可以重复进入同步屏障,线程执行到在屏障前等待的这个过程,成为一个阶段(Phase)。通过屏障后,就进入了下一个阶段(phase 计数加一)。
Party 可以理解为参与者、任务或工作线程。这是一个抽象的概念,实际中多为工作线程,但也可以是子Phaser,又或者单纯只是一个逻辑上的概念。
Register/Deregister Register 注册,即告知 Phaser 有新的 Party 加入。Deregister 注销,则告知 Phaser 要移除一个 Party。
Arrive 告知 Phaser 有一个任务已经完成。

Phaser 的工作流程可以简单地描述为:

  1. 新创建一个 Phaser
  2. 向 Phaser 注册 Party
  3. Party 执行工作,完成后通知 Phaser 工作完成,即 Arrive。然后进入等待状态。
  4. 当所有 Party 报告 Arrive,Phaser 同步屏障解除,所有 Party 恢复运行。Phaser 进入新的阶段。

理解了概念和流程后,Phaser 类提供的方法就无需具体解释了:

int register(): 注册一个新的 Party
int bulkRegister(int parties): 注册一批 Parties
int arrive(): 通知 Phaser 有 Party 到达,此方法不会等待所有 Party 到达。
int arriveAndAwaitAdvance(): 通知 Phaser 有 Party 到达,并且等待所有 Party 到达。
int arriveAndDeregister(): 通知 Phaser 有 Party 到达,并且注销该 Party,不会参与下一阶段的执行。次方法也不会等待。
int awaitAdvance(int phase):等待 Phaser 结束指定阶段。如果指定的 phase 不是当前阶段或 Phase 已停止,本方法会立刻返回。

剩下的 getters —— getPhase()getRegisteredParties()getArrivedParties()getUnarrivedParties() 就见名知意了。

与 CyclicBarrier 或 CountDownLatch 相比,Phaser 的同步线程数(也就是 party 数),是可以动态配置的。 开发者既可以通过 Phaser(int parties) 在构造对象时指定 Party 数目,也可以在新建 Phaser 后, 通过 register(),bulkRegister(int) 增加或通过 arriveAndDeregister() 减少 Party 数。

与 CyclicBarrier 操作对象必须是线程不同,Phaser 维护的 Party 只是一个抽象的概念,或者说,只是一个数字。 Party 实际中可以是一个工作线程、一个代码逻辑,或者是一个子Phase(该概念会在后面介绍,请先忽略)。

这里提供一个用 Phaser 实现 CountDownLatch 的实现,帮助读者理解。

import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PhaserBasedCountDownLatch {

  public static void main(String[] args) {

    final int TOTAL = 10;
    PhaserBasedCountDownLatch latch = new PhaserBasedCountDownLatch(TOTAL);

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    
    // 每秒钟计数一次
    scheduler.scheduleAtFixedRate(() -> {
      latch.countDown();
      System.out.println(String.format("Counted: %s/%s", latch.getCount(), TOTAL));
    }, 1, 1, TimeUnit.SECONDS);

    // 等待计数器数到10
    latch.await();
    scheduler.shutdownNow();
    System.out.println("Tasks completed");
  }

  private Phaser phaser;
  public PhaserBasedCountDownLatch(int count) {
    this.phaser = new Phaser(count);
  }

  public void countDown() {
    this.phaser.arrive();
  }

  public int getCount() {

    // 数到 10 后,Phaser phase 加一,ArrivedParties 重置为0。
    // 这里为了日志符合直观,特殊处理了一下
    if(this.phaser.getPhase() == 1) {
      return 10;
    }

    return this.phaser.getArrivedParties();
  }

  public void await() {
    this.phaser.awaitAdvance(this.phaser.getPhase());
    this.phaser.forceTermination();
  }
}

日志输出:

Counted: 1/10
Counted: 2/10
Counted: 3/10
Counted: 4/10
Counted: 5/10
Counted: 6/10
Counted: 7/10
Counted: 8/10
Counted: 9/10
Counted: 10/10
Tasks completed

深入 Phaser

终止 Phaser (Termination )

Phaser 是可以被终止的。在两种情况下,Phaser 会进入终止状态:

  1. onAdvance() 接口返回 true。如果用户不重写 onAdvance(), Phaser 的 默认实现 为,当 registeredParties 为0时,返回 true, 停止 Phaser。
  2. 用户调用 phaser.forceTermination()

当 Phaser 进入终止状态,所有同步方法调用会立刻返回,并且返回一个负数作为标志。所有注册操作也不会生效。

分层 Tiering

当需要管理的 Party 太多,线程竞争的成本会变得不可接受。Phaser 提供了一个分层机制,其内部维护了一个树状结构的 Phaser 关系。 只有所有子 Phaser 的 parties 都 arrive 了,作为父节点的 Phaser 才会结束当前阶段。

子 phaser 对于父 phaser 来说,也被抽象成一个 party。

根 Phaser
    ├── 任务 1 (arrived)
    ├── 任务 2 (unarrived)
    └── 子 Phaser (unarrived)
        ├── 任务 3 (unarrived)
        └── 任务 4 (arrived)

以上图为例, 对于根 phaser 来说,它有 3 个 parties。其中一个已到达(任务1),两个未到达(任务2和子 Phaser)。 对于子 phaser 来说,它有 2 个 parties。其中一个已到达,一个未到达。所以,不言自明,子 phaser 对根 phaser 来说,就是一个未到达的 party。

Phaser 通过构造方法定义其上级 Phaser。

public Phaser(Phaser parent) {
        this(parent, 0);
}

而状态信息如何在上下级 phaser 中传递,逻辑过于简单,就不在详细分析。只简单举一个例子。 下方截取一部分 doArrive() 的代码:

public class Phaser {
  private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
      long s = (root == this) ? state : reconcileState(); // 如果自己不是根 phaser,则尝试整理所有子节点的状态(避免父phaser已经进入了下一阶段,子phaser还停留在上一阶段)
      //...
      if (STATE.compareAndSet(this, s, s-=adjust)) {
        if (unarrived == 1) {
          long n = s & PARTIES_MASK;  // base of next state
          int nextUnarrived = (int)n >>> PARTIES_SHIFT;
          if (root == this) {
            // 如果是根 phaser,直接执行操作
            //...
          }
          else if (nextUnarrived == 0) { // propagate deregistration
            // 如果不是根 phaser,向上级节点汇报注销操作
            phase = parent.doArrive(ONE_DEREGISTER);
            STATE.compareAndSet(this, s, s | EMPTY);
          }
          else
            // 如果不是根 phaser,向上级节点汇报到达状态。
            phase = parent.doArrive(ONE_ARRIVAL);
        }
        return phase;
      }
    }
  }
}

构造 Phaser 时,用户可以指定其上级 Phaser。

代码实现

如何实现线程等待 - internalAwaitAdvance()

Phaser 的 awaitAdvance()arriveAndAwaitAdvance(),最后都会调用 internalAwaitAdvance() 方法。 下方代码来自 Java 11,代码解释请看注释。

public class Phaser {

  /**
   * Possibly blocks and waits for phase to advance unless aborted.
   * Call only on root phaser.
   *
   * @param phase 当前 phase
   * @param node node 记录了等待是否可中断和超时时间。如果 node 非空,则进行可中断或有超时地等待。如果 node 为空,则进行不可中断的等待
   * @return current phase
   */
  private int internalAwaitAdvance(int phase, QNode node) {
    // assert root == this;
    releaseWaiters(phase-1);          // ensure old queue clean
    boolean queued = false;           // true when node is enqueued
    int lastUnarrived = 0;            // to increase spins upon change
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    
    // while 循环配置 Thread.onSpinWait() 实现忙等(busy-waiting)
    // (p = (int)((s = state) >>> PHASE_SHIFT)) 通过位操作取得当前 phase
    // 如果当前 phase 与入参 phase 一直,说明 Phaser 还未进入下一阶段,则进行等待。
    // 如果不一致,说明 Phaser 进入了下一阶段(所有 Party 已经 arrive,继而 phase 递增一),结束等待。 
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) { 
      if (node == null) {           // spinning in noninterruptible mode 以不可中断的方式进行忙等
        int unarrived = (int)s & UNARRIVED_MASK;
        if (unarrived != lastUnarrived &&
            (lastUnarrived = unarrived) < NCPU)
          spins += SPINS_PER_ARRIVAL;
        boolean interrupted = Thread.interrupted();
        if (interrupted || --spins < 0) { // need node to record intr 
          // 如果当前线程被中断或超出自旋次数,赋值 node 变量,改为可中断等待。
          node = new QNode(this, phase, false, false, 0L);
          node.wasInterrupted = interrupted;
        }
        else
          Thread.onSpinWait();
      }
      else if (node.isReleasable()) // done or aborted
        break;
      else if (!queued) {           // push onto queue
        AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
        QNode q = node.next = head.get();
        if ((q == null || q.phase == phase) &&
            (int)(state >>> PHASE_SHIFT) == phase) // avoid stale enq
          queued = head.compareAndSet(q, node);
      }
      else {
        try {
          // 由于 QNode 实现了 ForkJoinPool.ManagedBlocker 接口
          // 可调用 ForkJoinPool.managedBlock(node) 接口进行线程挂起操作
          ForkJoinPool.managedBlock(node);   
        } catch (InterruptedException cantHappen) {
          node.wasInterrupted = true;
        }
      }
    }

    if (node != null) {
      if (node.thread != null)
        node.thread = null;       // avoid need for unpark()
      if (node.wasInterrupted && !node.interruptible)
        Thread.currentThread().interrupt();
      if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
        // 如果代码进行到这里,当时实际却没有进入到下一个 phase
        // 说明等待操作失败(被打断或超时),中断等待并恢复所有线程
        return abortWait(phase); // possibly clean up on abort
    }
    releaseWaiters(phase);
    return p;
  }
  
}
如何恢复线程运行 - releaseWaiters()

releaseWaiters() 非常简单,遍历等待队列并且调用 LockSupport.unpark() 唤醒线程。 与普通的遍历队列的代码相比,这里的队列阶段使用了 AtomicReference 作为节点,通过 compareAndSet() 赋值子阶段引用。 这个在多线程编程中是基础操作了。

public class Phaser {
  private void releaseWaiters(int phase) {
    QNode q;   // first element of queue
    Thread t;  // its thread
    AtomicReference<QNode> head = (phase & 1) == 0 ? evenQ : oddQ;
    while ((q = head.get()) != null &&
        q.phase != (int)(root.state >>> PHASE_SHIFT)) {
      if (head.compareAndSet(q, q.next) &&
          (t = q.thread) != null) {
        q.thread = null;
        LockSupport.unpark(t);
      }
    }
  }
}

参考文献

Phaser Javadoc


#Java