2024/11/18

RxJava

RxJava 是處理 observable 序列的非同步,事件導向函式庫。實作概念來自 ReactiveX,以非同步方式處理 observable stream 的 API。程式是以 Observer, Iterator Design Pattern 實作,並用 functional programming 的 style 撰寫程式。

RxJava 是 lighweight library,且支援多個 JVM 為基礎的程式語言:Groovy, Clojure, JRuby, Kotlin, Scala。

Base Classes

常用術語

upstream downstream

有一個來源的 dataflow,然後可讓資料經過幾個中間的步驟處理。

source.operator1().operator2().operator3().subscribe(consumer);

// 折行
source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

對於 operator2 來說,左邊的資料來源是 upstream,右邊的 subscriber/consumer 是 downstream。

Objects in motion

RxJava 文件中,只要看到 emission, emits, item, event, signal, data, message,都可視為同義字,都代表通過 data flow 的 obejct 物件。

Backpressure

當資料流已非同步方式經過處理步驟時,每一個步驟可能會有不同的處理速度,為了避免速度太快而造成需要臨時的 buffer,或要跳過/刪除資料增加的記憶體使用量,backpressure 可因應這個問題,在每個步驟不知道 upstream 要送幾個資料過來的情況下,控制流程,限制資料流記憶體使用量。

io.reactivex.rxjava3.core.Flowable 類別就支援 backpressure。

io.reactivex.rxjava3.core.Observable 不支援 backpressure,適用於短的 sequence,GUI 互動。

Single, Maybe, Completable 都不支援 backpressure,因為一定會有空間只放一個資料。

Assembly time

dataflow 準備要套用中間的 operators 的時候,就稱為 assembly time

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

目前資料還在準備階段,還沒有開始處理。

Subscription time

這是當 subscriber() 被呼叫時的暫時狀態,會在內部建立處理步驟鍊。

flow.subscribe(System.out::println)

在這個狀態下,資料來源會開始慢慢地釋放資料項目。

Runtime

在執行階段,資料來源持續釋放資料項目、錯誤或是完成的訊號

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

也就是執行上面這一段程式核心的時候


Simple background computation

RxJava 最常用來在另一個背景 thread 執行某些計算,遠端網路存取,最後在 UI thread 顯示結果或是錯誤。

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

coding style 類似 Builder Pattern,稱為 fluent API。

RxJava 的 reacive type 是 immutable 的,每一個 method call 都會會傳一個處理完成的新的 Flowable。

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

可以透過 subscribeOn 將計算量大,或是 blocking IO 的部分,移到另一個 thread 執行,當資料完成且 ready,可以透過 observeOn 在前景或是 GUI thread 處理結果。

Scheduler

RxJava 的 operators 不能在 Thread 或 ExecutorService 直接執行,而是要透過 Scheduler 呼叫。Scheduler 封裝了 concurrency。RxJava 的幾個標準的 Scheduler 可透過 Scheduler utility 存取

  • Schedulers.computation()

    在背景有固定數量的 threads 用來執行 operators,大部分的非同步 operators 使用這個 default Scheduler

  • Schedulers.io()

    跟 IO 相關的 或是 blocking operations

  • Schedulers.single()

    以 sequntial 與 FIFO 方式在單一 thread 運作

  • Schedulers.trampoline()

    以 sequential 與 FIFO 方式,在特殊的 threads 運作,通常用在測試

這幾個 schedulers 在所有 JVM 都可以使用。但特殊平台,例如 Android,另外有內建 schedulers: AndroidSchedulers.mainThread()、SwingScheduler.instance() 或 JavaFXScheduler.platform()。

可利用 Schedulers.from(Executor) 將現有的 Executor 封裝為 Scheduler。這可用來建立一個固定,且較大的 thread pool

測試程式裡面的 Thread.sleep(2000),是因為 Scheduler 都是運作在 daemon thread,一旦 main thread 停止,這些 daemon threads 也會同時停止正在背景運作的程式。因此需要在 main thread 稍微 sleep 一段時間,讓 console 能夠等待執行結果。

Concurreny with a flow

RxJava 的 Flows 內部天生就是 sequential,但每一個 flow 可以各自獨立平行運作

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

