Skip to content
欢迎扫码关注公众号

JEP 499: Structured Concurrency (Fourth Preview) | 结构化并发(第四次预览)

概述

通过引入 结构化并发 的 API 来简化并发编程。结构化并发将运行在不同线程中的相关任务视为一个工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观测性。这是一个 预览 API

历史

结构化并发由 JEP 428 提出,并在 JDK 19 中作为一个孵化 API 交付。它通过 JEP 437JDK 20 中重新孵化,并进行了一次小更新以继承作用域值 (JEP 429)。它首次在 JDK 21 中预览,通过 JEP 453,其中 StructuredTaskScope::fork(...) 方法被修改为返回一个 Subtask,而不是一个 Future。它在 JDK 22 中通过 JEP 462 重新预览,并在 JDK 23 中通过 JEP 480 再次预览,没有变化。

我们在此提议在 JDK 24 中再次预览该 API,不作更改,以便从实际使用中获得更多反馈时间。

目标

  • 推广一种可以消除由于取消和关闭而产生的常见风险(如线程泄漏和取消延迟)的并发编程风格。

  • 改进并发代码的可观测性。

非目标

  • 并不是要替换 java.util.concurrent 包中的任何并发构造,例如 ExecutorServiceFuture

  • 并不是要定义 Java 平台的权威结构化并发 API。第三方库或未来的 JDK 版本可以定义其他结构化并发构造。

  • 并不是要定义在线程之间共享数据流(即 通道) 的方法。我们可能会在未来提议这样做。

  • 并不是要用新的线程取消机制替换现有的线程中断机制。我们可能会在未来提议这样做。

动机

开发人员通过将任务分解为多个子任务来管理程序中的复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果这些子任务彼此足够独立,并且如果有足够的硬件资源,则可以通过并发执行子任务使整个任务运行得更快(即降低延迟)。例如,通过组合多个 I/O 操作的结果的任务如果每个 I/O 操作都在自己的线程中并发执行,则会运行得更快。虚拟线程 (JEP 444) 使得将线程专用于每一个这样的 I/O 操作变得成本效益高,但管理由此产生的大量线程仍然是一个挑战。

使用 ExecutorService 的非结构化并发

自 Java 5 引入的 java.util.concurrent.ExecutorService API 帮助开发者并发地执行子任务。

例如,这里有一个名为 handle() 的方法,它代表了一个服务器应用程序中的任务。通过向 ExecutorService 提交两个子任务来处理传入请求。一个子任务执行 findUser() 方法,另一个子任务执行 fetchOrder() 方法。ExecutorService 立即为每个子任务返回一个 Future,并根据 Executor 的调度策略并发地执行这些子任务。handle() 方法通过对其未来结果进行阻塞调用(get() 方法)等待子任务的结果,因此说该任务是 加入 其子任务的。

java
Response handle() throws ExecutionException, InterruptedException {
    Future<String>  user  = esvc.submit(() -> findUser());
    Future<Integer> order = esvc.submit(() -> fetchOrder());
    String theUser  = user.get();   // 加入 findUser
    int    theOrder = order.get();  // 加入 fetchOrder
    return new Response(theUser, theOrder);
}

由于子任务是并发执行的,每个子任务可以独立成功或失败。(在此上下文中,失败意味着抛出异常。)通常,像 handle() 这样的任务如果其任何子任务失败,则应该失败。当发生故障时,理解线程的生命周期可能会非常复杂:

  • 如果 findUser() 抛出异常,则在调用 user.get()handle() 将抛出异常,但 fetchOrder() 将继续在其自己的线程中运行。这是 线程泄漏,最坏的情况下,fetchOrder() 线程会干扰其他任务。

  • 如果执行 handle() 的线程被中断,中断不会传播到子任务。无论 findUser() 还是 fetchOrder() 线程都会泄露,在 handle() 失败后仍然继续运行。

  • 如果 findUser() 需要很长时间执行,但 fetchOrder() 在此期间失败了,那么 handle() 将不必要地等待 findUser() 而不是取消它,通过阻塞 user.get() 来实现。只有在 findUser() 完成并且 user.get() 返回之后,order.get() 才会抛出异常,导致 handle() 失败。

