2026/03/23

Structure Concurrency

是一種讓多執行緒邏輯變得更容易理解與管理的程式設計模型,目標是讓「並行任務的生命週期」像區塊結構(block structure)一樣有明確的範圍。

  • 子任務(threads, tasks)都必須在離開某個區塊前結束

  • 執行緒之間的層級關係(parent/child)是語法上明確可見的。

  • 不會有「孤兒執行緒」(dangling thread)偷偷跑在背景。

傳統的 thread

  • 任務彼此獨立、缺乏邏輯關聯。

  • 錯誤傳遞困難(子執行緒例外不會自動向上傳遞)。

  • 難以在結束前確保所有子任務完成。

void process() {
    Thread t = new Thread(() -> downloadFile());
    t.start();
    // ... do other work ...
    // 忘記 join() 或沒有捕捉例外,就可能造成 thread 泄漏
}

Structure Concurrency (JEP 453 / Java 21)

  • ShutdownOnFailure: 若任一子任務失敗,其他會自動中止。

  • ShutdownOnSuccess: 若有一個成功,其餘終止(常用於競賽式查詢)。

  • 範圍清晰、錯誤可控、資源可預期。

import java.util.concurrent.*;

void process() throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Future<String> f1 = scope.fork(() -> downloadFile());
        Future<String> f2 = scope.fork(() -> fetchMetadata());
        scope.join();              // 等待所有子任務
        scope.throwIfFailed();     // 傳遞例外
        System.out.println(f1.resultNow() + f2.resultNow());
    } // scope 區塊結束時,自動關閉所有未完成子任務
}
  • Virtual Threads 解決「效能 & 可擴充性」。

  • Structured Concurrency 解決「邏輯一致性 & 錯誤處理」。

Example

import java.util.concurrent.*;
import java.lang.management.*;

public class StructuredConcurrencyVirtualThreadsExample {

    public static void main(String[] args) throws Exception {
        System.out.println("Java: " + System.getProperty("java.version"));
        System.out.println();

        System.out.println("=== Example 1: Structured Concurrency (ShutdownOnFailure) ===");
        example();

        System.out.println();
        System.out.println("=== Example 2: Virtual vs Native Thread Performance Test ===");
        compareVirtualAndNativeThreads();
    }

    static void example() {
        // 使用 StructuredTaskScope.open() 創建作用域
        try (var scope = StructuredTaskScope.open()) {
            // 創建三個子任務
            var f1 = scope.fork(() -> simulatedIo("Service-A", 1200, false));
            var f2 = scope.fork(() -> simulatedIo("Service-B", 2500, true));
            var f3 = scope.fork(() -> simulatedIo("Service-C", 3000, false));

            // 等待所有子任務完成
            scope.join();

            // 處理結果
            System.out.println("Results: " +
                (f1.state() == StructuredTaskScope.Subtask.State.SUCCESS ? f1.get() : "Failed") + ", " +
                (f2.state() == StructuredTaskScope.Subtask.State.SUCCESS ? f2.get() : "Failed") + ", " +
                (f3.state() == StructuredTaskScope.Subtask.State.SUCCESS ? f3.get() : "Failed"));

        } catch (Exception e) {
            System.out.println("Scope finished with failure: " + e);
        }
    }

    /**
     * Compare Virtual Threads vs Native Threads with ASCII table output.
     */
    static void compareVirtualAndNativeThreads() throws Exception {
        int taskCount = 10_000;
        System.out.printf("Launching %,d simulated I/O tasks...\n", taskCount);

        long nativeTime = measureThreadTypePerformance(taskCount, false);
        long virtualTime = measureThreadTypePerformance(taskCount, true);

        long usedMem = getUsedMemoryMB();

        // ASCII Table output
        System.out.println();
        System.out.println("+--------------------+----------------+----------------+");
        System.out.println("| Thread Type        | Time (ms)      | Observations   |");
        System.out.println("+--------------------+----------------+----------------+");
        System.out.printf ("| %-18s | %-14d | %-14s |%n", "Native Threads", nativeTime, "limited pool");
        System.out.printf ("| %-18s | %-14d | %-14s |%n", "Virtual Threads", virtualTime, "scales easily");
        System.out.println("+--------------------+----------------+----------------+");
        System.out.printf ("| %-18s | %-14d | %-14s |%n", "Heap Used (MB)", usedMem, "after test");
        System.out.println("+--------------------+----------------+----------------+");
    }

