測試如何使用 RxJava
maven pom.xml
加上 rxjava3 的 library
<!-- rxjava -->
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>3.1.8</version>
</dependency>
Observable
Observable 可從
這邊是產生 Observable,然後 observer 用 subscribe 註冊 Observable sequence。該 sequence 會發送 items 到 observer,一次發送一個
public static void test1() {
// just 會將 item 轉成 Observable
Observable<String> observerable = Observable.just("Hello", "World");
// subscribe 註冊 Observable,列印每一個 item
observerable.subscribe(System.out::println);
}
observer 有三個 interface
OnNext
當 observerable 發布一個新的 event (item),observer 收到該 item 然後能做一些操作
OnComplete
當 sequence of events 完成時,代表不會再呼叫 OnNext
OnError
如果在 RxJava framework 或是 OnNext 裡面發生 error
public static void test2() {
String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
Observable<String> observable = Observable.fromArray(letters);
// Java 的 String 是 immutable class,且是 pass by reference
// 所以這邊如果要儲存結果,不能直接用 String 物件,改用 array 記錄一直改變的 String reference
String[] result = {""};
observable.subscribe(
i -> result[0] += i, //OnNext
Throwable::printStackTrace, //OnError
() -> result[0] += "_Completed" //OnCompleted
);
System.out.println(" result[0]="+ result[0]);
// 也可以用 StringBuilder,這樣就可以一直改變 string 內容
StringBuilder sb = new StringBuilder();
observable.subscribe(
i -> sb.append(i), //OnNext
Throwable::printStackTrace, //OnError
() -> sb.append("_Completed") //OnCompleted
);
System.out.println(" sb="+ sb.toString());
// 結果 abcdefg_Completed
// 測試 OnError
// 當處理到 "a",因為無法轉成 integer 會發生 error
// error 處理為 列印 _err 文字
String[] letters2 = {"1", "a", "2"};
Observable<String> observable2 = Observable.fromArray(letters2);
StringBuilder sb2 = new StringBuilder();
observable2.subscribe(
i -> sb2.append(Integer.parseInt(i)), //OnNext
// Throwable::printStackTrace, //OnError
err -> sb2.append("_err"),
() -> sb2.append("_Completed") //OnCompleted
);
System.out.println(" sb2="+ sb2.toString());
// 結果 1_err
}
operator
map, flatMap
map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
public static void operator_test_map_flatmap() {
// map: 對每一個 item 執行某個 operation,會回傳任意物件
String[] letters = {"a", "b", "c", "d"};
StringBuilder sb = new StringBuilder();
Observable.fromArray(letters)
.map(String::toUpperCase)
.subscribe(letter -> sb.append(letter));
System.out.println("operator_test_1 sb="+ sb.toString());
// flatMap 會回傳結果的 Observable
// flatMap 不保證 sequence 順序
// map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
Observable<String> observable = Observable.just("Hello", "World");
observable
.flatMap(item -> Observable.fromArray(item.split("")))
.subscribe(System.out::println);
}
filter
filter 會過濾 item,符合條件的才會通過
public static void operator_test_filter() {
// filter 會過濾 item,符合條件的才會通過
Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
observable.filter(item -> item % 2 == 0)
.subscribe(System.out::println);
// 結果是
// 2
// 4
}
scan
scan 會將每次計算的結果,發送給下一個 步驟
public static void operator_test_scan() {
// scan 會將每次計算的結果,發送給下一個 步驟
String[] letters = {"a", "b", "c"};
StringBuilder sb = new StringBuilder();
Observable.fromArray(letters)
.scan(new StringBuilder(), StringBuilder::append)
.subscribe(total -> sb.append(total.toString()+" ") );
System.out.println("operator_test_scan sb="+ sb.toString());
// a ab abc
// 累加 1~5
StringBuilder sb2 = new StringBuilder();
Observable.range(1,5)
.scan( (Integer res1, Integer res2) -> res1+res2 )
.subscribe(total -> sb2.append(total.toString()+" ") );
System.out.println("operator_test_scan sb="+ sb2.toString());
// 1 3 6 10 15
}
groupBy
將 event 分類
ex: 將數字依照單雙數分類
public static void operator_test_groupby() {
String[] nums = {"", ""};
Observable.range(1,5)
.groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
.subscribe(group ->
group.subscribe((number) -> {
if (group.getKey().toString().equals("EVEN")) {
nums[0] += number+" ";
} else {
nums[1] += number+" ";
}
})
);
System.out.println("operator_test_groupby nums="+ Arrays.deepToString(nums));
// 結果 operator_test_groupby nums=[2 4 , 1 3 5 ]
}
conditional
defaultIfEmpty
first
takeWhile
public static void operator_test_conditional() {
// empty() 可產生一個 空的 Observable
// defaultIfEmpty() 在 observable 空的時候有作用
Observable.empty()
.defaultIfEmpty("Observable is empty")
.subscribe(System.out::println);
// first 發出第一個來源資料,如果沒有則發送預設值。
String[] letters = {"a", "b", "c"};
StringBuilder sb = new StringBuilder();
Observable.fromArray(letters)
.defaultIfEmpty("Observable is empty")
.first("default first")
.subscribe( res -> sb.append(res) );
System.out.println("operator_test_conditional sb="+sb);
// operator_test_conditional sb=a
StringBuilder sb2 = new StringBuilder();
Observable.empty()
.defaultIfEmpty("Observable is empty")
.first("default first")
.subscribe( res -> sb2.append(res) );
System.out.println("operator_test_conditional empty sb2="+sb2);
// operator_test_conditional empty sb2=Observable is empty
// takeWhile 只在滿足條件時,取用此 item,否則就丟棄該 item
int[] sum = {0};
Observable.range(1,10)
.takeWhile(i -> i < 5)
.subscribe(s -> sum[0] += s);
System.out.println("operator_test_conditional takeWhile sum="+sum[0]);
// operator_test_conditional takeWhile sum=10
}
combining
public static void operator_test_combining() {
// merge: 將多個 Observables 合併成一個
Observable<String> observable1 = Observable.just("Hello");
Observable<String> observable2 = Observable.just("World");
Observable.merge(observable1, observable2)
.subscribe(System.out::println);
//Hello
//World
// zip: 以某個方式,合併多個 Observables
Observable.zip(observable1, observable2, (item1, item2) -> item1 + " " + item2)
.subscribe(System.out::println);
//Hello World
// startWith: 將指定的 Observable 合併到另一個 的開頭
Observable<String> names = Observable.just("Project", "zero");
Observable<String> otherNames = Observable.just("Git", "Code");
names.startWith(otherNames).subscribe(System.out::println);
//Git
//Code
//Project
//zero
}
Operators Category
ReactiveX - Operators 這邊可查詢到所有 ReactiveX 的 operators 定義
Alphabetical List of Observable Operators · ReactiveX/RxJava Wiki · GitHub 這邊是 RxJava 的 operators
依照 ReactiveX 的分類有
Creating
Transforming
Filtering
Combining
Observable Utility
Conditional and Boolean
Mathematical and Aggregate
Backpressure
Connectable Observable
Operators to Convert
Scheduler
subscribeOn: 指定 Observable 的執行緒,只能寫一次,影響 Observable 的執行。會影響從訂閱開始到第一個 observeOn 的所有操作
observeOn: 在哪個執行緒上觀察 Observable 的結果,可以寫很多次切換執行緒。
public static void test_scheduler() {
printCurrentThread("start");
Observable<Object> observable = Observable.just("Hello", "World");
observable
.map(s -> {
printCurrentThread("first map: "+s);
return s;
})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.map(s -> {
printCurrentThread("second map: "+s);
return s;
})
.observeOn(Schedulers.single())
.subscribe(
s -> {
printCurrentThread("subscribe: "+s);
}
);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Thread: main, start
// Thread: RxComputationThreadPool-1, first map: Hello
// Thread: RxCachedThreadScheduler-1, second map: Hello
// Thread: RxComputationThreadPool-1, first map: World
// Thread: RxCachedThreadScheduler-1, second map: World
// Thread: RxSingleScheduler-1, subscribe: Hello
// Thread: RxSingleScheduler-1, subscribe: World
}
private static void printCurrentThread(String message) {
System.out.format("Thread: %s, %s%n", Thread.currentThread().getName(), message);
}
執行結果
Thread: main, start
Thread: RxComputationThreadPool-1, first map: Hello
Thread: RxCachedThreadScheduler-1, second map: Hello
Thread: RxComputationThreadPool-1, first map: World
Thread: RxCachedThreadScheduler-1, second map: World
Thread: RxSingleScheduler-1, subscribe: Hello
Thread: RxSingleScheduler-1, subscribe: World
Error Handling
observer 有三個 interface: OnNext, OnError, OnComplete
public static void test_error() {
Observable<String> observable = Observable.just("Hello", "World")
.map(item -> {
if (item.equals("World")) {
throw new RuntimeException("Error occurred");
}
return item;
});
observable.subscribe(
System.out::println,
error -> System.err.println("Error: " + error.getMessage()),
() -> System.out.println("Completed!")
);
//Hello
//Error: Error occurred
}