在每种情况下,问题在于我们的程序逻辑上是以任务 - 子任务关系组织的,但这些关系只存在于开发者的脑海中。

这不仅创造了更多的错误空间,而且使得诊断和解决此类错误变得更加困难。例如,可观测性工具如线程转储将显示 handle()findUser()fetchOrder() 出现在无关线程的调用栈上,没有任何关于任务 - 子任务关系的提示。

我们可能尝试做得更好,比如当错误发生时显式地取消其他子任务,例如通过使用 try-finally 包装任务并在失败任务的 catch 块中调用其他任务的 future 的 cancel(boolean) 方法。我们也需要在 try-with-resources 语句中使用 ExecutorService,正如 JEP 444 中的例子所示,因为 Future 不提供等待已取消任务的方法。但是所有这一切都很难正确实现,而且往往使代码的逻辑意图更难以辨识。跟踪任务间的关系,并手动添加所需的取消边,对开发者来说要求很高。

需要手动协调生命周期的原因是 ExecutorServiceFuture 允许不受限制的并发模式。涉及的线程之间没有约束或顺序。一个线程可以创建一个 ExecutorService,第二个线程可以提交工作给它,而执行工作的线程与第一个或第二个线程都没有关系。此外,在一个线程提交工作之后,完全不同的线程可以等待执行结果。拥有对 Future 引用的任何代码都可以加入它,即通过调用 get() 来等待其结果——即使是获取 Future 的线程之外的代码也可以这样做。实际上,由一个任务启动的子任务不必返回给提交它的任务。它可以返回给任意数量的任务——或者可能不返回。

由于 ExecutorServiceFuture 允许这样非结构化的使用,它们并不强制甚至跟踪任务和子任务之间的关系,即使这些关系很常见且有用。相应地,即使子任务在同一任务中提交和加入,一个子任务的失败不能自动导致另一个子任务的取消:在上述 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder() 的 future 与 findUser() 的 future 没有关系,二者也都与最终通过其 get() 方法加入它的线程无关。与其要求开发者手动管理这样的取消操作,我们希望可靠地自动化这个过程。

任务结构应反映代码结构

ExecutorService 下的线程自由组合不同,单线程代码的执行总是强制实现任务和子任务的层次结构。方法的主体块 {...} 对应于一个任务,而在块内调用的方法对应于子任务。被调用的方法必须返回给调用它的方法,或者向其抛出异常。被调用的方法不能比调用它的方法存活更久,也不能返回给或抛出异常给不同的方法。因此所有子任务都在任务之前完成,每个子任务都是其父任务的孩子,并且每个子任务相对于其他子任务和任务的生命周期由代码的语法块结构决定。

例如,在 handle() 的这个单线程版本中,任务 - 子任务关系可以从语法结构中清晰看出:

java
Response handle() throws IOException {
    String theUser  = findUser();
    int    theOrder = fetchOrder();
    return new Response(theUser, theOrder);
}

我们不会启动 fetchOrder() 子任务,直到 findUser() 子任务完成,无论成功与否。如果 findUser() 失败,则根本不会启动 fetchOrder(),并且 handle() 任务会隐式失败。一个子任务只能返回给其父任务这一事实非常重要:这意味着父任务可以将一个子任务的失败隐式地视为触发取消其他未完成子任务并随后自己失败的因素。

在单线程代码中,任务 - 子任务层次结构在运行时体现在调用栈上。因此我们可以免费获得相应的父子关系,这些关系控制错误传播。当观察单个线程时,层次关系是显而易见的:findUser()(以及之后的 fetchOrder())看起来隶属于 handle()。这使得回答“handle() 现在正在处理什么?”这个问题变得容易。

如果任务及其子任务之间的父子关系能够从代码的语法结构中体现出来并在运行时具体化——就像单线程代码一样,那么并发编程将会更容易、更可靠且更具可观测性。语法结构将界定子任务的生命周期,并启用线程间层次结构的运行时表示,类似于线程内的调用栈。该表示形式将支持错误传播和取消操作,以及对并发程序进行有意义的观测。

(Java 平台已经有一个用于对并发任务施加结构的 API,即 java.util.concurrent.ForkJoinPool,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务而非涉及 I/O 的任务设计的。)

