2024/12/02

Retrofit RxJava Adapter

Retrofit 的 RxJava Adapter 用於將 Retrofit 的服務呼叫結果轉換為 RxJava 的 Observable、Single、Maybe 或 Completable 等反應式類別。在進行網路請求時,能充分利用 RxJava 的功能,如流控制、非同步處理、錯誤處理等。

https://reqres.in/ 有一個 List User 的 GET method API,以下以這個 API 實作

pom.xml

廚了 Retrofit2 集 RxJava3 以外,增加 adapter-rxjava3

        <!--retrofit-->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-gson</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.squareup.retrofit2/adapter-rxjava3 -->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>adapter-rxjava3</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Retrofit

透過網頁服務的 API 定義 service 及發送,回傳的資料

UserData.java

public class UserData {
    private long id;
    private String first_name;
    private String last_name;
    private String email;
// getter, setter, toString
}

UserList.java

public class UserList {
    private int page;
    private int per_page;
    private int total;
    private int total_pages;

    private List<UserData> data;
// getter, setter, toString
}

UserListService.java

注意,這邊的回傳結果改為 Observable<UserList>

public interface UserListService {
    @GET("/api/users")
    Observable<UserList> listUser(@Query("page") int pageno);
}

Retrofit RxJava 測試

RetrofitRxJavaTest.java

public class RetrofitRxJavaTest {
    public static void sync() {
        int maxIdleConnections = 10;
        int keepAliveDurationMills = 1000;
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .client(httpClient.build())
                .build();

        UserListService service = retrofit.create(UserListService.class);

        Observable<UserList> observable = service.listUser(2);
        observable.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.single())
//                    .subscribe(
//                            System.out::println
//                    );
                    .subscribe(new DefaultObserver<UserList>() {
                        @Override
                        public void onNext(UserList response) {
                            System.out.println(response);
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    });
    }

    public static void main(String[] args) {
        System.out.println(new Date());
        sync();

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

如果程式放在 Android APP,則可以改為在主執行緒,也就是主畫面的執行緒上觀察結果

service.listUser(2).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())

2024/11/25

How to use RxJava

測試如何使用 RxJava

maven pom.xml

加上 rxjava3 的 library

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Observable

Observable 可從

這邊是產生 Observable,然後 observer 用 subscribe 註冊 Observable sequence。該 sequence 會發送 items 到 observer,一次發送一個

    public static void test1() {
        // just 會將 item 轉成 Observable
        Observable<String> observerable = Observable.just("Hello", "World");
        // subscribe 註冊 Observable,列印每一個 item
        observerable.subscribe(System.out::println);
    }

observer 有三個 interface

  • OnNext

    當 observerable 發布一個新的 event (item),observer 收到該 item 然後能做一些操作

  • OnComplete

    當 sequence of events 完成時,代表不會再呼叫 OnNext

  • OnError

    如果在 RxJava framework 或是 OnNext 裡面發生 error

    public static void test2() {
        String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
        Observable<String> observable = Observable.fromArray(letters);

        // Java 的 String 是 immutable class,且是 pass by reference
        // 所以這邊如果要儲存結果,不能直接用 String 物件,改用 array 記錄一直改變的 String reference
        String[] result = {""};
        observable.subscribe(
                i -> result[0] += i,  //OnNext
                Throwable::printStackTrace, //OnError
                () -> result[0] += "_Completed" //OnCompleted
        );
        System.out.println(" result[0]="+ result[0]);

        // 也可以用 StringBuilder,這樣就可以一直改變 string 內容
        StringBuilder sb = new StringBuilder();
        observable.subscribe(
                i -> sb.append(i),  //OnNext
                Throwable::printStackTrace, //OnError
                () -> sb.append("_Completed") //OnCompleted
        );
        System.out.println(" sb="+ sb.toString());
        // 結果 abcdefg_Completed

        // 測試 OnError
        // 當處理到 "a",因為無法轉成 integer 會發生 error
        // error 處理為 列印 _err 文字
        String[] letters2 = {"1", "a", "2"};
        Observable<String> observable2 = Observable.fromArray(letters2);
        StringBuilder sb2 = new StringBuilder();
        observable2.subscribe(
                i -> sb2.append(Integer.parseInt(i)),  //OnNext
//                Throwable::printStackTrace, //OnError
                err -> sb2.append("_err"),
                () -> sb2.append("_Completed") //OnCompleted
        );
        System.out.println(" sb2="+ sb2.toString());
        // 結果 1_err
    }

operator

map, flatMap

map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many

