RxJava 是處理 observable 序列的非同步,事件導向函式庫。實作概念來自 ReactiveX,以非同步方式處理 observable stream 的 API。程式是以 Observer, Iterator Design Pattern 實作,並用 functional programming 的 style 撰寫程式。
RxJava 是 lighweight library,且支援多個 JVM 為基礎的程式語言:Groovy, Clojure, JRuby, Kotlin, Scala。
Base Classes
常用術語
upstream downstream
有一個來源的 dataflow,然後可讓資料經過幾個中間的步驟處理。
source.operator1().operator2().operator3().subscribe(consumer);
// 折行
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
對於 operator2 來說,左邊的資料來源是 upstream,右邊的 subscriber/consumer 是 downstream。
Objects in motion
RxJava 文件中,只要看到 emission, emits, item, event, signal, data, message,都可視為同義字,都代表通過 data flow 的 obejct 物件。
Backpressure
當資料流已非同步方式經過處理步驟時,每一個步驟可能會有不同的處理速度,為了避免速度太快而造成需要臨時的 buffer,或要跳過/刪除資料增加的記憶體使用量,backpressure 可因應這個問題,在每個步驟不知道 upstream 要送幾個資料過來的情況下,控制流程,限制資料流記憶體使用量。
io.reactivex.rxjava3.core.Flowable 類別就支援 backpressure。
io.reactivex.rxjava3.core.Observable 不支援 backpressure,適用於短的 sequence,GUI 互動。
Single, Maybe, Completable 都不支援 backpressure,因為一定會有空間只放一個資料。
Assembly time
dataflow 準備要套用中間的 operators 的時候,就稱為 assembly time
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);
目前資料還在準備階段,還沒有開始處理。
Subscription time
這是當 subscriber() 被呼叫時的暫時狀態,會在內部建立處理步驟鍊。
flow.subscribe(System.out::println)
在這個狀態下,資料來源會開始慢慢地釋放資料項目。
Runtime
在執行階段,資料來源持續釋放資料項目、錯誤或是完成的訊號
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
也就是執行上面這一段程式核心的時候
Simple background computation
RxJava 最常用來在另一個背景 thread 執行某些計算,遠端網路存取,最後在 UI thread 顯示結果或是錯誤。
import io.reactivex.rxjava3.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- wait for the flow to finish
coding style 類似 Builder Pattern,稱為 fluent API。
RxJava 的 reacive type 是 immutable 的,每一個 method call 都會會傳一個處理完成的新的 Flowable。
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // imitate expensive computation
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
可以透過 subscribeOn 將計算量大,或是 blocking IO 的部分,移到另一個 thread 執行,當資料完成且 ready,可以透過 observeOn 在前景或是 GUI thread 處理結果。
Scheduler
RxJava 的 operators 不能在 Thread 或 ExecutorService 直接執行,而是要透過 Scheduler 呼叫。Scheduler 封裝了 concurrency。RxJava 的幾個標準的 Scheduler 可透過 Scheduler utility 存取
Schedulers.computation()
在背景有固定數量的 threads 用來執行 operators,大部分的非同步 operators 使用這個 default Scheduler
Schedulers.io()
跟 IO 相關的 或是 blocking operations
Schedulers.single()
以 sequntial 與 FIFO 方式在單一 thread 運作
Schedulers.trampoline()
以 sequential 與 FIFO 方式,在特殊的 threads 運作,通常用在測試
這幾個 schedulers 在所有 JVM 都可以使用。但特殊平台,例如 Android,另外有內建 schedulers: AndroidSchedulers.mainThread()、SwingScheduler.instance() 或 JavaFXScheduler.platform()。
可利用 Schedulers.from(Executor)
將現有的 Executor 封裝為 Scheduler。這可用來建立一個固定,且較大的 thread pool
測試程式裡面的 Thread.sleep(2000),是因為 Scheduler 都是運作在 daemon thread,一旦 main thread 停止,這些 daemon threads 也會同時停止正在背景運作的程式。因此需要在 main thread 稍微 sleep 一段時間,讓 console 能夠等待執行結果。
Concurreny with a flow
RxJava 的 Flows 內部天生就是 sequential,但每一個 flow 可以各自獨立平行運作
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
這是對 110 的數字進行平方運算,最後在 main thread (呼叫 blockingSubscribe 的 thread) 處理結果。map(v -> v * v)
並不是以 concurrent 方式運作,他是在 computation() thread,依照順序接收 110
Parallel Processing
數字平方例子如果要平行運算
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
RxJava 的平行運算,代表分別獨立的 flows,並在最後整合到一個 flow。flatMap 就是將每個數字提供給各自獨立的 Flowable,最後合併到 main thread
flatMap 無法保證執行順序,另外有其他替代的 operators
Flowable.parallel() 跟 ParallelFlowable 也可以達到 parallel processing 的目的
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
Dependent sub-flows
flatMap 的用途
ex: 如果有一個 service 會回傳 Flowable,我們可用第一個 service 的值,呼叫另一個 service
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource
.flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
.map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
.subscribe(System.out::println);
Continuations
當出現某個 item 時,需要執行對應的運算。
Dependent
最常見的例子:給定某個 value,執行另一個 service,並等待該 service 的結果繼續下去。
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
另一個例子: flatMap 裡面對每個 value,呼叫另一個 service
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
這段程式碼實現了三個連續的 API 呼叫,每個 API 呼叫結果都依賴於前一個呼叫的結果。
apiCall
發出 value
。
anotherApiCall(value)
發出 next
。
finalCallBoth(value, next)
使用這兩個結果來進行最後的調用。
Non-dependent
flatMapSingle 裡面的 ignored 是忽略的來源,全部都對應到 someSingleSource
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
另一種方法,改用 Completable 中介,然後用 andThen 執行其他操作
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
Deferred-dependent
有時候,sequence 前後可能有隱藏的相依性。
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
結果會是 0,因為 Single.just(count.get())
在 assembly time 就決定了結果,因此必須要延遲 Single 的計算到 runtime
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或是寫成
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
Type conversion
有時可能會遇到 source/service 回傳了跟 flow 所需要的類別不同的狀況。有兩種解決方式:轉換為需要的類別 或是 找到支援該類別的另一個 operator
Converting to the desired type
|
Flowable |
Observable |
Single |
Maybe |
Completable |
Flowable |
|
toObservable |
first, firstOrError, single, singleOrError, last, lastOrError |
firstElement, singleElement, lastElement |
ignoreElements |
Observable |
toFlowable |
|
first, firstOrError, single, singleOrError, last, lastOrError |
firstElement, singleElement, lastElement |
ignoreElements |
Single |
toFlowable |
toObservable |
|
toMaybe |
ignoreElements |
Maybe |
toFlowable |
toObservable |
toSingle |
|
ignoreElements |
Completable |
toFlowable |
toObservable |
toSingle |
toMaybe |
|
Using an overload with the desired type
Operator |
Overloads |
flatMap |
flatMapSingle , flatMapMaybe , flatMapCompletable , flatMapIterable |
concatMap |
concatMapSingle , concatMapMaybe , concatMapCompletable , concatMapIterable |
switchMap |
switchMapSingle , switchMapMaybe , switchMapCompletable |
Operator naming conventions
operator 命名的習慣
Unusable keywords
因為 Rx.NET 發出一個 item,稱為 Return(T)
,這跟 java 的 return 關鍵字有衝突,所以 RxJava 改為 just(T)
類似狀況是 Switch 被命名為 switchOnNext
Catch
命名為 onErrorResumeNext
Type erasure
在 operator 回傳特定類別時,會將該類別加在 operator 名稱後面
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
Type ambiguities
有幾個 concatWith 會 overloading 接受不同類別的參數,用在 lambda 可能會造成困擾
Flowable<T> concatWith(Publisher<? extends T> other);
Flowable<T> concatWith(SingleSource<? extends T> other);
ex: 以下程式無法編譯
someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);
這時候,需要延遲計算直到 someSource 完成,所以要用 defer
someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);
有時候為避免邏輯的模糊問題,會增加 suffix
suffix 可避免 logical ambiguities,但可能在 flow 產生錯誤的類別
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> mergeArray(Publisher<? extends T>... sources);
Error Handling
flow 失敗時,會發生 error,可能會發生多個來源失敗,這時可選擇是不是要等所有來源完成或失敗。operator 後面會加上 DelayError
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);
Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
多個 suffix 可能同時出現
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
Base class vs base type
Type |
Class |
Interface |
Consumer |
0..N backpressured |
Flowable |
Publisher |
Subscriber |
0..N unbounded |
Observable |
ObservableSource |
Observer |
1 element or error |
Single |
SingleSource |
SingleObserver |
0..1 element or error |
Maybe |
MaybeSource |
MaybeObserver |
0 element or error |
Completable |
CompletableSource |
CompletableObserver |
References
GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.