结构化并发

结构化并发 是一种保留任务和子任务之间自然关系的并发编程方法,它导致更可读、可维护和可靠的并发代码。“结构化并发”一词是由 Martin Sústrik 创造并由 Nathaniel J. Smith 推广开来的。来自其他语言如 Erlang 的层次监督者的想法为结构化并发中的错误处理设计提供了信息。

结构化并发源于一个简单的原则:

如果一个任务拆分为并发子任务,那么它们都会返回到同一个地方,即任务的代码块。

在结构化并发中,子任务代表任务工作。任务等待子任务的结果并监控它们是否有失败。与单线程代码中的结构化编程技术一样,结构化并发对于多线程的强大之处在于两个理念:通过代码块流动执行的明确定义入口和出口点,以及操作生命周期的严格嵌套,这种嵌套反映了代码中的语法嵌套。

由于代码块的入口和出口点定义明确,因此并发子任务的生命周期被限制在其父任务的语法块内。由于兄弟子任务的生命周期嵌套在其父任务的生命周期内,因此可以作为一个单元来推理和管理。因为父任务的生命周期又嵌套在其父级的生命周期内,所以运行时可以将任务层次结构具体化为一棵树,这是单线程调用栈的并发对应物。这允许代码将策略(如截止日期)应用于整个任务子树,并允许可观测性工具以子任务从属于其父任务的方式呈现。

结构化并发非常适合虚拟线程,这些轻量级线程由 JDK 实现。许多虚拟线程共享相同的操作系统线程,从而允许存在非常大量的虚拟线程。除了数量众多之外,虚拟线程足够便宜,可以表示任何并发行为单元,即使是涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发一次处理成千上万甚至数百万的传入请求:它可以为处理每个请求的任务分配一个新的虚拟线程,当任务通过提交子任务进行并发执行时,它可以为每个子任务分配一个新的虚拟线程。在幕后,任务 - 子任务关系通过安排每个虚拟线程携带对其唯一父节点的引用来具体化为一棵树,类似于调用栈中的帧引用其唯一的调用者。

总之,虚拟线程提供了丰富的线程资源。结构化并发可以正确且稳健地协调它们,并使可观测性工具按照开发者的理解显示线程。在 Java 平台上拥有一个结构化并发的 API 将使构建可维护、可靠且可观测的服务器应用程序变得更加容易。

描述

结构化并发 API 的主要类是位于 java.util.concurrent 包中的 StructuredTaskScope。此类允许开发者将一个任务结构化为一组并发子任务,并作为一个单元进行协调。通过单独 分叉 (forking) 子任务并在可能的情况下作为一个单元 加入 (joining) 或取消它们,子任务在其自己的线程中执行。子任务的成功结果或异常被聚合并由父任务处理。StructuredTaskScope 将子任务的生命周期限制在一个清晰的 词法作用域 内,在这个作用域中,任务与其子任务之间的所有交互——分叉、加入、取消、错误处理和结果组合——都发生在此作用域内。

以下是使用 StructuredTaskScope 编写的早期 handle() 示例(ShutdownOnFailure下文 中有解释):

java
Response handle() throws ExecutionException, InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String>  user  = scope.fork(() -> findUser());
        Supplier<Integer> order = scope.fork(() -> fetchOrder());

        scope.join()            // Join both subtasks
             .throwIfFailed();  // ... and propagate errors

        // Here, both subtasks have succeeded, so compose their results
        return new Response(user.get(), order.get());
    }
}

与原始示例相比,理解这里涉及的线程的生命周期非常简单:在所有条件下,它们的生命周期都被限制在词法作用域内,即带有资源的 try 语句的主体。此外,StructuredTaskScope 的使用确保了一些有价值的属性:

  • 具有短路的错误处理 —— 如果 findUser()fetchOrder() 子任务通过抛出异常而失败,则另一个未完成的子任务将被取消。(这是由 ShutdownOnFailure 实现的关闭策略管理的;其他策略也是可能的)。

  • 取消传播 —— 如果运行 handle() 的线程在调用 join() 之前或期间被中断,则当线程退出作用域时,两个子任务会自动取消。

  • 清晰性 —— 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(处理已经完成的子任务的结果)还是失败(由于子任务已结束,无需进一步清理)。

  • 可观测性 —— 如 下文 所述,线程转储清楚地显示了任务层次结构,运行 findUser()fetchOrder() 的线程作为范围的子项显示。

