2024/11/25

How to use RxJava

測試如何使用 RxJava

maven pom.xml

加上 rxjava3 的 library

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Observable

Observable 可從

這邊是產生 Observable,然後 observer 用 subscribe 註冊 Observable sequence。該 sequence 會發送 items 到 observer,一次發送一個

    public static void test1() {
        // just 會將 item 轉成 Observable
        Observable<String> observerable = Observable.just("Hello", "World");
        // subscribe 註冊 Observable,列印每一個 item
        observerable.subscribe(System.out::println);
    }

observer 有三個 interface

  • OnNext

    當 observerable 發布一個新的 event (item),observer 收到該 item 然後能做一些操作

  • OnComplete

    當 sequence of events 完成時,代表不會再呼叫 OnNext

  • OnError

    如果在 RxJava framework 或是 OnNext 裡面發生 error

    public static void test2() {
        String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
        Observable<String> observable = Observable.fromArray(letters);

        // Java 的 String 是 immutable class,且是 pass by reference
        // 所以這邊如果要儲存結果,不能直接用 String 物件,改用 array 記錄一直改變的 String reference
        String[] result = {""};
        observable.subscribe(
                i -> result[0] += i,  //OnNext
                Throwable::printStackTrace, //OnError
                () -> result[0] += "_Completed" //OnCompleted
        );
        System.out.println(" result[0]="+ result[0]);

        // 也可以用 StringBuilder,這樣就可以一直改變 string 內容
        StringBuilder sb = new StringBuilder();
        observable.subscribe(
                i -> sb.append(i),  //OnNext
                Throwable::printStackTrace, //OnError
                () -> sb.append("_Completed") //OnCompleted
        );
        System.out.println(" sb="+ sb.toString());
        // 結果 abcdefg_Completed

        // 測試 OnError
        // 當處理到 "a",因為無法轉成 integer 會發生 error
        // error 處理為 列印 _err 文字
        String[] letters2 = {"1", "a", "2"};
        Observable<String> observable2 = Observable.fromArray(letters2);
        StringBuilder sb2 = new StringBuilder();
        observable2.subscribe(
                i -> sb2.append(Integer.parseInt(i)),  //OnNext
//                Throwable::printStackTrace, //OnError
                err -> sb2.append("_err"),
                () -> sb2.append("_Completed") //OnCompleted
        );
        System.out.println(" sb2="+ sb2.toString());
        // 結果 1_err
    }

operator

map, flatMap

map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many

    public static void operator_test_map_flatmap() {
        // map: 對每一個 item 執行某個 operation,會回傳任意物件
        String[] letters = {"a", "b", "c", "d"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .map(String::toUpperCase)
                .subscribe(letter -> sb.append(letter));
        System.out.println("operator_test_1 sb="+ sb.toString());

        // flatMap 會回傳結果的 Observable
        // flatMap 不保證 sequence 順序
        // map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
        Observable<String> observable = Observable.just("Hello", "World");
        observable
            .flatMap(item -> Observable.fromArray(item.split("")))
            .subscribe(System.out::println);
    }

filter

filter 會過濾 item,符合條件的才會通過

    public static void operator_test_filter() {
        // filter 會過濾 item,符合條件的才會通過
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(item -> item % 2 == 0)
                .subscribe(System.out::println);
        // 結果是
        // 2
        // 4
    }

scan

scan 會將每次計算的結果,發送給下一個 步驟

    public static void operator_test_scan() {

        // scan 會將每次計算的結果,發送給下一個 步驟
        String[] letters = {"a", "b", "c"};

        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .scan(new StringBuilder(), StringBuilder::append)
                .subscribe(total -> sb.append(total.toString()+" ") );

        System.out.println("operator_test_scan sb="+ sb.toString());
        // a ab abc

        // 累加 1~5
        StringBuilder sb2 = new StringBuilder();
        Observable.range(1,5)
                .scan( (Integer res1, Integer res2) -> res1+res2  )
                .subscribe(total -> sb2.append(total.toString()+" ") );
        System.out.println("operator_test_scan sb="+ sb2.toString());
        // 1 3 6 10 15
    }

groupBy

將 event 分類

ex: 將數字依照單雙數分類

    public static void operator_test_groupby() {
        String[] nums = {"", ""};
        Observable.range(1,5)
                .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
                .subscribe(group ->
                        group.subscribe((number) -> {
                            if (group.getKey().toString().equals("EVEN")) {
                                nums[0] += number+" ";
                            } else {
                                nums[1] += number+" ";
                            }
                        })
                );
        System.out.println("operator_test_groupby nums="+ Arrays.deepToString(nums));
        // 結果  operator_test_groupby nums=[2 4 , 1 3 5 ]
    }

conditional

defaultIfEmpty

first

takeWhile

    public static void operator_test_conditional() {
        // empty() 可產生一個 空的 Observable
        // defaultIfEmpty() 在 observable 空的時候有作用
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .subscribe(System.out::println);

        // first  發出第一個來源資料,如果沒有則發送預設值。
        String[] letters = {"a", "b", "c"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb.append(res) );
        System.out.println("operator_test_conditional sb="+sb);
        // operator_test_conditional sb=a

        StringBuilder sb2 = new StringBuilder();
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb2.append(res) );
        System.out.println("operator_test_conditional empty sb2="+sb2);
        // operator_test_conditional empty sb2=Observable is empty

        // takeWhile 只在滿足條件時,取用此 item,否則就丟棄該 item
        int[] sum = {0};
        Observable.range(1,10)
                .takeWhile(i -> i < 5)
                .subscribe(s -> sum[0] += s);
        System.out.println("operator_test_conditional takeWhile sum="+sum[0]);
        // operator_test_conditional takeWhile sum=10
    }