這是對 110 的數字進行平方運算,最後在 main thread (呼叫 blockingSubscribe 的 thread) 處理結果。map(v -> v * v) 並不是以 concurrent 方式運作,他是在 computation() thread,依照順序接收 110

Parallel Processing

數字平方例子如果要平行運算

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

RxJava 的平行運算,代表分別獨立的 flows,並在最後整合到一個 flow。flatMap 就是將每個數字提供給各自獨立的 Flowable,最後合併到 main thread

flatMap 無法保證執行順序,另外有其他替代的 operators

  • concatMap

    一次執行一個 inner flow

  • concatMapEager

    一次執行所有內部的 flows,但用 inner flows 建立的順序輸出結果

Flowable.parallel() 跟 ParallelFlowable 也可以達到 parallel processing 的目的

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap 的用途

ex: 如果有一個 service 會回傳 Flowable,我們可用第一個 service 的值,呼叫另一個 service

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

當出現某個 item 時,需要執行對應的運算。

Dependent

最常見的例子:給定某個 value,執行另一個 service,並等待該 service 的結果繼續下去。

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

另一個例子: flatMap 裡面對每個 value,呼叫另一個 service

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

這段程式碼實現了三個連續的 API 呼叫,每個 API 呼叫結果都依賴於前一個呼叫的結果。

  • apiCall 發出 value
  • anotherApiCall(value) 發出 next
  • finalCallBoth(value, next) 使用這兩個結果來進行最後的調用。

Non-dependent

flatMapSingle 裡面的 ignored 是忽略的來源,全部都對應到 someSingleSource

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

另一種方法,改用 Completable 中介,然後用 andThen 執行其他操作

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

Deferred-dependent

有時候,sequence 前後可能有隱藏的相依性。

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

結果會是 0,因為 Single.just(count.get()) 在 assembly time 就決定了結果,因此必須要延遲 Single 的計算到 runtime

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或是寫成

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversion

有時可能會遇到 source/service 回傳了跟 flow 所需要的類別不同的狀況。有兩種解決方式:轉換為需要的類別 或是 找到支援該類別的另一個 operator

Converting to the desired type

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Observable toFlowable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Single toFlowable toObservable toMaybe ignoreElements
Maybe toFlowable toObservable toSingle ignoreElements
Completable toFlowable toObservable toSingle toMaybe

Using an overload with the desired type

Operator Overloads
flatMap flatMapSingleflatMapMaybeflatMapCompletableflatMapIterable
concatMap concatMapSingleconcatMapMaybeconcatMapCompletableconcatMapIterable
switchMap switchMapSingleswitchMapMaybeswitchMapCompletable

Operator naming conventions

operator 命名的習慣

Unusable keywords

因為 Rx.NET 發出一個 item,稱為 Return(T) ,這跟 java 的 return 關鍵字有衝突,所以 RxJava 改為 just(T)

類似狀況是 Switch 被命名為 switchOnNext

Catch 命名為 onErrorResumeNext

Type erasure

在 operator 回傳特定類別時,會將該類別加在 operator 名稱後面

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

有幾個 concatWith 會 overloading 接受不同類別的參數,用在 lambda 可能會造成困擾

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

ex: 以下程式無法編譯

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

這時候,需要延遲計算直到 someSource 完成,所以要用 defer

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有時候為避免邏輯的模糊問題,會增加 suffix

suffix 可避免 logical ambiguities,但可能在 flow 產生錯誤的類別

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

Error Handling

flow 失敗時,會發生 error,可能會發生多個來源失敗,這時可選擇是不是要等所有來源完成或失敗。operator 後面會加上 DelayError

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

多個 suffix 可能同時出現

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

Type Class Interface Consumer
0..N backpressured Flowable Publisher Subscriber
0..N unbounded Observable ObservableSource Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

References

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

2024/11/11

Method Reference

因為 Java lambda expression 語法,需要有一種語法,可以引用 method,但不直接呼叫該 method。Method Reference 就是利用 :: 雙冒號,讓 Java 可以做到這件事。

Method Reference 有四種