StructuredTaskScope 是一个 预览 API,默认情况下禁用

要使用 StructuredTaskScope API,您必须启用预览 API,方法如下:

  • 使用 javac --release 24 --enable-preview Main.java 编译程序,并使用 java --enable-preview Main 运行它;或者,

  • 当使用 源代码启动器 时,使用 java --enable-preview Main.java 运行程序;或者,

  • 当使用 jshell 时,以 jshell --enable-preview 启动它。

使用 StructuredTaskScope

StructuredTaskScope API 如下:

java
public class StructuredTaskScope<T> implements AutoCloseable {

    public <U extends T> Subtask<U> fork(Callable<? extends U> task);
    public void shutdown();

    public StructuredTaskScope<T> join() throws InterruptedException;
    public StructuredTaskScope<T> joinUntil(Instant deadline)
        throws InterruptedException, TimeoutException;
    public void close();

    protected void handleComplete(Subtask<? extends T> handle);
    protected final void ensureOwnerAndJoined();

}

使用 StructuredTaskScope 的代码一般工作流程是:

  1. 创建一个作用域。创建该作用域的线程是其 所有者

  2. 使用 fork(Callable) 方法在作用域内分叉子任务。

  3. 任何时刻,任一子任务或作用域的所有者都可以调用作用域的 shutdown() 方法来取消未完成的子任务,并防止分叉新的子任务。

  4. 作用域的所有者作为一个单元加入作用域,即所有子任务。所有者可以调用作用域的 join() 方法,等待直到所有子任务都已完成(无论成功与否)或者通过 shutdown() 被取消。或者,它可以调用作用域的 joinUntil(java.time.Instant) 方法,等待到某个截止时间点。

  5. 加入后,处理子任务中的任何错误并处理它们的结果。

  6. 关闭作用域,通常通过 try-with-resources 隐式实现。这会关闭尚未关闭的作用域,并等待已被取消但尚未完成的任何子任务结束。

每次调用 fork(...) 都会启动一个新的线程来执行一个子任务,默认情况下是一个虚拟线程。一个子任务可以创建自己的嵌套 StructuredTaskScope 来分叉自己的子任务,从而创建层次结构。这种层次结构反映在代码的块结构中,它限制了子任务的生命周期:一旦作用域被关闭,所有子任务的线程都被保证已经终止,当退出块时不会留下任何线程。

作用域内的任意子任务、嵌套作用域内的任意子子任务以及作用域的所有者可以随时调用作用域的 shutdown() 方法以表明任务完成——即使其他子任务仍在执行。shutdown() 方法会 中断 仍在执行子任务的线程,并导致 join()joinUntil(Instant) 方法返回。因此,所有的子任务都应该以响应中断的方式编写。在调用 shutdown() 之后分叉的新子任务将处于 UNAVAILABLE 状态且不会运行。实际上,shutdown() 是顺序代码中 break 语句的并发类似物。

在作用域内调用 join()joinUntil(Instant) 是必须的。如果作用域的块在加入之前退出,则作用域将等待所有子任务终止然后抛出异常。

可能在加入之前或期间中断拥有作用域的线程。例如,它可能是已被关闭的外部作用域的一个子任务。如果发生这种情况,join()joinUntil(Instant) 将抛出异常,因为继续没有意义。然后,try-with-resources 语句将关闭作用域,这将取消所有子任务并等待它们终止。这样就有效自动传播了任务取消至其子任务。如果在子任务终止或调用 shutdown() 之前 joinUntil(Instant) 方法的截止时间已到,则它将抛出异常,再次由 try-with-resources 语句关闭作用域。

join() 成功完成时,每个子任务要么成功完成,要么失败,要么因为作用域被关闭而被取消。

加入后,作用域的所有者处理失败的子任务并处理成功完成的子任务的结果;这通常是通过关闭策略完成的(见 下文)。使用 Subtask.get() 方法可以获得成功完成的任务的结果。get() 方法从不阻塞;如果在加入前或子任务未成功完成时错误地调用它,则会抛出 IllegalStateException