combining

    public static void operator_test_combining() {
        // merge: 將多個 Observables 合併成一個
        Observable<String> observable1 = Observable.just("Hello");
        Observable<String> observable2 = Observable.just("World");
        Observable.merge(observable1, observable2)
                .subscribe(System.out::println);
        //Hello
        //World

        // zip: 以某個方式,合併多個 Observables
        Observable.zip(observable1, observable2, (item1, item2) -> item1 + " " + item2)
                .subscribe(System.out::println);
        //Hello World

        // startWith: 將指定的 Observable 合併到另一個 的開頭
        Observable<String> names = Observable.just("Project", "zero");
        Observable<String> otherNames = Observable.just("Git", "Code");
        names.startWith(otherNames).subscribe(System.out::println);
        //Git
        //Code
        //Project
        //zero
    }

Operators Category

ReactiveX - Operators 這邊可查詢到所有 ReactiveX 的 operators 定義

Alphabetical List of Observable Operators · ReactiveX/RxJava Wiki · GitHub 這邊是 RxJava 的 operators

依照 ReactiveX 的分類有

  • Creating

  • Transforming

  • Filtering

  • Combining

  • Observable Utility

  • Conditional and Boolean

  • Mathematical and Aggregate

  • Backpressure

  • Connectable Observable

  • Operators to Convert

Scheduler

subscribeOn: 指定 Observable 的執行緒,只能寫一次,影響 Observable 的執行。會影響從訂閱開始到第一個 observeOn 的所有操作

observeOn: 在哪個執行緒上觀察 Observable 的結果,可以寫很多次切換執行緒。

    public static void test_scheduler() {
        printCurrentThread("start");
        Observable<Object> observable = Observable.just("Hello", "World");
        observable
                .map(s -> {
                    printCurrentThread("first map: "+s);
                    return s;
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .map(s -> {
                    printCurrentThread("second map: "+s);
                    return s;
                })
                .observeOn(Schedulers.single())
                .subscribe(
                    s -> {
                        printCurrentThread("subscribe: "+s);
                    }
                );

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
//        Thread: main, start
//        Thread: RxComputationThreadPool-1, first map: Hello
//        Thread: RxCachedThreadScheduler-1, second map: Hello
//        Thread: RxComputationThreadPool-1, first map: World
//        Thread: RxCachedThreadScheduler-1, second map: World
//        Thread: RxSingleScheduler-1, subscribe: Hello
//        Thread: RxSingleScheduler-1, subscribe: World
    }

    private static void printCurrentThread(String message) {
        System.out.format("Thread: %s, %s%n", Thread.currentThread().getName(), message);
    }

執行結果

Thread: main, start
Thread: RxComputationThreadPool-1, first map: Hello
Thread: RxCachedThreadScheduler-1, second map: Hello
Thread: RxComputationThreadPool-1, first map: World
Thread: RxCachedThreadScheduler-1, second map: World
Thread: RxSingleScheduler-1, subscribe: Hello
Thread: RxSingleScheduler-1, subscribe: World

Error Handling

observer 有三個 interface: OnNext, OnError, OnComplete

    public static void test_error() {
        Observable<String> observable = Observable.just("Hello", "World")
                .map(item -> {
                    if (item.equals("World")) {
                        throw new RuntimeException("Error occurred");
                    }
                    return item;
                });

        observable.subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed!")
        );
        //Hello
        //Error: Error occurred
    }

References

Introduction to RxJava | Baeldung

Guide to RxJava in Java

RxJava3 用法 - petercao - 博客园

沒有留言:

張貼留言