    static long measureThreadTypePerformance(int count, boolean virtual) throws Exception {
        long start = System.currentTimeMillis();

        ExecutorService executor = virtual ? Executors.newVirtualThreadPerTaskExecutor()
                                           : Executors.newFixedThreadPool(200);

        try (executor) {
            for (int i = 0; i < count; i++) {
                int id = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException ignored) {}
                    return null;
                });
            }
        }

        long end = System.currentTimeMillis();
        return end - start;
    }

    static String simulatedIo(String name, long millis, boolean fail) throws InterruptedException {
        System.out.printf("[%s] running on %s, sleep %dms%n", name, Thread.currentThread(), millis);
        Thread.sleep(millis);
        if (fail) throw new RuntimeException(name + " failed!");
        return name + "-done";
    }

    static long getUsedMemoryMB() {
        MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
        long used = mbean.getHeapMemoryUsage().getUsed();
        return used / (1024 * 1024);
    }
}

編譯與執行

javac --enable-preview --release 25 StructuredConcurrencyVirtualThreadsExample.java
java --enable-preview StructuredConcurrencyVirtualThreadsExample

執行結果

Java: 25

=== Example 1: Structured Concurrency (ShutdownOnFailure) ===
[Service-A] running on VirtualThread[#26]/runnable@ForkJoinPool-1-worker-1, sleep 1200ms
[Service-C] running on VirtualThread[#30]/runnable@ForkJoinPool-1-worker-2, sleep 3000ms
[Service-B] running on VirtualThread[#28]/runnable@ForkJoinPool-1-worker-4, sleep 2500ms
Scope finished with failure: java.util.concurrent.StructuredTaskScope$FailedException: java.lang.RuntimeException: Service-B failed!

=== Example 2: Virtual vs Native Thread Performance Test ===
Launching 10,000 simulated I/O tasks...

+--------------------+----------------+----------------+
| Thread Type        | Time (ms)      | Observations   |
+--------------------+----------------+----------------+
| Native Threads     | 10232          | limited pool   |
| Virtual Threads    | 240            | scales easily  |
+--------------------+----------------+----------------+
| Heap Used (MB)     | 61             | after test     |
+--------------------+----------------+----------------+

同時支援舊的 Native Thread

  • 自動 join 或取消子任務;

  • 錯誤可統一處理;

  • 可以使用任意 Executor 來決定底層執行緒類型。

    static void example2() {
        try (var scope = StructuredTaskScope.open()) {

            // Virtual Thread Executor
            ExecutorService virtualExecutor = Executors.newVirtualThreadPerTaskExecutor();

            // fork 只能傳 Callable,不傳 Executor
            Callable<String> task1 = () -> simulatedIo("VT-A", 500, false);
            Callable<String> task2 = () -> simulatedIo("VT-B", 800, false);

            var f1 = scope.fork(task1);
            var f2 = scope.fork(task2);

            // Native Thread Executor
            ExecutorService nativeExecutor = Executors.newFixedThreadPool(2);
            Callable<String> cpuTask = () -> heavyComputation("NT-A");
            var f3 = scope.fork(cpuTask);

            scope.join(); // 等待所有子任務完成

            System.out.println("Results: " + f1.get() + ", " + f2.get() + ", " + f3.get());

            virtualExecutor.shutdown();
        } catch (Exception e) {
            System.out.println("Scope finished with failure: " + e);
        }

    }

    // 模擬 CPU 任務
    static String heavyComputation(String name) {
        System.out.printf("[%s] CPU task on %s%n", name, Thread.currentThread());
        long sum = 0;
        for (long i = 0; i < 10_000_000L; i++) sum += i;
        return name + "-done";
    }

沒有留言:

張貼留言