在作用域内分叉的子任务继承了 ScopedValue 绑定(JEP 446)。如果作用域的所有者从绑定的 ScopedValue 中读取一个值,则每个子任务将读取相同的值。

如果作用域的所有者本身就是一个现有作用域的子任务,即它是作为分叉的子任务创建的,则该作用域将成为新作用域的父级。因此,作用域和子任务形成了一棵树。

StructuredTaskScope 的结构化使用是在运行时强制实施的。例如,尝试从不在作用域层次结构中的线程(即所有者、子任务和嵌套作用域中的子任务(子子任务))调用 fork(Callable) 将因异常而失败。在 try-with-resources 块之外使用作用域并在未调用 close() 或未保持正确的 close() 调用嵌套的情况下返回,可能会导致作用域的方法抛出 StructureViolationException

StructuredTaskScope 对并发操作施加了结构和顺序。因此,它不实现 ExecutorServiceExecutor 接口,因为这些接口的实例通常以非结构化的方式使用(见 下文)。然而,迁移到使用 StructuredTaskScope 的代码,尽管需要结构化,也是直接明了的。

实践中,大多数 StructuredTaskScope 的使用并不会直接利用 StructuredTaskScope 类,而是使用接下来部分中描述的实现关闭策略的两个子类之一。在其他场景下,用户可能会编写自己的子类来实现自定义的关闭策略。

关闭策略

在处理并发子任务时,通常会使用 短路模式 以避免做不必要的工作。有时,例如,如果其中一个子任务失败(即“调用全部”),或者相反,如果其中一个子任务成功(即“调用任意一个”),则取消所有子任务是有意义的。StructuredTaskScope 的两个子类 ShutdownOnFailureShutdownOnSuccess 分别支持这些模式,并提供了当第一个子任务失败或成功时关闭作用域的策略。

关闭策略还额外提供了集中处理异常和可能的成功结果的方法。这与结构化并发的精神一致,根据该精神,整个作用域被视为一个单元。

这是一个带有失败时关闭策略的 StructuredTaskScope(也在上面的 handle() 示例中使用),它并发运行一组任务并在任何一个任务失败时失败:

java
<T> List<T> runAll(List<Callable<T>> tasks) 
        throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
        scope.join()
             .throwIfFailed();  // 如果有任何子任务失败,则传播异常
        // 此处,所有任务均已成功完成,因此组合它们的结果
        return suppliers.stream().map(Supplier::get).toList();
    }
}

这是一个带有成功时关闭策略的 StructuredTaskScope,它返回第一个成功子任务的结果:

java
<T> T race(List<Callable<T>> tasks, Instant deadline) 
        throws InterruptedException, ExecutionException, TimeoutException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
        for (var task : tasks) {
            scope.fork(task);
        }
        return scope.joinUntil(deadline)
                    .result();  // 如果没有子任务成功完成,则抛出异常
    }
}

一旦有一个子任务成功,此作用域将自动关闭,取消未完成的子任务。如果所有子任务都失败或给定的截止时间已过,则任务失败。这种模式在例如需要从一组冗余服务中的任何一个获取结果的服务器应用程序中非常有用。

虽然这两个关闭策略是开箱即用提供的,开发人员可以创建抽象其他模式的自定义策略(见 下文)。

处理结果

通过关闭策略集中处理异常(例如,使用 ShutdownOnFailure::throwIfFailed)后加入,作用域的所有者可以使用从 fork(...) 调用返回的 Subtask 对象来处理子任务的结果(如果这些结果不由策略处理,例如通过 ShutdownOnSuccess::result())。

通常,作用域所有者将调用的唯一 Subtask 方法是 get() 方法。所有其他的 Subtask 方法通常仅用于自定义关闭策略的 handleComplete(...) 方法实现中(见 下文)。实际上,我们建议由 fork(...) 返回的 Subtask 引用变量被类型化为,例如 Supplier<String> 而不是 Subtask<String>(当然,除非你选择使用 var)。如果关闭策略本身处理子任务结果——如 ShutdownOnSuccess 情况——那么应该完全避免使用 fork(...) 返回的 Subtask 对象,并将 fork(...) 方法视为返回 void。子任务应将其结果作为信息返回,以便在策略进行集中异常处理后供作用域所有者处理。

