2023/06/19

Java CompletableFuture

CompletableFuture is an asynchronous-processing class that Java 8 added to java.util.concurrent.

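The class offers two main methods for running work asynchronously, runAsync and supplyAsync; the difference is whether the task returns a result. When no executor is passed, the task runs on the default thread pool, normally ForkJoinPool.commonPool().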

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

java sample

    private static void test1() {
        try {
            CompletableFuture<String> hello = CompletableFuture.supplyAsync(() -> {
                System.out.println("supplyAsync START");
                System.out.println("say supplyAsync Hello World");
                return "supplyAsync Hello World";
            });
            System.out.println(hello.get());

            CompletableFuture<Void> world = CompletableFuture.runAsync(() -> {
                System.out.println("runAsync START");
                System.out.println("say runAsync Hello World");
            });
            // Wait for the task; otherwise the JVM may exit before the pool thread prints anything
            world.get();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        // console output
//        supplyAsync START
//        say supplyAsync Hello World
//        supplyAsync Hello World
//        runAsync START
//        say runAsync Hello World

    }

Creating a custom thread pool with Executors

ExecutorService threadPool = Executors.newFixedThreadPool(10);
CompletableFuture<Void> runAsync = CompletableFuture.runAsync(() -> System.out.println("runAsync"), threadPool);
CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> "supplyAsync", threadPool);

java sample
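
A minimal sketch of running a task on a custom pool, to go with the signatures above. The class name CustomPoolSketch and the thread name "my-pool-worker" are illustrative assumptions, not part of the original post; the thread factory is only there to make it visible which pool ran the task.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the names used here are for illustration only.
class CustomPoolSketch {

    // Runs a supplier on a custom pool and returns the name of the thread that executed it.
    static String workerThreadName() {
        ExecutorService threadPool = Executors.newFixedThreadPool(2, runnable -> {
            Thread t = new Thread(runnable);
            t.setName("my-pool-worker"); // assumed name, just to identify the pool
            return t;
        });
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), threadPool)
                    .join();
        } finally {
            // A pool you create yourself must be shut down, or its threads can keep the JVM alive.
            threadPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on: " + workerThreadName()); // ran on: my-pool-worker
    }
}
```

Passing the executor explicitly keeps long-running or blocking work off the shared default pool, which is usually the reason to create a dedicated one.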


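Handling exceptions

Once the work completes you can obtain its result. If an exception is thrown during processing, you can intercept it and run specific work in response.

The relevant methods are: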

CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// catch exception
CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)

// use your own thread pool
CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

sample code:

// Obtain the result once the work completes; if an exception is thrown, intercept it and run specific work
    private static void test3() {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            String doSomething = "Hello World";
            return doSomething;
        }, threadPool);

        supplyAsync.whenCompleteAsync((result, ex) -> {
            System.out.println("---supplyAsync---");
            System.out.println("supplyAsync result: " + result);
            System.out.println("exception: " + ex);
        }, threadPool).exceptionally(ex -> {
            System.out.println("exceptionally: " + ex.getMessage());
            return ex.getMessage();
        }).join();

        CompletableFuture<String> supplyAsyncException = CompletableFuture.supplyAsync(() -> {
            throw new CompletionException(new Exception("throw exception"));
        }, threadPool);

        supplyAsyncException.whenCompleteAsync((result, ex) -> {
            System.out.println("---supplyAsyncException---");
            System.out.println("supplyAsyncException result: " + result);
            System.out.println("exception: " + ex);
        }, threadPool).exceptionally(ex -> {
            System.out.println("exceptionally: " + ex.getMessage());
            return ex.getMessage();
        }).join();
        // Output:
//        ---supplyAsync---
//        supplyAsync result: Hello World
//        exception: null
//        ---supplyAsyncException---
//        supplyAsyncException result: null
//        exception: java.util.concurrent.CompletionException: java.lang.Exception: throw exception
//        exceptionally: java.lang.Exception: throw exception
        threadPool.shutdown();
    }

You can also handle exceptions with handle. The difference is that handle returns a value, which becomes the result of the new stage.

CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)

// use your own thread pool
CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

sample code:

    private static void test4() {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> supplyAsync = CompletableFuture.supplyAsync(() -> {
            throw new CompletionException(new Exception("throw exception"));
        }, threadPool);

        String res = supplyAsync.handle((result, ex) -> (null != ex) ? ex.getMessage() : result).join();
        System.out.println("res: " + res);

        // Result
//        res: java.lang.Exception: throw exception
        threadPool.shutdown();
    }
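
To make the difference between handle and whenComplete concrete, here is a small sketch under assumed names (HandleVsWhenCompleteSketch, failing, and the "fallback" value are all illustrative). It runs on the default pool and shows that handle replaces the failure with a value, while whenComplete only observes the outcome and lets the exception continue to join().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical demo class; names and values are for illustration only.
class HandleVsWhenCompleteSketch {

    // A future that always fails.
    static CompletableFuture<String> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // handle maps the failure to a fallback value, so the new stage completes normally.
    static String withHandle() {
        return failing()
                .handle((result, ex) -> ex != null ? "fallback" : result)
                .join();
    }

    // whenComplete cannot change the outcome; join() still throws CompletionException.
    static String withWhenComplete() {
        try {
            return failing()
                    .whenComplete((result, ex) -> { /* e.g. log ex here */ })
                    .join();
        } catch (CompletionException e) {
            return "still failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(withHandle());       // fallback
        System.out.println(withWhenComplete()); // still failed: boom
    }
}
```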

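Transforming data

Use thenApply to transform a result. thenCompose does the same when the transforming function itself returns a CompletionStage, and thenAccept consumes the result without returning a value.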

CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
// use your own thread pool
CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
//-----------
CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
// use your own thread pool
CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
//-----------
CompletableFuture<Void> thenAccept(Consumer<? super T> action)
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
// use your own thread pool
CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
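
The three methods above can be sketched as follows. This is only an illustration: the class TransformSketch and the helper lookupName are hypothetical and stand in for any method that already returns a CompletableFuture.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; names and values are for illustration only.
class TransformSketch {

    // Assumed async lookup, standing in for any method that returns a CompletableFuture.
    static CompletableFuture<String> lookupName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // thenApply: transform the value with a plain function.
    static int doubled() {
        return CompletableFuture.supplyAsync(() -> 21)
                .thenApply(n -> n * 2)
                .join();
    }

    // thenCompose: the function returns another future, and the result is flattened
    // to CompletableFuture<String> instead of a nested future.
    static String composed() {
        return CompletableFuture.supplyAsync(() -> 7)
                .thenCompose(TransformSketch::lookupName)
                .join();
    }

    // thenAccept: consume the value; the resulting stage is CompletableFuture<Void>.
    static List<String> consumed() {
        List<String> seen = new ArrayList<>();
        CompletableFuture.supplyAsync(() -> "hello")
                .thenAccept(seen::add)
                .join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(doubled());  // 42
        System.out.println(composed()); // user-7
        System.out.println(consumed()); // [hello]
    }
}
```

Using thenApply with lookupName would instead produce a CompletableFuture<CompletableFuture<String>>, which is the case thenCompose exists to avoid.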

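Combining the results of two independent CompletableFutures

When two independent CompletableFutures need their results combined, you can use the following:

thenCombine merges the results of two independent CompletableFutures into one value.

thenAcceptBoth is similar to thenCombine but does not return a value.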

//-----------
CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn)
// use your own thread pool
CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn, Executor executor)

///////
CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,  Runnable action)

// use your own thread pool
CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

java sample

    // Combine the results of supplyAsync1 and supplyAsync2
    public static void test5() {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
        CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);

        String ans = supplyAsync1.thenCombine(supplyAsync2, (result1, result2) -> result1 + ", " + result2).join();
        System.out.println("ans: " + ans);
        threadPool.shutdown();
        // ans: supplyAsync 1, supplyAsync 2

        //-----------
        // No return value needed
        ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync21 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool2);
        CompletableFuture<String> supplyAsync22 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool2);

        supplyAsync21.thenAcceptBothAsync(supplyAsync22, (result1, result2) -> System.out.println(result1 + ", " + result2), threadPool2).join();
        threadPool2.shutdown();

        // supplyAsync 1, supplyAsync 2
    }
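
runAfterBoth appears in the signature list but not in the sample, so here is a brief sketch of it under assumed names (RunAfterBothSketch and its variables are illustrative). Unlike thenCombine and thenAcceptBoth, its Runnable receives neither result; it only runs once both stages have completed normally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; names and values are for illustration only.
class RunAfterBothSketch {

    // Returns true if the action ran and both futures were already done at that point.
    static boolean bothFinished() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "first");
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 2);
        AtomicBoolean ran = new AtomicBoolean(false);

        // The two futures may have different result types because the Runnable ignores both.
        first.runAfterBoth(second, () -> ran.set(first.isDone() && second.isDone()))
             .join();
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("ran after both: " + bothFinished()); // ran after both: true
    }
}
```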

Multiple CompletableFutures

With more than two CompletableFutures, use allOf to wait for all of them to complete, then chain thenRun.

    public static void test6() {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        CompletableFuture<String> supplyAsync1 = CompletableFuture.supplyAsync(() -> "supplyAsync 1", threadPool);
        CompletableFuture<String> supplyAsync2 = CompletableFuture.supplyAsync(() -> "supplyAsync 2", threadPool);
        CompletableFuture<String> supplyAsync3 = CompletableFuture.supplyAsync(() -> "supplyAsync 3", threadPool);

        CompletableFuture.allOf(supplyAsync1, supplyAsync2, supplyAsync3).thenRun(() -> {
            // thenRun only fires after allOf completes normally, so join() returns at once here
            String ans = String.join(", ", supplyAsync1.join(), supplyAsync2.join(), supplyAsync3.join());
            System.out.println("ans: " + ans);
        }).join();

        threadPool.shutdown();
        // ans: supplyAsync 1, supplyAsync 2, supplyAsync 3

    }

To continue as soon as either one completes, use applyToEither, or acceptEither when no return value is needed. For more than two CompletableFutures, use anyOf.

CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)

// use your own thread pool
CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

///////

CompletableFuture<Void>  acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)

// use your own thread pool
CompletableFuture<Void>  acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)


///////
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
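
A short sketch of applyToEither and anyOf, assuming illustrative names (EitherSketch and its variables are not from the original post). To keep the outcome deterministic, the "slow" side is a future that is never completed, so the other one always wins.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; names and values are for illustration only.
class EitherSketch {

    // applyToEither: apply the function to whichever of the two results arrives first.
    static String firstOfTwo() {
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed in this sketch
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        return never.applyToEither(fast, winner -> "winner: " + winner).join();
    }

    // anyOf: accepts any number of futures of mixed types, so the result is typed as Object.
    static Object firstOfMany() {
        CompletableFuture<String> never1 = new CompletableFuture<>();
        CompletableFuture<Integer> never2 = new CompletableFuture<>();
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        return CompletableFuture.anyOf(never1, never2, done).join();
    }

    public static void main(String[] args) {
        System.out.println(firstOfTwo());  // winner: fast
        System.out.println(firstOfMany()); // done
    }
}
```

Because anyOf returns CompletableFuture<Object>, the caller has to cast or inspect the value; applyToEither keeps the type when both inputs share one.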