    public static void operator_test_map_flatmap() {
        // map: 對每一個 item 執行某個 operation,會回傳任意物件
        String[] letters = {"a", "b", "c", "d"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .map(String::toUpperCase)
                .subscribe(letter -> sb.append(letter));
        System.out.println("operator_test_1 sb="+ sb.toString());

        // flatMap 會回傳結果的 Observable
        // flatMap 不保證 sequence 順序
        // map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
        Observable<String> observable = Observable.just("Hello", "World");
        observable
            .flatMap(item -> Observable.fromArray(item.split("")))
            .subscribe(System.out::println);
    }

filter

filter 會過濾 item,符合條件的才會通過

    public static void operator_test_filter() {
        // filter 會過濾 item,符合條件的才會通過
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(item -> item % 2 == 0)
                .subscribe(System.out::println);
        // 結果是
        // 2
        // 4
    }

scan

scan 會將每次計算的結果,發送給下一個 步驟

    public static void operator_test_scan() {

        // scan 會將每次計算的結果,發送給下一個 步驟
        String[] letters = {"a", "b", "c"};

        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .scan(new StringBuilder(), StringBuilder::append)
                .subscribe(total -> sb.append(total.toString()+" ") );

        System.out.println("operator_test_scan sb="+ sb.toString());
        // a ab abc

        // 累加 1~5
        StringBuilder sb2 = new StringBuilder();
        Observable.range(1,5)
                .scan( (Integer res1, Integer res2) -> res1+res2  )
                .subscribe(total -> sb2.append(total.toString()+" ") );
        System.out.println("operator_test_scan sb="+ sb2.toString());
        // 1 3 6 10 15
    }

groupBy

將 event 分類

ex: 將數字依照單雙數分類

    public static void operator_test_groupby() {
        String[] nums = {"", ""};
        Observable.range(1,5)
                .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
                .subscribe(group ->
                        group.subscribe((number) -> {
                            if (group.getKey().toString().equals("EVEN")) {
                                nums[0] += number+" ";
                            } else {
                                nums[1] += number+" ";
                            }
                        })
                );
        System.out.println("operator_test_groupby nums="+ Arrays.deepToString(nums));
        // 結果  operator_test_groupby nums=[2 4 , 1 3 5 ]
    }

conditional

defaultIfEmpty

first

takeWhile

    public static void operator_test_conditional() {
        // empty() 可產生一個 空的 Observable
        // defaultIfEmpty() 在 observable 空的時候有作用
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .subscribe(System.out::println);

        // first  發出第一個來源資料,如果沒有則發送預設值。
        String[] letters = {"a", "b", "c"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb.append(res) );
        System.out.println("operator_test_conditional sb="+sb);
        // operator_test_conditional sb=a

        StringBuilder sb2 = new StringBuilder();
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb2.append(res) );
        System.out.println("operator_test_conditional empty sb2="+sb2);
        // operator_test_conditional empty sb2=Observable is empty

        // takeWhile 只在滿足條件時,取用此 item,否則就丟棄該 item
        int[] sum = {0};
        Observable.range(1,10)
                .takeWhile(i -> i < 5)
                .subscribe(s -> sum[0] += s);
        System.out.println("operator_test_conditional takeWhile sum="+sum[0]);
        // operator_test_conditional takeWhile sum=10
    }

combining