如果作用域所有者处理子任务异常以产生复合结果,而不是使用关闭策略,则可以从子任务中将异常作为值返回。例如,以下是一个方法,它并行运行一系列任务,并返回包含每个任务各自成功或异常结果的已完成 Future 列表:

java
<T> List<Future<T>> executeAll(List<Callable<T>> tasks)
        throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    	  List<? extends Supplier<Future<T>>> futures = tasks.stream()
    	      .map(task -> asFuture(task))
     	      .map(scope::fork)
     	      .toList();
    	  scope.join();
    	  return futures.stream().map(Supplier::get).toList();
    }
}

static <T> Callable<Future<T>> asFuture(Callable<T> task) {
   return () -> {
       try {
           return CompletableFuture.completedFuture(task.call());
       } catch (Exception ex) {
           return CompletableFuture.failedFuture(ex);
       }
   };
}

自定义关闭策略

StructuredTaskScope 可以被扩展,并且可以重写其受保护的 handleComplete(...) 方法,以实现不同于 ShutdownOnSuccessShutdownOnFailure 的策略。例如,一个子类可以:

  • 收集成功完成的子任务的结果并忽略失败的子任务,
  • 在子任务失败时收集异常,或
  • 调用 shutdown() 方法来关闭并在满足某些条件时使 join() 唤醒。

当子任务完成时,即使在调用了 shutdown() 之后,它也会作为 Subtask 报告给 handleComplete(...) 方法:

java
public sealed interface Subtask<T> extends Supplier<T> {
    enum State { SUCCESS, FAILED, UNAVAILABLE }

    State state();
    Callable<? extends T> task();
    T get();
    Throwable exception();
}

在调用 shutdown() 之前,无论是成功(SUCCESS 状态)还是不成功(FAILED 状态)完成的子任务都会调用 handleComplete(...) 方法。只有在子任务处于 SUCCESS 状态时才能调用 get() 方法,并且只有在子任务处于 FAILED 状态时才能调用 exception() 方法;在其他情况下调用 get()exception() 将导致抛出 IllegalStateExceptionUNAVAILABLE 状态表示以下之一:(1) 子任务已分叉但尚未完成;(2) 子任务在关闭后完成,或 (3) 子任务在关闭后分叉因此未启动。handleComplete(...) 方法永远不会针对处于 UNAVAILABLE 状态的子任务调用。

子类通常会定义方法,以便在 join() 方法返回后执行的代码可以使用结果、状态或其他结果。一个收集结果并忽略失败子任务的子类可以定义一个返回结果集合的方法。实现了子任务失败即关闭策略的子类可能定义一个获取第一个失败子任务异常的方法。

这里是一个收集成功完成子任务结果的 StructuredTaskScope 子类的例子。它定义了 results() 方法,供主任务用来检索结果。

java
class MyScope<T> extends StructuredTaskScope<T> {

    private final Queue<T> results = new ConcurrentLinkedQueue<>();

    MyScope() { super(null, Thread.ofVirtual().factory()); }

    @Override
    protected void handleComplete(Subtask<? extends T> subtask) {
        if (subtask.state() == Subtask.State.SUCCESS)
            results.add(subtask.get());
    }

    @Override
    public MyScope<T> join() throws InterruptedException {
        super.join();
        return this;
    }

    // 返回成功完成的子任务的结果流
    public Stream<T> results() {
        super.ensureOwnerAndJoined();
        return results.stream();
    }

}

这个自定义策略可以这样使用:

java
<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = new MyScope<T>()) {
        for (var task : tasks) scope.fork(task);
        return scope.join()
                    .results().toList();
    }
}

扇入场景

上面的例子主要关注 扇出 场景,这些场景管理多个并发的传出 I/O 操作。StructuredTaskScope扇入 场景中也非常有用,这些场景管理多个并发的传入 I/O 操作。在这样的场景中,我们通常会根据传入请求动态创建数量未知的子任务。

