JEP 505:结构化并发(第五预览版)
JEP 505:结构化并发(第五预览版)
原文:JEP 505- Structured Concurrency (Fifth Preview)
作者:
日期:2025-10-26
| 作者 | 艾伦·贝特曼(Alan Bateman)、维克托·克兰(Viktor Klang)、罗恩·普雷斯勒(Ron Pressler) |
|---|---|
| 所有者 | 艾伦·贝特曼(Alan Bateman) |
| 类型 | 特性 |
| 范围 | SE |
| 状态 | 已关闭 / 已交付 |
| 发布版本 | 25 |
| 组件 | 核心库 |
| 讨论组 | loom - dev@openjdk.org |
| 相关内容 | JEP 499:结构化并发(第四预览版) JEP 525:结构化并发(第六预览版) |
| 审核人 | 保罗·桑多兹(Paul Sandoz) |
| 批准人 | 保罗·桑多兹(Paul Sandoz) |
| 创建时间 | 2024 年 9 月 18 日 04:58 |
| 更新时间 | 2025 年 10 月 9 日 07:21 |
| 问题编号 | 8340343 |
摘要
通过引入“结构化并发”API 简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单个工作单元,从而简化错误处理和取消操作,提高可靠性并增强可观察性。这是一个 预览 API。
历史
结构化并发在 JDK 19 中通过 JEP 428 进行孵化,并在 JDK 20 中通过 JEP 437 继续。它在 JDK 21 中通过 JEP 453 进行预览,其中 fork 方法改为返回一个 Subtask 而不是一个 Future。它在 JDK 22 中通过 JEP 462、在 JDK 23 中通过 JEP 480 以及在 JDK 24 中通过 JEP 499 再次进行预览。
我们提议在 JDK 25 中对该 API 进行再次预览,并进行一些 API 更改。特别是,现在通过静态工厂方法而不是公共构造函数来打开 StructuredTaskScope。零参数的 open 工厂方法通过创建一个 StructuredTaskScope 来涵盖常见情况,该 StructuredTaskScope 会等待所有子任务成功或任何一个子任务失败。其他策略和结果可以通过向功能更丰富的 open 工厂方法之一提供适当的 Joiner 来实现。
目标
- 推广一种并发编程风格,能够消除因取消和关闭操作引发的常见风险,如线程泄漏和取消延迟。
- 提高并发代码的可观察性。
非目标
- 目标不是替换
java.util.concurrent包中的任何并发构造,如ExecutorService或Future。 - 目标不是为所有 Java 程序创建最终的结构化并发 API。其他结构化并发构造可由第三方库定义,或在未来的 JDK 版本中定义。
- 目标不是定义一种在线程之间共享数据流(即 _ 通道_)的方法。我们可能会在未来提出这样的建议。
- 目标不是用新的线程取消机制替换现有的线程中断机制。我们可能会在未来提出这样的建议。
动机
我们通过将任务分解为多个子任务来管理程序中的复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务相互之间足够独立,并且有足够的硬件资源,那么通过并发执行子任务,整体任务可以运行得更快,即延迟更低。例如,一个组合多个 I/O 操作结果的任务,如果每个 I/O 操作在自己的线程中并发执行,将会运行得更快。虚拟线程(JEP 444)使得为每个此类 I/O 操作分配一个线程在成本上变得可行,但管理由此可能产生的大量线程仍然是一个挑战。
使用 ExecutorService 的非结构化并发
Java 5 中引入的 java.util.concurrent.ExecutorService API 可以并发执行子任务。
例如,下面是一个 handle() 方法,代表服务器应用程序中的一个任务。它通过向 ExecutorService 提交两个子任务来处理传入的请求。一个子任务执行 findUser() 方法,另一个执行 fetchOrder() 方法。ExecutorService 会立即为每个子任务返回一个 Future,并根据其调度策略并发执行这些子任务。handle() 方法通过对这些 Future 的 get() 方法进行阻塞调用来等待子任务的结果,因此该任务被称为“加入”了它的子任务。
Response handle() throws ExecutionException, InterruptedException {
Future<String> user = executor.submit(() -> findUser());
Future<Integer> order = executor.submit(() -> fetchOrder());
String theUser = user.get(); // 加入 findUser
int theOrder = order.get(); // 加入 fetchOrder
return new Response(theUser, theOrder);
}
由于子任务是并发执行的,每个子任务都可能独立地成功或失败(在此上下文中,失败意味着抛出异常)。通常,像 handle() 这样的任务,如果其任何一个子任务失败,它本身也应该失败。当出现失败时,理解线程的生命周期可能会出奇地复杂:
- 如果
findUser()抛出异常,那么handle()在调用user.get()时会抛出异常,但fetchOrder()会在其自身线程中继续运行。这就是“线程泄漏”,往好了说会浪费资源;往坏了说,fetchOrder()线程会干扰其他任务。 - 如果执行
handle()的线程被中断,中断不会传播到子任务。findUser()和fetchOrder()线程都会泄漏,即使在handle()失败后仍继续运行。 - 如果
findUser()执行时间很长,但同时fetchOrder()失败了,那么handle()会不必要地通过阻塞在user.get()上等待findUser(),而不是取消它。只有在findUser()完成且user.get()返回后,order.get()才会抛出异常,导致handle()失败。
在每种情况下,问题在于我们的程序在逻辑上是按照任务 - 子任务关系构建的,但这些关系仅存在于我们的脑海中。
这不仅增加了出错的可能性,还使得诊断和排查此类错误变得更加困难。例如,像线程转储这样的可观察性工具,会在不相关线程的调用堆栈中显示 handle()、findUser() 和 fetchOrder(),而不会提示任务 - 子任务关系。
我们可能会尝试通过在出错时显式取消其他子任务来改进,例如用 try - finally 包裹任务,并在失败任务的 catch 块中调用其他任务的 Future 的 cancel(boolean) 方法。我们还需要在 try - with - resources 语句 中使用 ExecutorService,如 JEP 444 中的示例所示,因为 Future 没有提供等待已取消任务的方法。但是,所有这些跟踪任务间关系的操作很难做对,而且往往会使代码的逻辑意图更难辨别。
这种手动协调生命周期的需求源于 ExecutorService 和 Future 允许不受限制的并发模式。所涉及的任何线程都没有约束或顺序。一个线程可以创建 ExecutorService,第二个线程可以向其提交任务,而执行任务的线程与第一个或第二个线程都没有关系。此外,一个线程提交任务后,一个完全不同的线程可以等待执行结果。任何持有 Future 引用的代码都可以加入它,即通过调用 get() 等待其结果 —— 即使该代码在与获取 Future 的线程不同的线程中。实际上,一个任务启动的子任务不一定返回到提交它的任务。它可以返回到多个任务中的任何一个 —— 或者可能不返回给任何任务。
由于 ExecutorService 和 Future 允许这种非结构化的使用方式,它们不会强制甚至不会跟踪任务和子任务之间的关系,尽管这种关系很常见且有用。因此,即使子任务在同一任务中提交和加入,一个子任务的失败也不能自动导致另一个子任务的取消:在上面的 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder() 的 Future 与 findUser() 的 Future 无关,并且两者都与最终通过其 get() 方法加入它们的线程无关。
与其手动管理这种取消操作,我们应该致力于可靠地实现自动化。
任务结构应反映代码结构
与 ExecutorService 下随意组合的线程不同,单线程代码的执行始终强制遵循任务和子任务的层次结构。方法的主体块 {...} 对应一个任务,块内调用的方法对应子任务。被调用的方法必须返回调用它的方法,或者向调用它的方法抛出异常。被调用的方法的生命周期不能长于调用它的方法,也不能返回给或向其他不同的方法抛出异常。因此,所有子任务在任务之前完成,每个子任务都是其父任务的子项,并且每个子任务相对于其他子任务以及任务的生命周期由代码的语法块结构决定。
例如,在这个单线程版本的 handle() 中,任务 - 子任务关系从语法结构中一目了然:
Response handle() throws IOException {
String theUser = findUser();
int theOrder = fetchOrder();
return new Response(theUser, theOrder);
}
在 findUser() 子任务完成之前,无论是成功还是失败,我们都不会启动 fetchOrder() 子任务。如果 findUser() 失败,我们根本不会启动 fetchOrder(),并且 handle() 任务会隐式失败。子任务只能返回给其父任务这一事实很重要:这意味着父任务可以隐式地将一个子任务的失败视为取消其他未完成子任务并随后自身失败的触发条件。
在单线程代码中,任务 - 子任务层次结构在运行时体现在调用堆栈中。因此,我们免费获得了相应的父子关系,这些关系控制着错误传播。观察单个线程时,层次关系很明显:findUser() 以及之后的 fetchOrder() 都从属于 handle()。这使得回答“handle() 现在在处理什么?”这个问题变得很容易。
如果任务与其子任务之间的父子关系从代码的语法结构中清晰可见,并且在运行时也能体现出来,就像单线程代码那样,那么并发编程将会更容易、更可靠且更具可观察性。语法结构将划定子任务的生命周期,并实现线程间层次结构的运行时表示,类似于线程内的调用堆栈。这种表示将实现错误传播和取消,以及对并发程序进行有意义的观察。
(Java 平台已经有一个用于给并发任务强加结构的 API,即 java.util.concurrent.ForkJoinPool,它是并行流背后的执行引擎。然而,该 API 是为计算密集型任务而设计的,而非涉及 I/O 的任务。)
结构化并发
“结构化并发”是一种并发编程方法,它保留了任务和子任务之间的自然关系,从而产生更易读、可维护且可靠的并发代码。“结构化并发”这一术语由 马丁·苏斯特里克(Martin Sústrik) 创造,并由 纳撒尼尔·J·史密斯(Nathaniel J. Smith) 推广。来自其他语言的理念,如 Erlang 的分层监督者,为结构化并发中的错误处理设计提供了思路。
结构化并发源自一个简单的原则:如果一个任务分裂为并发子任务,那么它们都返回到同一个地方,即任务的代码块。
在结构化并发中,子任务代表一个任务工作。任务等待子任务的结果并监测它们是否失败。与单线程代码的结构化编程技术一样,多线程结构化并发的强大之处来自两个理念:通过代码块的执行流具有明确定义的入口和出口点,并且操作的生命周期以一种反映其在代码中语法嵌套的方式嵌套。
因为代码块的入口和出口点定义明确,并发子任务的生命周期被限制在其父任务的语法块内。因为兄弟子任务的生命周期嵌套在其父任务的生命周期内,所以可以将它们作为一个单元进行推理和管理。由于父任务的生命周期又嵌套在其父任务的生命周期内,运行时可以将任务层次结构具体化为一棵树,这是单线程调用堆栈的并发对应物。这允许将诸如截止日期之类的策略应用于任务的整个子树,并允许可观察性工具将子任务显示为从属于其父任务。
结构化并发与 虚拟线程 非常匹配,虚拟线程是由 JDK 实现的轻量级线程。虚拟线程共享操作系统线程,允许存在大量的虚拟线程。除了数量丰富之外,虚拟线程足够轻量,可以表示任何并发行为单元,甚至是涉及 I/O 的行为。这意味着服务器应用程序可以使用结构化并发一次性处理数千或数百万个传入请求:它可以为处理每个请求的任务分配一个新的虚拟线程,并且当一个任务通过提交子任务进行并发执行而展开时,它可以为每个子任务分配一个新的虚拟线程。在幕后,通过安排每个虚拟线程携带对其唯一父线程的引用,将任务 - 子任务关系具体化为一棵树,类似于调用堆栈中的一个帧引用其唯一的调用者。
总之,虚拟线程提供了大量的线程。结构化并发可以正确且稳健地协调它们,并使可观察性工具能够按照开发人员的理解来显示线程。在 Java 平台中拥有一个结构化并发 API 将使构建可维护、可靠且可观察的服务器应用程序变得更容易。
描述
结构化并发 API 的主要类是 StructuredTaskScope,位于 java.util.concurrent 包中。这个类允许我们将一个任务构建为一组并发子任务,并将它们作为一个单元进行协调。子任务通过逐个“派生(fork)”并作为一个单元“合并(join)”,在各自的线程中执行。StructuredTaskScope 将子任务的生命周期限制在一个清晰的 词法作用域 内,在这个作用域中,一个任务与其子任务的所有交互——派生、合并、处理错误以及组合结果——都会发生。
以下是前面 handle() 示例的修订版本,使用了 StructuredTaskScope:
Response handle() throws InterruptedException {
try (var scope = StructuredTaskScope.open()) {
Subtask<String> user = scope.fork(() -> findUser());
Subtask<Integer> order = scope.fork(() -> fetchOrder());
scope.join(); // 合并子任务,传播异常
// 两个子任务都已成功,因此组合它们的结果
return new Response(user.get(), order.get());
}
}
与原始示例相比,理解这里涉及的线程的生命周期很容易:在所有情况下,它们的生命周期都被限制在一个词法作用域内,即 try - with - resources 语句的主体部分。此外,使用 StructuredTaskScope 确保了一些有价值的属性:
- 短路错误处理 — 如果
findUser()或fetchOrder()子任务中的一个通过抛出异常而失败,那么另一个如果尚未完成则会被取消,即 中断。 - 取消传播 — 如果运行
handle()的线程在调用join()之前或期间被中断,那么当该线程退出作用域时,两个子任务都会自动取消。 - 清晰性 — 上述代码具有清晰的结构:设置子任务,等待它们完成或取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(并且子任务已经完成,所以无需进一步清理)。
- 可观察性 — 如下文 可观察性 所述,线程转储清晰地显示任务层次结构,运行
findUser()和fetchOrder()的线程显示为作用域的子线程。
StructuredTaskScope 是一个 预览 API,默认情况下处于禁用状态
要使用 StructuredTaskScope API,必须按如下方式启用预览 API:
- 使用
javac --release 25 --enable-preview Main.java编译程序,并使用java --enable-preview Main运行它;或者, - 使用 源代码启动器 时,使用
java --enable-preview Main.java运行程序;或者, - 使用 jshell 时,使用
jshell --enable-preview启动它。
使用 StructuredTaskScope
StructuredTaskScope<T, R> API 中,T 是在作用域中派生的任务的结果类型,R 是 join 方法的结果类型,可总结如下:
public sealed interface StructuredTaskScope<T, R> extends AutoCloseable {
public static <T> StructuredTaskScope<T, Void> open();
public static <T, R> StructuredTaskScope<T, R> open(Joiner<? super T,
? extends R> joiner);
public <U extends T> Subtask<U> fork(Callable<? extends U> task);
public Subtask<? extends T> fork(Runnable task);
public R join() throws InterruptedException;
public void close();
}
使用 StructuredTaskScope 的代码的一般工作流程是:
- 通过调用其中一个静态
open方法打开一个新的作用域。打开作用域的线程是该作用域的“所有者”。 - 使用
fork方法在作用域中派生子任务。 - 使用
join方法作为一个单元合并作用域的所有子任务。这可能会抛出异常。 - 处理结果。
- 关闭作用域,通常通过
try - with - resources隐式关闭。如果作用域尚未取消,这将取消该作用域,从而取消其所有剩余的子任务并等待它们终止。
在 handle() 示例中,零参数的 open() 工厂方法创建并打开一个 StructuredTaskScope,它实现了默认的“完成策略”,即如果任何子任务失败则整个任务失败。如我们将在下文 合并器 中看到的,其他策略可以通过将合适的 Joiner 传递给单参数的 open 方法来实现。
每次调用 fork 方法都会启动一个线程来执行一个子任务,默认情况下是一个虚拟线程。一个子任务可以创建自己的 StructuredTaskScope 来派生自己的子任务,从而创建一个作用域层次结构。该层次结构反映在代码的块结构中,块结构限制了子任务的生命周期:一旦作用域关闭,所有子任务的线程都保证已终止,并且当块退出时不会有线程遗留。
join 方法必须由作用域的所有者线程在作用域内调用。如果作用域的块在合并之前退出,那么作用域将被取消,所有者将在 close 方法中等待所有子任务终止,然后抛出异常。
合并之后,作用域的所有者可以使用从 fork 方法返回的 Subtask 对象来处理子任务的结果。如果在合并之前调用,Subtask::get 方法会抛出异常。
取消操作
作用域的所有者线程可能在合并之前或合并过程中被中断。例如,所有者线程自身可能是一个已被取消的封闭作用域的子任务。如果发生这种情况,join() 将抛出异常,因为继续执行已无意义。try - with - resources 语句随后将取消该作用域,这会取消所有子任务并等待它们终止。这就实现了将任务的取消自动传播到其子任务。
为了支持取消操作,子任务的编码必须确保在被 中断 时尽快结束。如果子任务不响应中断,例如因为它们阻塞在不可中断的方法上,可能会无限期延迟作用域的关闭。close 方法始终会等待执行子任务的线程结束,即使作用域已被取消。在被中断的线程结束之前,执行无法继续到 close 方法之后。
作用域值
在作用域中派生的子任务会继承 ScopedValue 绑定(JEP 487)。如果作用域的所有者从绑定的 ScopedValue 中读取一个值,那么每个子任务将读取相同的值。
强制结构化使用
在运行时,StructuredTaskScope 对并发操作强制实施结构和顺序。例如,尝试从非作用域所有者的线程调用 fork 方法将因异常而失败。在 try - with - resources 块之外使用作用域,并且在不调用 close() 的情况下返回,或者没有正确嵌套 close() 调用,可能会导致作用域的方法抛出 StructureViolationException。
StructuredTaskScope 没有实现 ExecutorService 或 Executor 接口,因为这些接口的实例通常以非结构化方式使用(见下文 替代方案)。然而,将使用 ExecutorService 但能从结构化中受益的代码迁移为使用 StructuredTaskScope 并不复杂。
合并器
在 handle() 示例中,如果任何子任务失败,join() 方法将抛出异常并且作用域将被取消。如果所有子任务都成功,join() 方法将正常完成并返回 null。这是默认的完成策略。
可以通过使用合适的 StructuredTaskScoped.Joiner 创建 StructuredTaskScope 来选择其他策略。Joiner 对象处理子任务的完成情况,并为 join() 方法生成结果。根据合并器的不同,join() 方法可能返回一个结果、一个元素流或其他对象。
Joiner 接口声明了一些工厂方法,用于创建适用于常见情况的合并器。例如,工厂方法 anySuccessfulResultOrThrow() 返回一个新的合并器,它会产生任何成功完成的子任务的结果:
<T> T race(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>anySuccessfulResultOrThrow())) {
tasks.forEach(scope::fork);
return scope.join();
}
}
一旦有一个子任务成功,作用域就会被取消,未完成的子任务也会被取消,并且 join() 将返回成功子任务的结果。如果所有子任务都失败,join() 将抛出一个 FailedException,其中一个子任务的异常作为 原因。这种模式在例如服务器应用程序中可能很有用,这些应用程序需要从一组冗余服务中的任何一个获取结果。
工厂方法 allSuccessfulOrThrow() 返回一个新的合并器,当所有子任务都成功完成时,它会产生一个子任务流:
<T> List<T> runConcurrently(Collection<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow())) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
}
如果有一个或多个子任务失败,join() 将抛出一个 FailedException,其中一个失败子任务的异常作为原因。Joiner 实现的完成策略与零参数 open() 方法实现的默认策略相同。它们的结果不同之处在于,join() 方法返回一个已完成子任务的流而不是 null,这使得这个合并器适用于所有子任务返回相同类型结果且忽略 fork 方法返回的 Subtask 对象的情况。在这个示例中,流中的 Subtask 元素被映射为结果并累积到一个列表中。
Joiner 接口还声明了另外三个工厂方法:
awaitAll(),它返回一个新的合并器,该合并器只是等待所有子任务完成,无论是否成功;awaitAllSuccessfulOrThrow(),它返回一个新的合并器,该合并器等待所有子任务成功完成;以及allUntil(Predicate<Subtask<? extends T>> isDone),它返回一个新的合并器,当所有子任务都成功完成或者对已完成子任务的谓词返回true时,取消封闭作用域并产生所有子任务的流。
使用任何类型的 Joiner 时,为每个 StructuredTaskScope 创建一个新的 Joiner 至关重要。Joiner 对象绝不应在不同的任务作用域中使用,也不应在作用域关闭后重复使用。
自定义合并器
可以直接实现 Joiner 接口以支持自定义完成策略。它有两个类型参数:T 表示在作用域中执行的子任务的结果类型,R 表示 join() 方法的结果类型。该接口可总结如下:
public interface Joiner<T, R> {
public default boolean onFork(Subtask<? extends T> subtask);
public default boolean onComplete(Subtask<? extends T> subtask);
public R result() throws Throwable;
}
onFork 方法在派生子任务时被调用,而 onComplete 方法在子任务完成时被调用。这两个方法都返回一个 boolean 值,用于指示作用域是否应该被取消。当所有子任务都已完成或作用域被取消时,会调用 result 方法来为 join 方法生成结果,否则抛出异常。如果 result 方法抛出异常,join 方法将抛出一个 FailedException,并将该异常作为原因。
以下是一个 Joiner 类,它收集成功完成的子任务的结果,忽略失败的子任务。onComplete 方法可能会被多个线程并发调用,因此必须是线程安全的。result 方法返回任务结果的流。
class CollectingJoiner<T> implements Joiner<T, Stream<T>> {
private final Queue<T> results = new ConcurrentLinkedQueue<>();
public boolean onComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
}
return false;
}
public Stream<T> result() {
return results.stream();
}
}
可以像这样使用这个自定义策略:
<T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
try (var scope = StructuredTaskScope.open(new CollectingJoiner<T>())) {
tasks.forEach(scope::fork);
return scope.join().toList();
}
}
异常处理
异常的处理方式将取决于具体用法。当作用域被认为失败时,join() 方法会抛出一个 FailedException。在 handle() 示例中,如果一个子任务失败,将抛出一个 FailedException,并将失败子任务的异常作为原因。在某些情况下,在 try - with - resources 语句中添加一个 catch 块以便在作用域关闭后处理异常可能会很有用:
try (var scope = StructuredTaskScope.open()) {
...
} catch (StructuredTaskScope.FailedException e) {
Throwable cause = e.getCause();
switch (cause) {
case IOException ioe ->..
default ->..
}
}
异常处理代码可以使用 instanceof 操作符结合模式匹配(JEP 394)来处理特定的原因。
子任务中的特定异常可能会触发返回一个默认值。在这种情况下,在子任务中捕获异常并使其以默认值作为结果完成,而不是让作用域的所有者处理异常,可能更为合适。
配置
前文对 StructuredTaskScope API 的总结 展示了两个静态 open 方法。第三个这样的方法 接受一个 Joiner 以及一个函数,该函数可以生成一个 配置对象,以便为监控和管理目的设置作用域的名称、设置作用域的超时时间,以及设置作用域的 fork 方法用于创建线程的 线程工厂。
以下是 runConcurrently 方法的修订版本,它设置了一个线程工厂和一个超时时间:
<T> List<T> runConcurrently(Collection<Callable<T>> tasks,
ThreadFactory factory,
Duration timeout)
throws InterruptedException
{
try (var scope = StructuredTaskScope.open(Joiner.<T>allSuccessfulOrThrow(),
cf -> cf.withThreadFactory(factory)
.withTimeout(timeout))) {
tasks.forEach(scope::fork);
return scope.join().map(Subtask::get).toList();
}
}
此作用域中的 fork 方法将调用给定的线程工厂来创建执行每个子任务的线程。这在例如设置线程名称或其他属性时可能会很有用。
超时时间指定为一个 java.time.Duration。如果在 join() 方法等待之前或等待期间超时时间到期,作用域将被取消,这会取消所有未完成的子任务,并且 join() 会抛出一个 TimeoutException。
可观察性
我们扩展了 [为虚拟线程添加的 JSON 线程转储格式](https://openjdk.org/jeps/444#Observing - virtual - threads),以展示 StructuredTaskScope 如何将线程分组为层次结构:
$ jcmd <pid> Thread.dump_to_file -format=json <file>
每个作用域的 JSON 对象包含在该作用域中派生的线程数组及其堆栈跟踪信息。作用域的所有者线程通常会阻塞在 join 方法中,等待子任务完成;线程转储通过展示结构化并发所施加的树状层次结构,使得很容易查看子任务线程正在做什么。作用域的 JSON 对象还包含对其父作用域的引用,以便可以从转储中重构程序的结构。
com.sun.management.HotSpotDiagnosticsMXBean API 也可用于生成此类线程转储,可直接使用,或通过平台的 MBeanServer 以及本地或远程 JMX 工具间接使用。
替代方案
增强 ExecutorService 接口
我们对这个接口的一个实现进行了原型设计,该实现始终强制实施结构并限制哪些线程可以提交任务。然而,我们发现这存在问题,因为在 JDK 以及生态系统中,ExecutorService 及其父接口 Executor 的大多数用法都不是结构化的。将同一个 API 用于一个限制得多的概念必然会导致混淆。例如,将一个结构化的 ExecutorService 实例传递给现有的接受该类型的方法,在大多数情况下几乎肯定会抛出异常。
让 fork 方法返回一个 Future
当 StructuredTaskScope API 处于孵化阶段时,fork 方法返回一个 Future。这通过使这些方法类似于现有的 ExecutorService::submit 方法,给人一种熟悉感。然而,StructuredTaskScope 旨在以结构化方式使用,而 ExecutorService 并非如此,这带来的混淆多于清晰。
- 对
Future的常见用法涉及调用其get()方法,该方法会阻塞直到有结果可用。但在StructuredTaskScope的上下文中,以这种方式使用Future不仅不被鼓励,而且适得其反。结构化的Future对象应该仅在join()方法返回后查询,此时已知它们已完成,并且应该使用的方法不是常见的get()方法,而是最近引入的resultNow()方法,该方法不会阻塞。 - 一些开发人员想知道为什么
fork方法不返回功能更强的CompletableFuture对象。由于这些方法返回的Future应该仅在已知已完成后使用,CompletableFuture不会带来任何好处,因为其高级功能仅对未完成的Future有帮助。此外,CompletableFuture是为异步编程范式设计的,而StructuredTaskScope鼓励阻塞范式。
简而言之,Future 和 CompletableFuture 旨在提供在结构化并发中适得其反的自由度。
- 结构化并发是关于将在不同线程中运行的相关任务视为单个工作单元,而
Future在将多个任务视为单个任务时大多有用。一个作用域应该只阻塞一次以等待其子任务的结果,然后应该集中处理异常。因此,在绝大多数情况下,应该在fork方法返回的Future上调用的唯一方法是resultNow。这与Future的常规用法有明显变化,并且Future接口分散了人们对其在这种上下文中正确用法的注意力。
在当前的 API 中,Subtask::get() 的行为与 API 孵化时 Future::resultNow() 的行为完全相同。