    public static void operator_test_combining() {
        // merge: 將多個 Observables 合併成一個
        Observable<String> observable1 = Observable.just("Hello");
        Observable<String> observable2 = Observable.just("World");
        Observable.merge(observable1, observable2)
                .subscribe(System.out::println);
        //Hello
        //World

        // zip: 以某個方式,合併多個 Observables
        Observable.zip(observable1, observable2, (item1, item2) -> item1 + " " + item2)
                .subscribe(System.out::println);
        //Hello World

        // startWith: 將指定的 Observable 合併到另一個 的開頭
        Observable<String> names = Observable.just("Project", "zero");
        Observable<String> otherNames = Observable.just("Git", "Code");
        names.startWith(otherNames).subscribe(System.out::println);
        //Git
        //Code
        //Project
        //zero
    }

Operators Category

ReactiveX - Operators 這邊可查詢到所有 ReactiveX 的 operators 定義

Alphabetical List of Observable Operators · ReactiveX/RxJava Wiki · GitHub 這邊是 RxJava 的 operators

依照 ReactiveX 的分類有

  • Creating

  • Transforming

  • Filtering

  • Combining

  • Observable Utility

  • Conditional and Boolean

  • Mathematical and Aggregate

  • Backpressure

  • Connectable Observable

  • Operators to Convert

Scheduler

subscribeOn: 指定 Observable 的執行緒,只能寫一次,影響 Observable 的執行。會影響從訂閱開始到第一個 observeOn 的所有操作

observeOn: 在哪個執行緒上觀察 Observable 的結果,可以寫很多次切換執行緒。

