通过结构化并发 API 简化并发编程,将在不同线程中运行的相关任务组视为单个工作单元。这简化了错误处理和取消操作,提高了可靠性并增强了可观察性。结构化并发之前分别于 2022 年 3 月和 9 月在 JDK 20 和 JDK 19 中孵化,它作为 java.util.concurrent 包中的一个预览 API。这次唯一的重大变化是,StructuredTaskScope::Fork(…) 方法返回的是 [Subtask] 而不是 Future。结构化并发的目标包括促进一种并发编程风格,这种风格可以消除因取消和关闭而产生的常见风险(如线程泄漏和取消延迟),同时提高并发代码的可观察性。

结构化并发是一种旨在通过提供结构化且易于遵循的方法来简化并发编程的编程范例。使用结构化并发,开发人员可以创建更容易理解和调试、不容易出现竞态条件和其他与并发相关的错误的并发代码。在结构化并发中,所有并发代码都被结构化为称为任务的明确定义的工作单元。任务以结构化的方式创建、执行和完成,任务的执行始终保证在其父任务完成之前完成。

结构化并发可以使多线程编程更加简单和可靠。在传统的多线程编程中,线程的启动、执行和终止都是由开发人员手动管理的,因此容易出现线程泄漏、死锁和不正确的异常处理等问题。

使用结构化并发,开发人员可以更自然地组织并发任务,使任务之间的依赖关系更清晰,代码逻辑更简洁。结构化并发还提供了一些异常处理机制,以更好地管理并发任务中的异常,避免由异常引起的程序崩溃或数据不一致。

此外,结构化并发还可以通过限制并发任务的数量和优先级来防止资源竞争和饥饿现象。这些特性使得开发人员能够更容易地实现高效且可靠的并发程序,而不必过多关注底层线程管理。

1.结构化并发

想象以下情景。假设您有三个任务需要同时执行。只要任何一个任务完成并返回结果,就可以直接使用该结果,可以停止其他两个任务。例如,一个天气服务通过三个渠道获取天气情况,只要一个渠道返回即可。

在这种情况下,在Java 8下应该做什么,当然也是可以的。

List<Future<?>> futures = executor.invokeAll(tasks); String result = executor.invokeAny(tasks); 

使用ExecutorService的invokeAll和invokeAny方法实现,但会有一些额外的工作。在获取第一个结果后,您需要手动关闭另一个线程。

在JDK 21中,可以使用结构化编程来实现。

2.ShutdownOnSuccess

ShutdownOnSuccess捕获第一个结果并关闭任务范围以中断未完成的线程并唤醒调用线程。

一种情况是任何子任务的结果都可以直接使用,而无需等待其他未完成任务的结果。

它定义了获取第一个结果或在所有子任务失败时抛出异常的方法。

public static void main(String[] args) throws IOException { try (var scope = new StructuredTaskScope.ShutdownOnSuccess()) { Future<String> res1 = scope.fork(() -> runTask(1)); Future<String> res2 = scope.fork(() -> runTask(2)); Future<String> res3 = scope.fork(() -> runTask(3)); scope.join(); System.out.println("scope:" + scope.result()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } public static String runTask(int i) throws InterruptedException { Thread.sleep(1000); long l = new Random().nextLong(); String s = String.valueOf(l); System.out.println(i + "task:" + s); return s; } 

3.ShutdownOnFailure

执行多个任务,只要有一个失败(发生异常或引发其他活动异常),就停止其他未完成的任务,并使用scope.throwIfFailed来捕获并抛出异常。

如果所有任务都正常,可以使用Feture.get()或*Feture.resultNow()来获取结果。

public static void main(String[] args) throws IOException { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Future<String> res1 = scope.fork(() -> runTaskWithException(1)); Future<String> res2 = scope.fork(() -> runTaskWithException(2)); Future<String> res3 = scope.fork(() -> runTaskWithException(3)); scope.join(); scope.throwIfFailed(Exception::new); String s = res1.resultNow(); System.out.println(s); String result = Stream.of(res1, res2, res3) .map(Future::resultNow) .collect(Collectors.joining()); System.out.println("result:" + result); } catch (Exception e) { e.printStackTrace(); } } public static String runTaskWithException(int i) throws InterruptedException { Thread.sleep(1000); long l = new Random().nextLong(3); if (l == 0) { throw new InterruptedException(); } String s = String.valueOf(l); System.out.println(i + "task:" + s); return s; } 

4.结构化并发带来的好处

以以下结构化并发示例为基础,我们看看结构化并发如何解决非结构化并发中可能存在的一些问题:

public class Test { public static void main(String[] args) { try (var scope = new StructuredTaskScope.ShutdownOnFailure()) { Future<String> userFuture = scope.fork(() -> getUser()); Future<Integer> orderFuture = scope.fork(() -> getOrder()); scope.join() // Join both subtasks .throwIfFailed(); // ... and propagate errors System.out.println("User: " + userFuture.get()); System.out.println("Order: " + orderFuture.get()); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException(e); } } private static int getOrder() throws Exception { // throw new Exception("test"); return 1; } private static String getUser() { return "user"; } } 

4.1 短路处理

如果一个getOrder()或getUser()一个子任务失败,则另一个尚未完成的任务将被取消。(这是由实施的关闭策略管理的ShutdownOnFailure;其他策略也是可能的,同时支持自定义策略)。避免了线程资源浪费以及可能的无意义阻塞。

4.2 取消传播

如果线程在调用期间被中断join(),则当线程退出作用域时,两个子任务都会自动取消。避免了线程资源浪费。

4.3 清晰性

上面的代码有一个清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(没有什么需要清理的)。

4.4 可观察性

线程转储 – 线程堆栈信息可以清楚的显示任务层次结构:

Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.Exception: test at Test.main(Test.java:21) Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: test at jdk.incubator.concurrent/jdk.incubator.concurrent.StructuredTaskScope$ShutdownOnFailure.throwIfFailed(StructuredTaskScope.java:1188) at Test.main(Test.java:17) Caused by: java.lang.Exception: test at Test.getOrder(Test.java:26) at Test.lambda$main$1(Test.java:15) at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) at java.base/java.lang.VirtualThread.run(VirtualThread.java:305) at java.base/java.lang.VirtualThread$VThreadContinuation.lambda$new$0(VirtualThread.java:177) at java.base/jdk.internal.vm.Continuation.enter0(Continuation.java:327) at java.base/jdk.internal.vm.Continuation.enter(Continuation.java:320) 

5.目前结构化并发的目标

  • 推广一种并发编程风格,可以消除因取消和关闭而产生的常见风险,例如线程泄漏和取消延迟。
  • 提高并发代码的可观察性。

以上就是Java21新特性 – 结构化并发的全部内容。