这是一个服务器示例,在 StructuredTaskScope 内部分叉子任务来处理传入连接:

java
void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
    try (var scope = new StructuredTaskScope<Void>()) {
        try {
            while (true) {
                var socket = serverSocket.accept();
                scope.fork(() -> handle(socket));
            }
        } finally {
            // 如果发生了错误或者我们被中断,则停止接受新连接
            scope.shutdown();  // 关闭所有活跃连接
            scope.join();
        }
    }
}

从并发的角度来看,这种场景的不同之处不仅在于请求的方向,还在于任务的持续时间和数量。这里与前面的例子不同,作用域的所有者在其持续时间内是无界的——仅当其被中断时才会停止。子任务的数量也是未知的,因为它们是根据外部事件动态分叉的。

所有处理连接的子任务都在该作用域内创建,因此在线程转储中很容易看到它们的目的,这些子任务将显示为作用域所有者的子项。作为一个单元关闭整个服务也很容易。

可观察性

我们扩展了由 JEP 444 新增的 JSON 线程转储格式,以显示 StructuredTaskScope 如何将线程分组为层次结构:

bash
$ jcmd <pid> Thread.dump_to_file -format=json <file>

每个作用域的 JSON 对象包含在该作用域内分叉的线程及其堆栈跟踪数组。一个作用域的所有者线程通常会在 join 方法中被阻塞,等待子任务完成;通过显示结构化并发所施加的树形层次结构,线程转储可以很容易地查看子任务线程正在做什么。作用域的 JSON 对象还包含对其父级的引用,因此可以从转储中重构程序的结构。

也可以使用 com.sun.management.HotSpotDiagnosticsMXBean API 生成此类线程转储,可以直接使用,也可以通过平台的 MBeanServer 和本地或远程 JMX 工具间接使用。

为什么 fork(...) 不返回 Future

StructuredTaskScope API 处于孵化阶段时,fork(...) 方法返回了一个 Future。这通过使 fork(...) 类似于现有的 ExecutorService::submit 方法带来了熟悉感。然而,鉴于 StructuredTaskScope 旨在与 ExecutorService 不同地使用——如上所述,以结构化的方式——Future 的使用带来的困惑多于清晰度。

  • 熟悉的 Future 用法涉及调用其 get() 方法,该方法会阻塞直到结果可用。但在 StructuredTaskScope 的上下文中,以这种方式使用 Future 不仅被不鼓励而且是适得其反的。只有在 join() 返回后,即已知它们已完成或被取消时,才应查询结构化的 Future 对象,此时应使用的方法不是熟悉的 get() 而是新引入的 resultNow(),它从不阻塞。

  • 一些开发者想知道为什么 fork(...) 不返回更强大的 CompletableFuture 对象。由于由 fork(...) 返回的 Future 只应在确定其已完成之后使用,CompletableFuture 不会提供任何优势,因为它的高级特性仅对未完成的 future 有用。此外,CompletableFuture 设计用于异步编程范式,而 StructuredTaskScope 则鼓励阻塞范式。

简而言之,FutureCompletableFuture 的设计提供了在结构化并发中有害的自由度。

  • 结构化并发关注的是将运行在不同线程上的多个任务视为单个工作单元,而 Future 在处理多个任务作为独立任务时最有用。一个作用域应该只阻塞一次以等待其子任务的结果,然后集中处理异常。因此,在绝大多数情况下,对于从 fork(...) 返回的 Future 唯一应该调用的方法是 resultNow()。这是对 Future 普通用途的一个显著改变,而 Future 接口在这种情况下正确使用的注意力分散因素。

在当前 API 中,Subtask::get() 的行为与 API 孵化期间的 Future::resultNow() 完全相同。

替代方案

  • 增强 ExecutorService 接口。我们为此接口原型实现了一种总是强制执行结构并限制哪些线程可以提交任务的实现。然而,我们发现它是有问题的,因为 JDK 和生态系统中 ExecutorService(及其父接口 Executor)的大多数用法都不是结构化的。为一种受到严格限制的概念重用相同的 API 势必会引起混淆。例如,将结构化的 ExecutorService 实例传递给接受此类型现有方法几乎肯定会在大多数情况下抛出异常。