    public static void test_scheduler() {
        printCurrentThread("start");
        Observable<Object> observable = Observable.just("Hello", "World");
        observable
                .map(s -> {
                    printCurrentThread("first map: "+s);
                    return s;
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .map(s -> {
                    printCurrentThread("second map: "+s);
                    return s;
                })
                .observeOn(Schedulers.single())
                .subscribe(
                    s -> {
                        printCurrentThread("subscribe: "+s);
                    }
                );

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
//        Thread: main, start
//        Thread: RxComputationThreadPool-1, first map: Hello
//        Thread: RxCachedThreadScheduler-1, second map: Hello
//        Thread: RxComputationThreadPool-1, first map: World
//        Thread: RxCachedThreadScheduler-1, second map: World
//        Thread: RxSingleScheduler-1, subscribe: Hello
//        Thread: RxSingleScheduler-1, subscribe: World
    }

    private static void printCurrentThread(String message) {
        System.out.format("Thread: %s, %s%n", Thread.currentThread().getName(), message);
    }

執行結果

Thread: main, start
Thread: RxComputationThreadPool-1, first map: Hello
Thread: RxCachedThreadScheduler-1, second map: Hello
Thread: RxComputationThreadPool-1, first map: World
Thread: RxCachedThreadScheduler-1, second map: World
Thread: RxSingleScheduler-1, subscribe: Hello
Thread: RxSingleScheduler-1, subscribe: World

Error Handling

observer 有三個 interface: OnNext, OnError, OnComplete

    public static void test_error() {
        Observable<String> observable = Observable.just("Hello", "World")
                .map(item -> {
                    if (item.equals("World")) {
                        throw new RuntimeException("Error occurred");
                    }
                    return item;
                });

        observable.subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed!")
        );
        //Hello
        //Error: Error occurred
    }

References

Introduction to RxJava | Baeldung

Guide to RxJava in Java

RxJava3 用法 - petercao - 博客园

2024/11/18

RxJava

RxJava 是處理 observable 序列的非同步,事件導向函式庫。實作概念來自 ReactiveX,以非同步方式處理 observable stream 的 API。程式是以 Observer, Iterator Design Pattern 實作,並用 functional programming 的 style 撰寫程式。

RxJava 是 lighweight library,且支援多個 JVM 為基礎的程式語言:Groovy, Clojure, JRuby, Kotlin, Scala。

Base Classes

常用術語

upstream downstream

有一個來源的 dataflow,然後可讓資料經過幾個中間的步驟處理。

source.operator1().operator2().operator3().subscribe(consumer);

// 折行
source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

對於 operator2 來說,左邊的資料來源是 upstream,右邊的 subscriber/consumer 是 downstream。

Objects in motion

RxJava 文件中,只要看到 emission, emits, item, event, signal, data, message,都可視為同義字,都代表通過 data flow 的 obejct 物件。

Backpressure

當資料流已非同步方式經過處理步驟時,每一個步驟可能會有不同的處理速度,為了避免速度太快而造成需要臨時的 buffer,或要跳過/刪除資料增加的記憶體使用量,backpressure 可因應這個問題,在每個步驟不知道 upstream 要送幾個資料過來的情況下,控制流程,限制資料流記憶體使用量。

io.reactivex.rxjava3.core.Flowable 類別就支援 backpressure。

io.reactivex.rxjava3.core.Observable 不支援 backpressure,適用於短的 sequence,GUI 互動。

Single, Maybe, Completable 都不支援 backpressure,因為一定會有空間只放一個資料。

Assembly time

dataflow 準備要套用中間的 operators 的時候,就稱為 assembly time

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

目前資料還在準備階段,還沒有開始處理。

Subscription time

這是當 subscriber() 被呼叫時的暫時狀態,會在內部建立處理步驟鍊。

flow.subscribe(System.out::println)

在這個狀態下,資料來源會開始慢慢地釋放資料項目。

Runtime

在執行階段,資料來源持續釋放資料項目、錯誤或是完成的訊號

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

也就是執行上面這一段程式核心的時候


Simple background computation

RxJava 最常用來在另一個背景 thread 執行某些計算,遠端網路存取,最後在 UI thread 顯示結果或是錯誤。

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

coding style 類似 Builder Pattern,稱為 fluent API。

RxJava 的 reacive type 是 immutable 的,每一個 method call 都會會傳一個處理完成的新的 Flowable。

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

可以透過 subscribeOn 將計算量大,或是 blocking IO 的部分,移到另一個 thread 執行,當資料完成且 ready,可以透過 observeOn 在前景或是 GUI thread 處理結果。

Scheduler

RxJava 的 operators 不能在 Thread 或 ExecutorService 直接執行,而是要透過 Scheduler 呼叫。Scheduler 封裝了 concurrency。RxJava 的幾個標準的 Scheduler 可透過 Scheduler utility 存取

  • Schedulers.computation()

    在背景有固定數量的 threads 用來執行 operators,大部分的非同步 operators 使用這個 default Scheduler

  • Schedulers.io()

    跟 IO 相關的 或是 blocking operations

  • Schedulers.single()

    以 sequntial 與 FIFO 方式在單一 thread 運作

  • Schedulers.trampoline()

    以 sequential 與 FIFO 方式,在特殊的 threads 運作,通常用在測試

這幾個 schedulers 在所有 JVM 都可以使用。但特殊平台,例如 Android,另外有內建 schedulers: AndroidSchedulers.mainThread()、SwingScheduler.instance() 或 JavaFXScheduler.platform()。

可利用 Schedulers.from(Executor) 將現有的 Executor 封裝為 Scheduler。這可用來建立一個固定,且較大的 thread pool

測試程式裡面的 Thread.sleep(2000),是因為 Scheduler 都是運作在 daemon thread,一旦 main thread 停止,這些 daemon threads 也會同時停止正在背景運作的程式。因此需要在 main thread 稍微 sleep 一段時間,讓 console 能夠等待執行結果。

Concurreny with a flow

RxJava 的 Flows 內部天生就是 sequential,但每一個 flow 可以各自獨立平行運作

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

這是對 110 的數字進行平方運算,最後在 main thread (呼叫 blockingSubscribe 的 thread) 處理結果。map(v -> v * v) 並不是以 concurrent 方式運作,他是在 computation() thread,依照順序接收 110

Parallel Processing

數字平方例子如果要平行運算

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

RxJava 的平行運算,代表分別獨立的 flows,並在最後整合到一個 flow。flatMap 就是將每個數字提供給各自獨立的 Flowable,最後合併到 main thread

flatMap 無法保證執行順序,另外有其他替代的 operators

  • concatMap

    一次執行一個 inner flow

  • concatMapEager

    一次執行所有內部的 flows,但用 inner flows 建立的順序輸出結果

Flowable.parallel() 跟 ParallelFlowable 也可以達到 parallel processing 的目的

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap 的用途

ex: 如果有一個 service 會回傳 Flowable,我們可用第一個 service 的值,呼叫另一個 service

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

當出現某個 item 時,需要執行對應的運算。

Dependent

最常見的例子:給定某個 value,執行另一個 service,並等待該 service 的結果繼續下去。

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

另一個例子: flatMap 裡面對每個 value,呼叫另一個 service

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

這段程式碼實現了三個連續的 API 呼叫,每個 API 呼叫結果都依賴於前一個呼叫的結果。

  • apiCall 發出 value
  • anotherApiCall(value) 發出 next
  • finalCallBoth(value, next) 使用這兩個結果來進行最後的調用。

Non-dependent

flatMapSingle 裡面的 ignored 是忽略的來源,全部都對應到 someSingleSource

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

另一種方法,改用 Completable 中介,然後用 andThen 執行其他操作

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

Deferred-dependent

有時候,sequence 前後可能有隱藏的相依性。

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

結果會是 0,因為 Single.just(count.get()) 在 assembly time 就決定了結果,因此必須要延遲 Single 的計算到 runtime

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或是寫成

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversion

有時可能會遇到 source/service 回傳了跟 flow 所需要的類別不同的狀況。有兩種解決方式:轉換為需要的類別 或是 找到支援該類別的另一個 operator

Converting to the desired type

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Observable toFlowable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Single toFlowable toObservable toMaybe ignoreElements
Maybe toFlowable toObservable toSingle ignoreElements
Completable toFlowable toObservable toSingle toMaybe

Using an overload with the desired type

Operator Overloads
flatMap flatMapSingleflatMapMaybeflatMapCompletableflatMapIterable
concatMap concatMapSingleconcatMapMaybeconcatMapCompletableconcatMapIterable
switchMap switchMapSingleswitchMapMaybeswitchMapCompletable

Operator naming conventions

operator 命名的習慣

Unusable keywords

因為 Rx.NET 發出一個 item,稱為 Return(T) ,這跟 java 的 return 關鍵字有衝突,所以 RxJava 改為 just(T)

類似狀況是 Switch 被命名為 switchOnNext

Catch 命名為 onErrorResumeNext

Type erasure

在 operator 回傳特定類別時,會將該類別加在 operator 名稱後面

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

有幾個 concatWith 會 overloading 接受不同類別的參數,用在 lambda 可能會造成困擾

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

ex: 以下程式無法編譯

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

這時候,需要延遲計算直到 someSource 完成,所以要用 defer

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有時候為避免邏輯的模糊問題,會增加 suffix

suffix 可避免 logical ambiguities,但可能在 flow 產生錯誤的類別

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

Error Handling

flow 失敗時,會發生 error,可能會發生多個來源失敗,這時可選擇是不是要等所有來源完成或失敗。operator 後面會加上 DelayError

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

多個 suffix 可能同時出現

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

Type Class Interface Consumer
0..N backpressured Flowable Publisher Subscriber
0..N unbounded Observable ObservableSource Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

References

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.