Kind Syntax Examples
參考 class 的 static method *ContainingClass*::*staticMethodName* Person::compareByAge
MethodReferencesExamples::appendStrings
參考一個 object instance 的 method *containingObject*::*instanceMethodName* myComparisonProvider::compareByName
myApp::appendStrings2
參考特定類別,任意物件的 instance method *ContainingType*::*methodName* String::compareToIgnoreCase
String::concat
參考 constructor *ClassName*::new HashSet::new

物件比較範例

首先定義一個 Person.java data class

import java.util.Date;

public class Person {
    private String name;
    private int age;
    private Date birthday;

    public static int compareByAge(Person a, Person b) {
        return a.birthday.compareTo(b.birthday);
    }

    public Person(String name, int age, Date birthday) {
        this.name = name;
        this.age = age;
        this.birthday = birthday;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public Date getBirthday() {
        return birthday;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", birthday=" + birthday +
                '}';
    }
}

傳統的物件比較,會用 comparator 來實作。目前可以進化改用 lambda expression,或是 method reference

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.stream.Stream;

public class PersonExample {
    public static Person[] newArray() {
        try {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH);
            Person a = new Person("A", 10, formatter.parse("2014-02-01"));
            Person b = new Person("B", 20, formatter.parse("2004-03-01"));
            Person[] rosterAsArray = {a, b};
            return rosterAsArray;
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    public static void test1_comparator() {
        Person[] rosterAsArray = newArray();
        // 定義 Comparator
        class PersonAgeComparator implements Comparator<Person> {
            public int compare(Person a, Person b) {
                return a.getBirthday().compareTo(b.getBirthday());
            }
        }
        Arrays.sort(rosterAsArray, new PersonAgeComparator());
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test2_lambda() {
        Person[] rosterAsArray = newArray();
        // 使用 lambda expression
        Arrays.sort(rosterAsArray,
                (Person p1, Person p2) -> {
                    return p1.getBirthday().compareTo(p2.getBirthday());
                }
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test3_static_method() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method
        Arrays.sort(rosterAsArray,
                (p3, p4) -> Person.compareByAge(p3, p4)
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test4_static_method_reference() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method reference
        Arrays.sort(rosterAsArray, Person::compareByAge);
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void main(String... args) {
        test1_comparator();
        test2_lambda();
        test3_static_method();
        test4_static_method_reference();
    }
}

MethodReference 測試

以下例子利用上面的前三種 Method Reference 方式,列印 "Hello World!",第一個是直接使用 lambda expression 實作。

import java.util.function.BiFunction;

public class MethodReferencesExamples {

    public static <T> T mergeThings(T a, T b, BiFunction<T, T, T> merger) {
        return merger.apply(a, b);
    }

    public static String appendStrings(String a, String b) {
        return a + b;
    }

    public String appendStrings2(String a, String b) {
        return a + b;
    }

    public static void main(String[] args) {

        MethodReferencesExamples myApp = new MethodReferencesExamples();

        // Calling the method mergeThings with a lambda expression
        // 以 lambda expression 呼叫 mergeThings
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", (a, b) -> a + b));

        // Reference to a static method
        // static method 的 Method Reference
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", MethodReferencesExamples::appendStrings));

        // Reference to an instance method of a particular object
        // 參考一個 object instance 的 method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", myApp::appendStrings2));

        // Reference to an instance method of an arbitrary object of a
        // particular type
        // 參考特定類別,任意物件的 instance method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", String::concat));
    }
}

Reference

Method References

2024/10/28

Retrofit

因為現今網路服務最常見的就是用 http 協定提供,一般資料面向的服務,也是以 JSON 資料格式作為輸入與輸出的資料格式。Retrofit 定義自己是 type-safe HTTP client for Android and Java。這邊 type-safe 的意思是,透過 Retrofit 的包裝,可以自動將 http request 與 response 資料轉換為 java 類別物件,在自動轉換的過程中,就能以類別的定義,去檢查資料是不是符合類別的定義,而不是未經定義的其他資料。因為是遠端網路服務,Retrofit 提供了同步與非同步兩種使用 http 服務的方式,非同步的呼叫方法可以解決網路延遲,甚至是故障的問題。

測試 http 服務

網路上提供 Fake HTTP API service,有以下網站可以用

GET https://reqres.in/api/users/2

{
    "data": {
        "id": 2,
        "email": "janet.weaver@reqres.in",
        "first_name": "Janet",
        "last_name": "Weaver",
        "avatar": "https://reqres.in/img/faces/2-image.jpg"
    },
    "support": {
        "url": "https://reqres.in/#support-heading",
        "text": "To keep ReqRes free, contributions towards server costs are appreciated!"
    }
}

POST https://reqres.in/api/users

輸入參數

{
    "name": "John",
    "job": "leader"
}

輸出

{
    "name": "morpheus",
    "job": "leader",
    "id": "63",
    "createdAt": "2024-05-22T03:31:37.941Z"
}

Maven pom.xml

在 xml 裡面引用 retrofits 以及 converter-gson 的 library

        <!--retrofit-->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-gson</artifactId>
            <version>2.11.0</version>
        </dependency>

同步/非同步

透過 service 產生的 API Call 有 同步/非同步 兩種呼叫方式

  • execute() – Synchronously send the request and return its response.
  • enqueue(retrofit2.Callback) – Asynchronously send the request and notify callback of its response or if an error occurred talking to the server, creating the request, or processing the response.

Converter

retrofit2 的 conveter 是用在 http request 與 response 的資料轉換,目前支援這些格式

  • Gsoncom.squareup.retrofit2:converter-gson
  • Jacksoncom.squareup.retrofit2:converter-jackson
  • Moshicom.squareup.retrofit2:converter-moshi
  • Protobufcom.squareup.retrofit2:converter-protobuf
  • Wirecom.squareup.retrofit2:converter-wire
  • Simple XMLcom.squareup.retrofit2:converter-simplexml
  • JAXBcom.squareup.retrofit2:converter-jaxb
  • Scalars (primitives, boxed, and String): com.squareup.retrofit2:converter-scalars

比較常見的是 json,可以使用 Gson 或 Jackson 協助轉換。

實作

data class

對應剛剛的兩個 service,分別有不同的 data class

User.java

public class User {
    private long id;
    private String first_name;
    private String last_name;
    private String email;
    // getter, setter, toString
}

UserResponse.java

public class UserResponse {
    private User data;
    // getter, setter, toString
}

Account.java

public class Account {
    private String id;
    private String name;
    private String job;
    private Date createdAt;

    // getter, setter, toString
}

service

UserService.java

import retrofit.data.Account;
import retrofit.data.UserResponse;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;

public interface UserService {
    @GET("/api/users/{id}")
    public Call<UserResponse> getUser(@Path("id") long id);

    @POST("/api/users")
    Call<Account> createUser(@Body Account account);
}

main

RetrofitTest.java

import okhttp3.OkHttpClient;
import retrofit.data.Account;
import retrofit.data.UserResponse;
import retrofit.service.UnsafeOkHttpClient;
import retrofit.service.UserService;
import retrofit2.Call;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.Callback;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitTest {
    public static void sync() {
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(httpClient.build())
//                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

        UserService service = retrofit.create(UserService.class);

        // Calling '/api/users/2'
        Call<UserResponse> callSync = service.getUser(2);
        try {
            Response<UserResponse> response = callSync.execute();
            UserResponse apiResponse = response.body();
            System.out.println("sync: "+apiResponse);
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        // Calling 'https://reqres.in/api/users'
        Account account = new Account();
        account.setName("John");
        account.setJob("leader");
        Call<Account> callSync2 = service.createUser(account);
        try {
            Response<Account> response2 = callSync2.execute();
            Account apiResponseAccount = response2.body();
            System.out.println(apiResponseAccount);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static void async() {
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(httpClient.build())
//                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

        UserService service = retrofit.create(UserService.class);

        // Calling '/api/users/2'
        Call<UserResponse> callAsync = service.getUser(2);

        callAsync.enqueue(new Callback<>() {
            @Override
            public void onResponse(Call<UserResponse> call, Response<UserResponse> response) {
                int responseCode = response.code();
                UserResponse user = response.body();
                System.out.println("async responseCode="+responseCode+", result=" + user);
            }

            @Override
            public void onFailure(Call<UserResponse> call, Throwable throwable) {
                System.out.println(throwable);
            }
        });

        // Calling 'https://reqres.in/api/users'
        Account account = new Account();
        account.setName("John");
        account.setJob("leader");
        Call<Account> callAsync2 = service.createUser(account);

        callAsync2.enqueue(new Callback<>() {
            @Override
            public void onResponse(Call<Account> call, Response<Account> response) {
                int responseCode = response.code();
                Account accountResponse = response.body();
                System.out.println("async responseCode="+responseCode+", result=" + accountResponse);
            }

            @Override
            public void onFailure(Call<Account> call, Throwable throwable) {
                System.out.println(throwable);
            }
        });
    }

    public static void main(String[] args) {
        sync();
        System.exit(0);
//        async();
    }
}

注意:這邊特定要呼叫 System.exit(0) 來停掉程式,這是因為 Retrofit 內部使用的 OkHttp 採用了 ThreadPoolExecutor,參考這個網址的 issue 討論:Tomcat is not able to stop because of OkHttp ConnectionPool Issue #5542 · square/okhttp · GitHub ,討論寫說沒有直接停掉的方法。

裡面有說到大約 6mins 後就會 Conenction Pool 停掉。實際上實測,大約等了 5mins。

目前如果要調整這個問題,必須要覆蓋 connectionPool 的原始設定。方法是在產生 OkHttpClient.Builder() 的時候,指定一個新的 ConnectionPool,並將參數 keepAliveDurationMills 改短。

        int maxIdleConnections = 10;
        int keepAliveDurationMills = 1000;
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));

這樣修改,只有在同步呼叫時有用。如果是非同步呼叫,程式還是會等 3mins 才會停下來。

自訂 Http Client

有時候在開發時,https 網站會採用自己產生的 SSL 憑證,這時候需要調整 http client,不檢查 domain 來源

UnsafeOkHttpClient.java

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

import javax.net.ssl.*;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;

public class UnsafeOkHttpClient {

    public static OkHttpClient getUnsafeOkHttpClient() {
        try {
            // Create a trust manager that does not validate certificate chains
            final TrustManager[] trustAllCerts = new TrustManager[]{
                    new X509TrustManager() {
                        @Override
                        public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}

                        @Override
                        public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}

                        @Override
                        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                            return new java.security.cert.X509Certificate[]{};
                        }
                    }
            };

            // Install the all-trusting trust manager
            final SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
            // Create an ssl socket factory with our all-trusting manager
            final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();

            int maxIdleConnections = 10;
            int keepAliveDurationMills = 1000;
            OkHttpClient.Builder builder = new OkHttpClient.Builder()
                    .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));
            builder.sslSocketFactory(sslSocketFactory, (X509TrustManager)trustAllCerts[0]);
            builder.hostnameVerifier(new HostnameVerifier() {
                @Override
                public boolean verify(String hostname, SSLSession session) {
                    return true;
                }
            });

            return builder.build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

在使用時,只需要在產生 Retrofit 時,改用這個 http client builder

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
//                .client(httpClient.build())
                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

Service Generator

可製作 service generator,將產生 service 的程式碼再包裝起來

UserServiceGenerator.java

import okhttp3.OkHttpClient;
import okhttp3.Request;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

public class UserServiceGenerator {
    private static final String BASE_URL = "https://reqres.in/";

    private static Retrofit.Builder builder = new Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create());

    private static Retrofit retrofit = builder.build();

    private static OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
        .connectionPool(new ConnectionPool(10, 1000, TimeUnit.MILLISECONDS));;

    public static <S> S createService(Class<S> serviceClass) {
        return retrofit.create(serviceClass);
    }

    public static <S> S createService(Class<S> serviceClass, final String token ) {
        if ( token != null ) {
            httpClient.interceptors().clear();
            httpClient.addInterceptor( chain -> {
                Request original = chain.request();
                Request request = original.newBuilder()
                        .header("Authorization", token)
                        .build();
                return chain.proceed(request);
            });
            builder.client(httpClient.build());
            retrofit = builder.build();
        }
        return retrofit.create(serviceClass);
    }
}

使用時

UserService service = UserServiceGenerator.createService(UserService.class);

References

Retrofit

Introduction to Retrofit | Baeldung

# Retrofit 操作教學

Retrofit 2 Tutorial: Declarative REST Client for Android

Retrofit2 完全解析 探索与okhttp之间的关系_okhttp retro2-CSDN博客