2024/12/09

Mockito 簡介

在做 Java 專案測試時,常見的方法是使用 JUnit 測試框架。但在實際專案測試時,最常遇到的問題是,準備測試資料。專案程式通常都會連接資料庫,然後透過商業邏輯或一些運算,再將結果存回資料庫。

這在測試時就會遇到很多問題,例如測試案例需要某些特殊的使用者在某個特別的狀態,例如未付款的狀態,然後程式經過了某些付款程序,讓資料變成已付款狀態,又存回資料庫,更新了這個使用者的狀態。這裡遇到的問題是,沒辦法一直產生未付款狀態的使用者,這就導致每一次要測試付款程序時,就要重新準備一次測試資料。

如果該資料跟其他的測試有相依性,又讓這個問題更複雜了。

在使用 JUnit 時,常常會遇到測試的對象裡面因為包含了其他類別的物件,需要先準備/建立這些前置的物件,才能真正地去測試現在想要測試的對象。例如專案中的商業邏輯程式需要使用到資料庫的連線,就必須確實準備一個資料庫,並建立該 DAO 物件,然後才能進行測試。

這跟剛剛提到的測試資料是類似的問題,就是外部資料相依性問題。

假設目前的類別關係如下

flowchart LR
    A --> B --> D & E
    A --> C

因為 A 相依於 B 與 C,故這時候需要製作 B 與 C 的 mock 物件,用來作 A 的測試

flowchart LR
    A --> B[mock of B]
    A --> C[mock of C]

# TDD/BDD and Test Double 這篇文章提到 unit test 必須要

  • 是最小的測試單位

  • 一個案例只能測試一個方法

  • 測試案例之間沒有相依性

  • 沒有外部相依性

  • 不具備邏輯

Mockito 就是用來輔助 JUnit,製作 mock 物件,也就是達成上面所說的沒有外部相依性,也可以讓測試案例之間沒有相依性。Mock 模擬對象 是一種模擬真實對象行為的假的物件,這個假物件可以用來測試其他程式的行為。

單元測試之 mock/stub/spy/fake ? 這邊提到了幾個名詞的差異

  • mock

    模擬的假物件,可讓程式使用 mock 物件驗證商業邏輯或是互動是否正確。mock 有可能會造成測試失敗

  • stub

    也是假物件,但有點替身的意思,跟 mock 類似,是取代真實物件的假物件,使用時,該替身不會造成測試失敗

  • fake

    完全不做事情的假物件,測試僅僅會經過這些物件,但不會做任何驗證,不會造成測試失敗,也就是 stub 的意思

  • dummy

    空物件,只用來填補缺少的參數,或是其他已經測試完成的物件,測試僅僅會經過這些物件,但不會做任何驗證,不會造成測試失敗,也就是 stub 的意思

  • spy

    通常 mock 是製作整個假物件,而 spy 只會偽造類別裡面的某些 method,如果針對該偽造的方法有做驗證測試,就將 spy 視為 mock。如果沒有驗證,那就視為 stub

Mockito

使用 mockito 基本需要了解這三個部分

  • mock

    static method,用來產生 mock 物件

  • when/then

    為剛剛用 mock 產生的物件,自訂其行為,也就是自訂某些 method 回傳的資料

  • verify

    用來檢查 mock 物件的使用狀況

測試準備

測試前,先製作一個要被 mock 的類別

public class DataDAO {
    public String getDataById(String id) {
        return id;
    }

    public int getDataSize() {
        return 0;
    }

    public boolean add(String data) {
        return true;
    }

    public void clear() {
    }
}

引用 libary

        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-core -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>5.12.0</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.mockito/mockito-inline -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-inline</artifactId>
            <version>5.2.0</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
            <version>5.9.1</version>
            <scope>compile</scope>
        </dependency>

mock

  • 用類別定義產生 mock object

    public static <T> T mock(Class<T> classToMock)

  • 用類別名稱產生 mock object 後,指定這個 mock object 的名稱

    public static <T> T mock(Class<T> classToMock, String name)

  • 產生 mock object,自訂 Answer

    public static <T> T mock(Class<T> classToMock, Answer defaultAnswer)

  • 產生 MockSettings 自訂 Answer

    private String randomId() {
        return UUID.randomUUID().toString();
    }
    @Test
    public void mock_test1() {
        // public static <T> T mock(Class<T> classToMock)
        // 用類別定義產生 mock object

        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class);
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );

        // verify 可檢查是否有呼叫 add method
        verify(dataDAOmock).add(anyString());
        // 以 JUnit 檢查 add method 的 return value
        assertFalse(added);
    }

    @Test
    public void mock_test2_name() {
        // public static <T> T mock(Class<T> classToMock, String name)
        // 用類別名稱產生 mock object 後,指定這個 mock object 的名稱

        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class, "test2DataDAOMock");
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );

        TooFewActualInvocations exception = assertThrows(TooFewActualInvocations.class, () -> {
            verify(dataDAOmock, times(2)).add(anyString());
        });
//        exception.printStackTrace();
//        rg.mockito.exceptions.verification.TooFewActualInvocations:
//        test2DataDAOMock.add(<any string>);
//        Wanted 2 times:
//-> at mock.DataDAO.add(DataDAO.java:13)
//        But was 1 time:
//-> at mock.DataDAOMockitoTest.test2(DataDAOMockitoTest.java:41)
//        ......
        assertTrue(exception.getMessage().contains("test2DataDAOMock.add"));
    }

    static class CustomAnswer implements Answer<Boolean> {
        @Override
        public Boolean answer(InvocationOnMock invocation) throws Throwable {
            return false;
        }
    }
    @Test
    public void mock_test3_answer() {
        // public static <T> T mock(Class<T> classToMock, Answer defaultAnswer)
        // 產生 mock object,自訂 Answer
        DataDAO dataDAOmock = mock(DataDAO.class, new CustomAnswer());

        boolean added = dataDAOmock.add( randomId() );

        verify(dataDAOmock).add(anyString());
        assertFalse(added);
    }

    @Test
    public void mock_test4_MockSettings() {
        // 產生 MockSettings 自訂 Answer
        MockSettings customSettings = withSettings().defaultAnswer(new CustomAnswer());

        DataDAO dataDAOmock = mock(DataDAO.class, customSettings);
        boolean added = dataDAOmock.add( randomId() );
        verify(dataDAOmock).add(anyString());
        assertFalse(added);
    }

when/then

  • 當透過這個 mock object 呼叫 method 時,永遠回傳某個值

  • 用 doReturn 方式設定 return 的結果

  • 設定 method 呼叫時,會 throw Excpetion

  • 設定 void return 的 method,會 throw Exception

  • 設定 method 多次呼叫時,有不同的 return 結果

  • 設定 spy 的行為,spy 是對一部分的 method 做 mock

  • 設定呼叫 mock 的某個 method 要呼叫真實的物件的 method

  • 自訂 Answer

    @Test
    public void when_test1() {
        // 利用 mock 產生 DataDAO 的 mock object
        DataDAO dataDAOmock = mock(DataDAO.class);
        // 當透過這個 mock object 呼叫 add method 時,永遠回傳 false
        when(dataDAOmock.add(anyString())).thenReturn(false);

        boolean added = dataDAOmock.add( randomId() );
        assertFalse(added);

        // 用另一種方式設定 return 的結果
        doReturn(false).when(dataDAOmock).add(anyString());
        boolean added2 = dataDAOmock.add( randomId() );
        assertFalse(added2);

        // 設定 method 呼叫時,會 throw Excpetion
        when(dataDAOmock.add(anyString())).thenThrow(IllegalStateException.class);
        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
            dataDAOmock.add( randomId() );
        });

        // 設定 void return 的 method,會 throw Exception
        doThrow(NullPointerException.class).when(dataDAOmock).clear();
        assertThrows(NullPointerException.class, () -> dataDAOmock.clear());

        // 設定 method 多次呼叫時,有不同的 return 結果
        DataDAO dataDAOmock2 = mock(DataDAO.class);
        when(dataDAOmock2.add(anyString()))
                .thenReturn(false)
                .thenThrow(IllegalStateException.class);
        assertThrows(IllegalStateException.class, () -> {
            dataDAOmock2.add( randomId() );
            dataDAOmock2.add( randomId() );
        });

        // 設定 spy 的行為
        // mock 是接管所有物件的 method,但 spy 則是對一部分的 method 做 mock
        DataDAO dataDAO = new DataDAO();
        DataDAO spy = spy(dataDAO);
        doThrow(NullPointerException.class).when(spy).getDataSize();
        assertThrows(NullPointerException.class, () -> spy.getDataSize());
        assertEquals("test", spy.getDataById("test"));

        // 設定呼叫 mock 的某個 method 要呼叫真實的物件的 method
        DataDAO dataDAOmock3 = mock(DataDAO.class);
        when(dataDAOmock3.getDataSize()).thenCallRealMethod();
        assertEquals( 0, dataDAOmock3.getDataSize());

        // 自訂 Answer
        doAnswer(invocation -> "Always the same").when(dataDAOmock3).getDataById(anyString());
        String data = dataDAOmock3.getDataById("1");
        assertEquals("Always the same", data);
    }

verify

  • 檢查是否有呼叫某個 method

  • 檢查呼叫某個 method 的次數

  • 檢查是否沒有使用 mock object

  • 檢查是不是沒有呼叫某個 method

  • 檢查是不是沒有非預期的操作互動 verifyNoMoreInteractions

  • 檢查 呼叫 method 操作的順序

  • 檢查是不是沒有呼叫某個 method

  • 檢查呼叫 method 的次數,至少 或是 最多 幾次

  • 檢查是否有使用某個參數呼叫 method

  • 檢查是否有使用任意參數呼叫 method

  • 利用 argument capture 檢查

    @Test
    public void verify_test1() {
        DataDAO dataDAOmock = mock(DataDAO.class);
        dataDAOmock.getDataSize();
        // 檢查是否有呼叫某個 method
        verify(dataDAOmock).getDataSize();

        // 檢查呼叫某個 method 的次數
        verify(dataDAOmock, times(1)).getDataSize();

        DataDAO dataDAOmock2 = mock(DataDAO.class);
        // 檢查是否沒有使用 mock object
        verifyNoInteractions(dataDAOmock2);
        // 檢查是不是沒有呼叫某個 method
        verify(dataDAOmock2, times(0)).getDataSize();

        // 檢查是不是沒有非預期的操作互動 verifyNoMoreInteractions
        DataDAO dataDAOmock3 = mock(DataDAO.class);
        dataDAOmock3.getDataSize();
        dataDAOmock3.clear();
        verify(dataDAOmock3).getDataSize();
        assertThrows(NoInteractionsWanted.class, () -> verifyNoMoreInteractions(dataDAOmock3));

        // 檢查 呼叫 method 操作的順序
        DataDAO dataDAOmock4 = mock(DataDAO.class);
        dataDAOmock4.getDataSize();
        dataDAOmock4.add("a parameter");
        dataDAOmock4.clear();

        InOrder inOrder = Mockito.inOrder(dataDAOmock4);
        inOrder.verify(dataDAOmock4).getDataSize();
        inOrder.verify(dataDAOmock4).add("a parameter");
        inOrder.verify(dataDAOmock4).clear();

        // 檢查是不是沒有呼叫某個 method
        verify(dataDAOmock4, never()).getDataById("");

        // 檢查呼叫 method 的次數,至少 或是 最多 幾次
        DataDAO dataDAOmock5 = mock(DataDAO.class);
        dataDAOmock5.clear();
        dataDAOmock5.clear();
        dataDAOmock5.clear();
        verify(dataDAOmock5, atLeast(1)).clear();
        verify(dataDAOmock5, atMost(5)).clear();

        // 檢查是否有使用某個參數呼叫 method
        DataDAO dataDAOmock6 = mock(DataDAO.class);
        dataDAOmock6.getDataById("test1");
        verify(dataDAOmock6).getDataById("test1");
        assertThrows(WantedButNotInvoked.class, () -> verify(dataDAOmock6).getDataById("test"));
        // 檢查是否有使用任意參數呼叫 method
        verify(dataDAOmock6).getDataById(anyString());

        // 利用 argument capture 檢查
        DataDAO dataDAOmock7 = mock(DataDAO.class);
        dataDAOmock7.getDataById("someElement");

        ArgumentCaptor<String> argumentCaptor = ArgumentCaptor.forClass(String.class);
        verify(dataDAOmock7).getDataById(argumentCaptor.capture());

        String capturedArgument = argumentCaptor.getValue();
        assertEquals( "someElement", capturedArgument);
    }

Note

在執行測試時,有遇到這樣的錯誤資訊

OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended

原因是 VM warning: Sharing is only supported for boot loader classes · Issue #3111 · mockito/mockito · GitHub

CDS: Class Data Sharing 將一組類別預處理為共享存檔文件,然後可以在運行時進行內存映射以減少啓動時間。主要目的是減少啓動時間。應用程式對於它使用的核心類別的數量越小,節省的啓動時間部分就越大。自 JDK 12 開始,就預先打包了一份預設的 CDS 檔案。

解決方式是加上 JVM 執行參數

-Xshare:off

Reference

Mockito - 維基百科,自由的百科全書

Mockito - mockito-core 5.12.0 javadoc

# SpringBoot - 單元測試工具 Mockito

Mockito 简明教程| waylau.com

Mockito's Mock Methods | Baeldung

Mockito When/Then Cookbook | Baeldung

Mockito Verify Cookbook | Baeldung

2024/12/02

Retrofit RxJava Adapter

Retrofit 的 RxJava Adapter 用於將 Retrofit 的服務呼叫結果轉換為 RxJava 的 Observable、Single、Maybe 或 Completable 等反應式類別。在進行網路請求時,能充分利用 RxJava 的功能,如流控制、非同步處理、錯誤處理等。

https://reqres.in/ 有一個 List User 的 GET method API,以下以這個 API 實作

pom.xml

廚了 Retrofit2 集 RxJava3 以外,增加 adapter-rxjava3

        <!--retrofit-->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-gson</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.squareup.retrofit2/adapter-rxjava3 -->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>adapter-rxjava3</artifactId>
            <version>2.11.0</version>
        </dependency>

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Retrofit

透過網頁服務的 API 定義 service 及發送,回傳的資料

UserData.java

public class UserData {
    private long id;
    private String first_name;
    private String last_name;
    private String email;
// getter, setter, toString
}

UserList.java

public class UserList {
    private int page;
    private int per_page;
    private int total;
    private int total_pages;

    private List<UserData> data;
// getter, setter, toString
}

UserListService.java

注意,這邊的回傳結果改為 Observable<UserList>

public interface UserListService {
    @GET("/api/users")
    Observable<UserList> listUser(@Query("page") int pageno);
}

Retrofit RxJava 測試

RetrofitRxJavaTest.java

public class RetrofitRxJavaTest {
    public static void sync() {
        int maxIdleConnections = 10;
        int keepAliveDurationMills = 1000;
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava3CallAdapterFactory.create())
                .client(httpClient.build())
                .build();

        UserListService service = retrofit.create(UserListService.class);

        Observable<UserList> observable = service.listUser(2);
        observable.subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.single())
//                    .subscribe(
//                            System.out::println
//                    );
                    .subscribe(new DefaultObserver<UserList>() {
                        @Override
                        public void onNext(UserList response) {
                            System.out.println(response);
                        }

                        @Override
                        public void onError(@NonNull Throwable e) {

                        }

                        @Override
                        public void onComplete() {

                        }
                    });
    }

    public static void main(String[] args) {
        System.out.println(new Date());
        sync();

        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

如果程式放在 Android APP,則可以改為在主執行緒,也就是主畫面的執行緒上觀察結果

service.listUser(2).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())

2024/11/25

How to use RxJava

測試如何使用 RxJava

maven pom.xml

加上 rxjava3 的 library

        <!-- rxjava -->
        <dependency>
            <groupId>io.reactivex.rxjava3</groupId>
            <artifactId>rxjava</artifactId>
            <version>3.1.8</version>
        </dependency>

Observable

Observable 可從

這邊是產生 Observable,然後 observer 用 subscribe 註冊 Observable sequence。該 sequence 會發送 items 到 observer,一次發送一個

    public static void test1() {
        // just 會將 item 轉成 Observable
        Observable<String> observerable = Observable.just("Hello", "World");
        // subscribe 註冊 Observable,列印每一個 item
        observerable.subscribe(System.out::println);
    }

observer 有三個 interface

  • OnNext

    當 observerable 發布一個新的 event (item),observer 收到該 item 然後能做一些操作

  • OnComplete

    當 sequence of events 完成時,代表不會再呼叫 OnNext

  • OnError

    如果在 RxJava framework 或是 OnNext 裡面發生 error

    public static void test2() {
        String[] letters = {"a", "b", "c", "d", "e", "f", "g"};
        Observable<String> observable = Observable.fromArray(letters);

        // Java 的 String 是 immutable class,且是 pass by reference
        // 所以這邊如果要儲存結果,不能直接用 String 物件,改用 array 記錄一直改變的 String reference
        String[] result = {""};
        observable.subscribe(
                i -> result[0] += i,  //OnNext
                Throwable::printStackTrace, //OnError
                () -> result[0] += "_Completed" //OnCompleted
        );
        System.out.println(" result[0]="+ result[0]);

        // 也可以用 StringBuilder,這樣就可以一直改變 string 內容
        StringBuilder sb = new StringBuilder();
        observable.subscribe(
                i -> sb.append(i),  //OnNext
                Throwable::printStackTrace, //OnError
                () -> sb.append("_Completed") //OnCompleted
        );
        System.out.println(" sb="+ sb.toString());
        // 結果 abcdefg_Completed

        // 測試 OnError
        // 當處理到 "a",因為無法轉成 integer 會發生 error
        // error 處理為 列印 _err 文字
        String[] letters2 = {"1", "a", "2"};
        Observable<String> observable2 = Observable.fromArray(letters2);
        StringBuilder sb2 = new StringBuilder();
        observable2.subscribe(
                i -> sb2.append(Integer.parseInt(i)),  //OnNext
//                Throwable::printStackTrace, //OnError
                err -> sb2.append("_err"),
                () -> sb2.append("_Completed") //OnCompleted
        );
        System.out.println(" sb2="+ sb2.toString());
        // 結果 1_err
    }

operator

map, flatMap

map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many

    public static void operator_test_map_flatmap() {
        // map: 對每一個 item 執行某個 operation,會回傳任意物件
        String[] letters = {"a", "b", "c", "d"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .map(String::toUpperCase)
                .subscribe(letter -> sb.append(letter));
        System.out.println("operator_test_1 sb="+ sb.toString());

        // flatMap 會回傳結果的 Observable
        // flatMap 不保證 sequence 順序
        // map 是 1-to-1 處理的,但是 flatMap 可以 1-to-many, many-to-many
        Observable<String> observable = Observable.just("Hello", "World");
        observable
            .flatMap(item -> Observable.fromArray(item.split("")))
            .subscribe(System.out::println);
    }

filter

filter 會過濾 item,符合條件的才會通過

    public static void operator_test_filter() {
        // filter 會過濾 item,符合條件的才會通過
        Observable<Integer> observable = Observable.just(1, 2, 3, 4, 5);
        observable.filter(item -> item % 2 == 0)
                .subscribe(System.out::println);
        // 結果是
        // 2
        // 4
    }

scan

scan 會將每次計算的結果,發送給下一個 步驟

    public static void operator_test_scan() {

        // scan 會將每次計算的結果,發送給下一個 步驟
        String[] letters = {"a", "b", "c"};

        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .scan(new StringBuilder(), StringBuilder::append)
                .subscribe(total -> sb.append(total.toString()+" ") );

        System.out.println("operator_test_scan sb="+ sb.toString());
        // a ab abc

        // 累加 1~5
        StringBuilder sb2 = new StringBuilder();
        Observable.range(1,5)
                .scan( (Integer res1, Integer res2) -> res1+res2  )
                .subscribe(total -> sb2.append(total.toString()+" ") );
        System.out.println("operator_test_scan sb="+ sb2.toString());
        // 1 3 6 10 15
    }

groupBy

將 event 分類

ex: 將數字依照單雙數分類

    public static void operator_test_groupby() {
        String[] nums = {"", ""};
        Observable.range(1,5)
                .groupBy(i -> 0 == (i % 2) ? "EVEN" : "ODD")
                .subscribe(group ->
                        group.subscribe((number) -> {
                            if (group.getKey().toString().equals("EVEN")) {
                                nums[0] += number+" ";
                            } else {
                                nums[1] += number+" ";
                            }
                        })
                );
        System.out.println("operator_test_groupby nums="+ Arrays.deepToString(nums));
        // 結果  operator_test_groupby nums=[2 4 , 1 3 5 ]
    }

conditional

defaultIfEmpty

first

takeWhile

    public static void operator_test_conditional() {
        // empty() 可產生一個 空的 Observable
        // defaultIfEmpty() 在 observable 空的時候有作用
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .subscribe(System.out::println);

        // first  發出第一個來源資料,如果沒有則發送預設值。
        String[] letters = {"a", "b", "c"};
        StringBuilder sb = new StringBuilder();
        Observable.fromArray(letters)
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb.append(res) );
        System.out.println("operator_test_conditional sb="+sb);
        // operator_test_conditional sb=a

        StringBuilder sb2 = new StringBuilder();
        Observable.empty()
                .defaultIfEmpty("Observable is empty")
                .first("default first")
                .subscribe( res -> sb2.append(res) );
        System.out.println("operator_test_conditional empty sb2="+sb2);
        // operator_test_conditional empty sb2=Observable is empty

        // takeWhile 只在滿足條件時,取用此 item,否則就丟棄該 item
        int[] sum = {0};
        Observable.range(1,10)
                .takeWhile(i -> i < 5)
                .subscribe(s -> sum[0] += s);
        System.out.println("operator_test_conditional takeWhile sum="+sum[0]);
        // operator_test_conditional takeWhile sum=10
    }

combining

    public static void operator_test_combining() {
        // merge: 將多個 Observables 合併成一個
        Observable<String> observable1 = Observable.just("Hello");
        Observable<String> observable2 = Observable.just("World");
        Observable.merge(observable1, observable2)
                .subscribe(System.out::println);
        //Hello
        //World

        // zip: 以某個方式,合併多個 Observables
        Observable.zip(observable1, observable2, (item1, item2) -> item1 + " " + item2)
                .subscribe(System.out::println);
        //Hello World

        // startWith: 將指定的 Observable 合併到另一個 的開頭
        Observable<String> names = Observable.just("Project", "zero");
        Observable<String> otherNames = Observable.just("Git", "Code");
        names.startWith(otherNames).subscribe(System.out::println);
        //Git
        //Code
        //Project
        //zero
    }

Operators Category

ReactiveX - Operators 這邊可查詢到所有 ReactiveX 的 operators 定義

Alphabetical List of Observable Operators · ReactiveX/RxJava Wiki · GitHub 這邊是 RxJava 的 operators

依照 ReactiveX 的分類有

  • Creating

  • Transforming

  • Filtering

  • Combining

  • Observable Utility

  • Conditional and Boolean

  • Mathematical and Aggregate

  • Backpressure

  • Connectable Observable

  • Operators to Convert

Scheduler

subscribeOn: 指定 Observable 的執行緒,只能寫一次,影響 Observable 的執行。會影響從訂閱開始到第一個 observeOn 的所有操作

observeOn: 在哪個執行緒上觀察 Observable 的結果,可以寫很多次切換執行緒。

    public static void test_scheduler() {
        printCurrentThread("start");
        Observable<Object> observable = Observable.just("Hello", "World");
        observable
                .map(s -> {
                    printCurrentThread("first map: "+s);
                    return s;
                })
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.io())
                .map(s -> {
                    printCurrentThread("second map: "+s);
                    return s;
                })
                .observeOn(Schedulers.single())
                .subscribe(
                    s -> {
                        printCurrentThread("subscribe: "+s);
                    }
                );

        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
//        Thread: main, start
//        Thread: RxComputationThreadPool-1, first map: Hello
//        Thread: RxCachedThreadScheduler-1, second map: Hello
//        Thread: RxComputationThreadPool-1, first map: World
//        Thread: RxCachedThreadScheduler-1, second map: World
//        Thread: RxSingleScheduler-1, subscribe: Hello
//        Thread: RxSingleScheduler-1, subscribe: World
    }

    private static void printCurrentThread(String message) {
        System.out.format("Thread: %s, %s%n", Thread.currentThread().getName(), message);
    }

執行結果

Thread: main, start
Thread: RxComputationThreadPool-1, first map: Hello
Thread: RxCachedThreadScheduler-1, second map: Hello
Thread: RxComputationThreadPool-1, first map: World
Thread: RxCachedThreadScheduler-1, second map: World
Thread: RxSingleScheduler-1, subscribe: Hello
Thread: RxSingleScheduler-1, subscribe: World

Error Handling

observer 有三個 interface: OnNext, OnError, OnComplete

    public static void test_error() {
        Observable<String> observable = Observable.just("Hello", "World")
                .map(item -> {
                    if (item.equals("World")) {
                        throw new RuntimeException("Error occurred");
                    }
                    return item;
                });

        observable.subscribe(
                System.out::println,
                error -> System.err.println("Error: " + error.getMessage()),
                () -> System.out.println("Completed!")
        );
        //Hello
        //Error: Error occurred
    }

References

Introduction to RxJava | Baeldung

Guide to RxJava in Java

RxJava3 用法 - petercao - 博客园

2024/11/18

RxJava

RxJava 是處理 observable 序列的非同步,事件導向函式庫。實作概念來自 ReactiveX,以非同步方式處理 observable stream 的 API。程式是以 Observer, Iterator Design Pattern 實作,並用 functional programming 的 style 撰寫程式。

RxJava 是 lighweight library,且支援多個 JVM 為基礎的程式語言:Groovy, Clojure, JRuby, Kotlin, Scala。

Base Classes

常用術語

upstream downstream

有一個來源的 dataflow,然後可讓資料經過幾個中間的步驟處理。

source.operator1().operator2().operator3().subscribe(consumer);

// 折行
source
  .operator1()
  .operator2()
  .operator3()
  .subscribe(consumer);

source.flatMap(value -> source.operator1().operator2().operator3());

對於 operator2 來說,左邊的資料來源是 upstream,右邊的 subscriber/consumer 是 downstream。

Objects in motion

RxJava 文件中,只要看到 emission, emits, item, event, signal, data, message,都可視為同義字,都代表通過 data flow 的 obejct 物件。

Backpressure

當資料流已非同步方式經過處理步驟時,每一個步驟可能會有不同的處理速度,為了避免速度太快而造成需要臨時的 buffer,或要跳過/刪除資料增加的記憶體使用量,backpressure 可因應這個問題,在每個步驟不知道 upstream 要送幾個資料過來的情況下,控制流程,限制資料流記憶體使用量。

io.reactivex.rxjava3.core.Flowable 類別就支援 backpressure。

io.reactivex.rxjava3.core.Observable 不支援 backpressure,適用於短的 sequence,GUI 互動。

Single, Maybe, Completable 都不支援 backpressure,因為一定會有空間只放一個資料。

Assembly time

dataflow 準備要套用中間的 operators 的時候,就稱為 assembly time

Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0);

目前資料還在準備階段,還沒有開始處理。

Subscription time

這是當 subscriber() 被呼叫時的暫時狀態,會在內部建立處理步驟鍊。

flow.subscribe(System.out::println)

在這個狀態下,資料來源會開始慢慢地釋放資料項目。

Runtime

在執行階段,資料來源持續釋放資料項目、錯誤或是完成的訊號

Observable.create(emitter -> {
     while (!emitter.isDisposed()) {
         long time = System.currentTimeMillis();
         emitter.onNext(time);
         if (time % 2 != 0) {
             emitter.onError(new IllegalStateException("Odd millisecond!"));
             break;
         }
     }
})
.subscribe(System.out::println, Throwable::printStackTrace);

也就是執行上面這一段程式核心的時候


Simple background computation

RxJava 最常用來在另一個背景 thread 執行某些計算,遠端網路存取,最後在 UI thread 顯示結果或是錯誤。

import io.reactivex.rxjava3.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
})
  .subscribeOn(Schedulers.io())
  .observeOn(Schedulers.single())
  .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish

coding style 類似 Builder Pattern,稱為 fluent API。

RxJava 的 reacive type 是 immutable 的,每一個 method call 都會會傳一個處理完成的新的 Flowable。

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); //  imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);

可以透過 subscribeOn 將計算量大,或是 blocking IO 的部分,移到另一個 thread 執行,當資料完成且 ready,可以透過 observeOn 在前景或是 GUI thread 處理結果。

Scheduler

RxJava 的 operators 不能在 Thread 或 ExecutorService 直接執行,而是要透過 Scheduler 呼叫。Scheduler 封裝了 concurrency。RxJava 的幾個標準的 Scheduler 可透過 Scheduler utility 存取

  • Schedulers.computation()

    在背景有固定數量的 threads 用來執行 operators,大部分的非同步 operators 使用這個 default Scheduler

  • Schedulers.io()

    跟 IO 相關的 或是 blocking operations

  • Schedulers.single()

    以 sequntial 與 FIFO 方式在單一 thread 運作

  • Schedulers.trampoline()

    以 sequential 與 FIFO 方式,在特殊的 threads 運作,通常用在測試

這幾個 schedulers 在所有 JVM 都可以使用。但特殊平台,例如 Android,另外有內建 schedulers: AndroidSchedulers.mainThread()、SwingScheduler.instance() 或 JavaFXScheduler.platform()。

可利用 Schedulers.from(Executor) 將現有的 Executor 封裝為 Scheduler。這可用來建立一個固定,且較大的 thread pool

測試程式裡面的 Thread.sleep(2000),是因為 Scheduler 都是運作在 daemon thread,一旦 main thread 停止,這些 daemon threads 也會同時停止正在背景運作的程式。因此需要在 main thread 稍微 sleep 一段時間,讓 console 能夠等待執行結果。

Concurreny with a flow

RxJava 的 Flows 內部天生就是 sequential,但每一個 flow 可以各自獨立平行運作

Flowable.range(1, 10)
  .observeOn(Schedulers.computation())
  .map(v -> v * v)
  .blockingSubscribe(System.out::println);

這是對 110 的數字進行平方運算,最後在 main thread (呼叫 blockingSubscribe 的 thread) 處理結果。map(v -> v * v) 並不是以 concurrent 方式運作,他是在 computation() thread,依照順序接收 110

Parallel Processing

數字平方例子如果要平行運算

Flowable.range(1, 10)
  .flatMap(v ->
      Flowable.just(v)
        .subscribeOn(Schedulers.computation())
        .map(w -> w * w)
  )
  .blockingSubscribe(System.out::println);

RxJava 的平行運算,代表分別獨立的 flows,並在最後整合到一個 flow。flatMap 就是將每個數字提供給各自獨立的 Flowable,最後合併到 main thread

flatMap 無法保證執行順序,另外有其他替代的 operators

  • concatMap

    一次執行一個 inner flow

  • concatMapEager

    一次執行所有內部的 flows,但用 inner flows 建立的順序輸出結果

Flowable.parallel() 跟 ParallelFlowable 也可以達到 parallel processing 的目的

Flowable.range(1, 10)
  .parallel()
  .runOn(Schedulers.computation())
  .map(v -> v * v)
  .sequential()
  .blockingSubscribe(System.out::println);

Dependent sub-flows

flatMap 的用途

ex: 如果有一個 service 會回傳 Flowable,我們可用第一個 service 的值,呼叫另一個 service

Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);

Continuations

當出現某個 item 時,需要執行對應的運算。

Dependent

最常見的例子:給定某個 value,執行另一個 service,並等待該 service 的結果繼續下去。

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

另一個例子: flatMap 裡面對每個 value,呼叫另一個 service

service.apiCall()
.flatMap(value ->
    service.anotherApiCall(value)
    .flatMap(next -> service.finalCallBoth(value, next))
)

這段程式碼實現了三個連續的 API 呼叫,每個 API 呼叫結果都依賴於前一個呼叫的結果。

  • apiCall 發出 value
  • anotherApiCall(value) 發出 next
  • finalCallBoth(value, next) 使用這兩個結果來進行最後的調用。

Non-dependent

flatMapSingle 裡面的 ignored 是忽略的來源,全部都對應到 someSingleSource

Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
  .subscribe(System.out::println, Throwable::printStackTrace);

另一種方法,改用 Completable 中介,然後用 andThen 執行其他操作

sourceObservable
  .ignoreElements()           // returns Completable
  .andThen(someSingleSource)
  .map(v -> v.toString())

Deferred-dependent

有時候,sequence 前後可能有隱藏的相依性。

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.just(count.get()))
  .subscribe(System.out::println);

結果會是 0,因為 Single.just(count.get()) 在 assembly time 就決定了結果,因此必須要延遲 Single 的計算到 runtime

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.defer(() -> Single.just(count.get())))
  .subscribe(System.out::println);

或是寫成

AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
  .doOnNext(ignored -> count.incrementAndGet())
  .ignoreElements()
  .andThen(Single.fromCallable(() -> count.get()))
  .subscribe(System.out::println);

Type conversion

有時可能會遇到 source/service 回傳了跟 flow 所需要的類別不同的狀況。有兩種解決方式:轉換為需要的類別 或是 找到支援該類別的另一個 operator

Converting to the desired type

Flowable Observable Single Maybe Completable
Flowable toObservable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Observable toFlowable first, firstOrError, single, singleOrError, last, lastOrError firstElement, singleElement, lastElement ignoreElements
Single toFlowable toObservable toMaybe ignoreElements
Maybe toFlowable toObservable toSingle ignoreElements
Completable toFlowable toObservable toSingle toMaybe

Using an overload with the desired type

Operator Overloads
flatMap flatMapSingleflatMapMaybeflatMapCompletableflatMapIterable
concatMap concatMapSingleconcatMapMaybeconcatMapCompletableconcatMapIterable
switchMap switchMapSingleswitchMapMaybeswitchMapCompletable

Operator naming conventions

operator 命名的習慣

Unusable keywords

因為 Rx.NET 發出一個 item,稱為 Return(T) ,這跟 java 的 return 關鍵字有衝突,所以 RxJava 改為 just(T)

類似狀況是 Switch 被命名為 switchOnNext

Catch 命名為 onErrorResumeNext

Type erasure

在 operator 回傳特定類別時,會將該類別加在 operator 名稱後面

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)

Type ambiguities

有幾個 concatWith 會 overloading 接受不同類別的參數,用在 lambda 可能會造成困擾

Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);

ex: 以下程式無法編譯

someSource.concatWith(s -> Single.just(2))
.subscribe(System.out::println, Throwable::printStackTrace);

這時候,需要延遲計算直到 someSource 完成,所以要用 defer

someSource.concatWith(Single.defer(() -> Single.just(2)))
.subscribe(System.out::println, Throwable::printStackTrace);

有時候為避免邏輯的模糊問題,會增加 suffix

suffix 可避免 logical ambiguities,但可能在 flow 產生錯誤的類別

Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);

Error Handling

flow 失敗時,會發生 error,可能會發生多個來源失敗,這時可選擇是不是要等所有來源完成或失敗。operator 後面會加上 DelayError

Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);

多個 suffix 可能同時出現

Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);

Base class vs base type

Type Class Interface Consumer
0..N backpressured Flowable Publisher Subscriber
0..N unbounded Observable ObservableSource Observer
1 element or error Single SingleSource SingleObserver
0..1 element or error Maybe MaybeSource MaybeObserver
0 element or error Completable CompletableSource CompletableObserver

References

GitHub - ReactiveX/RxJava: RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.

2024/11/11

Method Reference

因為 Java lambda expression 語法,需要有一種語法,可以引用 method,但不直接呼叫該 method。Method Reference 就是利用 :: 雙冒號,讓 Java 可以做到這件事。

Method Reference 有四種

Kind Syntax Examples
參考 class 的 static method *ContainingClass*::*staticMethodName* Person::compareByAge
MethodReferencesExamples::appendStrings
參考一個 object instance 的 method *containingObject*::*instanceMethodName* myComparisonProvider::compareByName
myApp::appendStrings2
參考特定類別,任意物件的 instance method *ContainingType*::*methodName* String::compareToIgnoreCase
String::concat
參考 constructor *ClassName*::new HashSet::new

物件比較範例

首先定義一個 Person.java data class

import java.util.Date;

public class Person {
    private String name;
    private int age;
    private Date birthday;

    public static int compareByAge(Person a, Person b) {
        return a.birthday.compareTo(b.birthday);
    }

    public Person(String name, int age, Date birthday) {
        this.name = name;
        this.age = age;
        this.birthday = birthday;
    }

    public String getName() {
        return name;
    }

    public int getAge() {
        return age;
    }

    public Date getBirthday() {
        return birthday;
    }

    @Override
    public String toString() {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                ", birthday=" + birthday +
                '}';
    }
}

傳統的物件比較,會用 comparator 來實作。目前可以進化改用 lambda expression,或是 method reference

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.stream.Stream;

public class PersonExample {
    public static Person[] newArray() {
        try {
            SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd", Locale.ENGLISH);
            Person a = new Person("A", 10, formatter.parse("2014-02-01"));
            Person b = new Person("B", 20, formatter.parse("2004-03-01"));
            Person[] rosterAsArray = {a, b};
            return rosterAsArray;
        } catch (ParseException e) {
            throw new RuntimeException(e);
        }
    }

    public static void test1_comparator() {
        Person[] rosterAsArray = newArray();
        // 定義 Comparator
        class PersonAgeComparator implements Comparator<Person> {
            public int compare(Person a, Person b) {
                return a.getBirthday().compareTo(b.getBirthday());
            }
        }
        Arrays.sort(rosterAsArray, new PersonAgeComparator());
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test2_lambda() {
        Person[] rosterAsArray = newArray();
        // 使用 lambda expression
        Arrays.sort(rosterAsArray,
                (Person p1, Person p2) -> {
                    return p1.getBirthday().compareTo(p2.getBirthday());
                }
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test3_static_method() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method
        Arrays.sort(rosterAsArray,
                (p3, p4) -> Person.compareByAge(p3, p4)
        );
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void test4_static_method_reference() {
        Person[] rosterAsArray = newArray();
        // 使用 Person 裡面已經定義的 static method reference
        Arrays.sort(rosterAsArray, Person::compareByAge);
        Stream.of(rosterAsArray).forEach(System.out::println);
        System.out.println("");
    }

    public static void main(String... args) {
        test1_comparator();
        test2_lambda();
        test3_static_method();
        test4_static_method_reference();
    }
}

MethodReference 測試

以下例子利用上面的前三種 Method Reference 方式,列印 "Hello World!",第一個是直接使用 lambda expression 實作。

import java.util.function.BiFunction;

public class MethodReferencesExamples {

    public static <T> T mergeThings(T a, T b, BiFunction<T, T, T> merger) {
        return merger.apply(a, b);
    }

    public static String appendStrings(String a, String b) {
        return a + b;
    }

    public String appendStrings2(String a, String b) {
        return a + b;
    }

    public static void main(String[] args) {

        MethodReferencesExamples myApp = new MethodReferencesExamples();

        // Calling the method mergeThings with a lambda expression
        // 以 lambda expression 呼叫 mergeThings
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", (a, b) -> a + b));

        // Reference to a static method
        // static method 的 Method Reference
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", MethodReferencesExamples::appendStrings));

        // Reference to an instance method of a particular object
        // 參考一個 object instance 的 method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", myApp::appendStrings2));

        // Reference to an instance method of an arbitrary object of a
        // particular type
        // 參考特定類別,任意物件的 instance method
        System.out.println(MethodReferencesExamples.
                mergeThings("Hello ", "World!", String::concat));
    }
}

Reference

Method References

2024/10/28

Retrofit

因為現今網路服務最常見的就是用 http 協定提供,一般資料面向的服務,也是以 JSON 資料格式作為輸入與輸出的資料格式。Retrofit 定義自己是 type-safe HTTP client for Android and Java。這邊 type-safe 的意思是,透過 Retrofit 的包裝,可以自動將 http request 與 response 資料轉換為 java 類別物件,在自動轉換的過程中,就能以類別的定義,去檢查資料是不是符合類別的定義,而不是未經定義的其他資料。因為是遠端網路服務,Retrofit 提供了同步與非同步兩種使用 http 服務的方式,非同步的呼叫方法可以解決網路延遲,甚至是故障的問題。

測試 http 服務

網路上提供 Fake HTTP API service,有以下網站可以用

GET https://reqres.in/api/users/2

{
    "data": {
        "id": 2,
        "email": "janet.weaver@reqres.in",
        "first_name": "Janet",
        "last_name": "Weaver",
        "avatar": "https://reqres.in/img/faces/2-image.jpg"
    },
    "support": {
        "url": "https://reqres.in/#support-heading",
        "text": "To keep ReqRes free, contributions towards server costs are appreciated!"
    }
}

POST https://reqres.in/api/users

輸入參數

{
    "name": "John",
    "job": "leader"
}

輸出

{
    "name": "morpheus",
    "job": "leader",
    "id": "63",
    "createdAt": "2024-05-22T03:31:37.941Z"
}

Maven pom.xml

在 xml 裡面引用 retrofits 以及 converter-gson 的 library

        <!--retrofit-->
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>retrofit</artifactId>
            <version>2.11.0</version>
        </dependency>
        <dependency>
            <groupId>com.squareup.retrofit2</groupId>
            <artifactId>converter-gson</artifactId>
            <version>2.11.0</version>
        </dependency>

同步/非同步

透過 service 產生的 API Call 有 同步/非同步 兩種呼叫方式

  • execute() – Synchronously send the request and return its response.
  • enqueue(retrofit2.Callback) – Asynchronously send the request and notify callback of its response or if an error occurred talking to the server, creating the request, or processing the response.

Converter

retrofit2 的 conveter 是用在 http request 與 response 的資料轉換,目前支援這些格式

  • Gsoncom.squareup.retrofit2:converter-gson
  • Jacksoncom.squareup.retrofit2:converter-jackson
  • Moshicom.squareup.retrofit2:converter-moshi
  • Protobufcom.squareup.retrofit2:converter-protobuf
  • Wirecom.squareup.retrofit2:converter-wire
  • Simple XMLcom.squareup.retrofit2:converter-simplexml
  • JAXBcom.squareup.retrofit2:converter-jaxb
  • Scalars (primitives, boxed, and String): com.squareup.retrofit2:converter-scalars

比較常見的是 json,可以使用 Gson 或 Jackson 協助轉換。

實作

data class

對應剛剛的兩個 service,分別有不同的 data class

User.java

public class User {
    private long id;
    private String first_name;
    private String last_name;
    private String email;
    // getter, setter, toString
}

UserResponse.java

public class UserResponse {
    private User data;
    // getter, setter, toString
}

Account.java

public class Account {
    private String id;
    private String name;
    private String job;
    private Date createdAt;

    // getter, setter, toString
}

service

UserService.java

import retrofit.data.Account;
import retrofit.data.UserResponse;
import retrofit2.Call;
import retrofit2.http.Body;
import retrofit2.http.GET;
import retrofit2.http.POST;
import retrofit2.http.Path;

public interface UserService {
    @GET("/api/users/{id}")
    public Call<UserResponse> getUser(@Path("id") long id);

    @POST("/api/users")
    Call<Account> createUser(@Body Account account);
}

main

RetrofitTest.java

import okhttp3.OkHttpClient;
import retrofit.data.Account;
import retrofit.data.UserResponse;
import retrofit.service.UnsafeOkHttpClient;
import retrofit.service.UserService;
import retrofit2.Call;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.Callback;
import retrofit2.converter.gson.GsonConverterFactory;

public class RetrofitTest {
    public static void sync() {
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(httpClient.build())
//                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

        UserService service = retrofit.create(UserService.class);

        // Calling '/api/users/2'
        Call<UserResponse> callSync = service.getUser(2);
        try {
            Response<UserResponse> response = callSync.execute();
            UserResponse apiResponse = response.body();
            System.out.println("sync: "+apiResponse);
        } catch (Exception ex) {
            ex.printStackTrace();
        }

        // Calling 'https://reqres.in/api/users'
        Account account = new Account();
        account.setName("John");
        account.setJob("leader");
        Call<Account> callSync2 = service.createUser(account);
        try {
            Response<Account> response2 = callSync2.execute();
            Account apiResponseAccount = response2.body();
            System.out.println(apiResponseAccount);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static void async() {
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder();
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
                .client(httpClient.build())
//                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

        UserService service = retrofit.create(UserService.class);

        // Calling '/api/users/2'
        Call<UserResponse> callAsync = service.getUser(2);

        callAsync.enqueue(new Callback<>() {
            @Override
            public void onResponse(Call<UserResponse> call, Response<UserResponse> response) {
                int responseCode = response.code();
                UserResponse user = response.body();
                System.out.println("async responseCode="+responseCode+", result=" + user);
            }

            @Override
            public void onFailure(Call<UserResponse> call, Throwable throwable) {
                System.out.println(throwable);
            }
        });

        // Calling 'https://reqres.in/api/users'
        Account account = new Account();
        account.setName("John");
        account.setJob("leader");
        Call<Account> callAsync2 = service.createUser(account);

        callAsync2.enqueue(new Callback<>() {
            @Override
            public void onResponse(Call<Account> call, Response<Account> response) {
                int responseCode = response.code();
                Account accountResponse = response.body();
                System.out.println("async responseCode="+responseCode+", result=" + accountResponse);
            }

            @Override
            public void onFailure(Call<Account> call, Throwable throwable) {
                System.out.println(throwable);
            }
        });
    }

    public static void main(String[] args) {
        sync();
        System.exit(0);
//        async();
    }
}

注意:這邊特定要呼叫 System.exit(0) 來停掉程式,這是因為 Retrofit 內部使用的 OkHttp 採用了 ThreadPoolExecutor,參考這個網址的 issue 討論:Tomcat is not able to stop because of OkHttp ConnectionPool Issue #5542 · square/okhttp · GitHub ,討論寫說沒有直接停掉的方法。

裡面有說到大約 6mins 後就會 Conenction Pool 停掉。實際上實測,大約等了 5mins。

目前如果要調整這個問題,必須要覆蓋 connectionPool 的原始設定。方法是在產生 OkHttpClient.Builder() 的時候,指定一個新的 ConnectionPool,並將參數 keepAliveDurationMills 改短。

        int maxIdleConnections = 10;
        int keepAliveDurationMills = 1000;
        OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
                .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));

這樣修改,只有在同步呼叫時有用。如果是非同步呼叫,程式還是會等 3mins 才會停下來。

自訂 Http Client

有時候在開發時,https 網站會採用自己產生的 SSL 憑證,這時候需要調整 http client,不檢查 domain 來源

UnsafeOkHttpClient.java

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

import javax.net.ssl.*;
import java.security.cert.CertificateException;
import java.util.concurrent.TimeUnit;

public class UnsafeOkHttpClient {

    public static OkHttpClient getUnsafeOkHttpClient() {
        try {
            // Create a trust manager that does not validate certificate chains
            final TrustManager[] trustAllCerts = new TrustManager[]{
                    new X509TrustManager() {
                        @Override
                        public void checkClientTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}

                        @Override
                        public void checkServerTrusted(java.security.cert.X509Certificate[] chain, String authType) throws CertificateException {}

                        @Override
                        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                            return new java.security.cert.X509Certificate[]{};
                        }
                    }
            };

            // Install the all-trusting trust manager
            final SSLContext sslContext = SSLContext.getInstance("SSL");
            sslContext.init(null, trustAllCerts, new java.security.SecureRandom());
            // Create an ssl socket factory with our all-trusting manager
            final SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();

            int maxIdleConnections = 10;
            int keepAliveDurationMills = 1000;
            OkHttpClient.Builder builder = new OkHttpClient.Builder()
                    .connectionPool(new ConnectionPool(maxIdleConnections, keepAliveDurationMills, TimeUnit.MILLISECONDS));
            builder.sslSocketFactory(sslSocketFactory, (X509TrustManager)trustAllCerts[0]);
            builder.hostnameVerifier(new HostnameVerifier() {
                @Override
                public boolean verify(String hostname, SSLSession session) {
                    return true;
                }
            });

            return builder.build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

在使用時,只需要在產生 Retrofit 時,改用這個 http client builder

        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("https://reqres.in/")
                .addConverterFactory(GsonConverterFactory.create())
//                .client(httpClient.build())
                .client(UnsafeOkHttpClient.getUnsafeOkHttpClient())
                .build();

Service Generator

可製作 service generator,將產生 service 的程式碼再包裝起來

UserServiceGenerator.java

import okhttp3.OkHttpClient;
import okhttp3.Request;
import retrofit2.Retrofit;
import retrofit2.converter.gson.GsonConverterFactory;

public class UserServiceGenerator {
    private static final String BASE_URL = "https://reqres.in/";

    private static Retrofit.Builder builder = new Retrofit.Builder()
            .baseUrl(BASE_URL)
            .addConverterFactory(GsonConverterFactory.create());

    private static Retrofit retrofit = builder.build();

    private static OkHttpClient.Builder httpClient = new OkHttpClient.Builder()
        .connectionPool(new ConnectionPool(10, 1000, TimeUnit.MILLISECONDS));;

    public static <S> S createService(Class<S> serviceClass) {
        return retrofit.create(serviceClass);
    }

    public static <S> S createService(Class<S> serviceClass, final String token ) {
        if ( token != null ) {
            httpClient.interceptors().clear();
            httpClient.addInterceptor( chain -> {
                Request original = chain.request();
                Request request = original.newBuilder()
                        .header("Authorization", token)
                        .build();
                return chain.proceed(request);
            });
            builder.client(httpClient.build());
            retrofit = builder.build();
        }
        return retrofit.create(serviceClass);
    }
}

使用時

UserService service = UserServiceGenerator.createService(UserService.class);

References

Retrofit

Introduction to Retrofit | Baeldung

# Retrofit 操作教學

Retrofit 2 Tutorial: Declarative REST Client for Android

Retrofit2 完全解析 探索与okhttp之间的关系_okhttp retro2-CSDN博客

2024/10/21

Java Json library: Jackson, Gson Tree Model

因為 Json 文件本身就是一個樹狀結構,處理 JSON 的 libary 都有對應可處理每一個 json property node 的工具,以下記錄如何使用 Jackson 與 Gson,對 json 做 pretty print,parsing 每個節點,新增/移除節點的方法。

pom

在 pom.xml 加上兩個 libary 的來源

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.10.1</version>
        </dependency>

Gson

package json;
import com.google.gson.*;

public class GsonTester {
    public static void main(String args[]) {

        String jsonString =
                "{\"name\":\"John Lin\", \"age\":21,\"verified\":false,\"geo\": [100.11,90.85]}";
//        System.out.println("jsonString="+jsonString);

        // 以 JsonParser parsing 後,取得 JsonObject
        JsonObject details = JsonParser.parseString(jsonString).getAsJsonObject();

        //************/
        // pretty print json string
//            {
//                "name" : "John Lin",
//                "age" : 21,
//                "verified" : false,
//                "geo" : [
//                    100.11,
//                    90.85
//                ]
//            }
//        System.out.println(details.toString());

        System.out.println();
        System.out.println("*** original Json");
        Gson gson = new GsonBuilder().setPrettyPrinting().create();
        String jsonOutput = gson.toJson(details);
        System.out.println(jsonOutput);

        //************/
        // parsing json
        System.out.println();
        System.out.println("*** parsing Json");
        if (details.isJsonObject()) {
            // JsonElement 對應 "name":"John Lin"
            JsonElement nameNode = details.get("name");
            System.out.println("Name: " +nameNode.getAsString());

            JsonElement ageNode = details.get("age");
            System.out.println("Age: " + ageNode.getAsInt());

            JsonElement verifiedNode = details.get("verified");
            System.out.println("Verified: " + (verifiedNode.getAsBoolean() ? "Yes":"No"));

            // JsonArray 對應 [100.11,90.85]
            JsonArray geoNode = details.getAsJsonArray("geo");
            System.out.print("geo: ");
            for (int i = 0; i < geoNode.size(); i++) {
                JsonPrimitive value = geoNode.get(i).getAsJsonPrimitive();
                System.out.print(value.getAsFloat() + " ");
            }
            System.out.println();
        }

        //************/
        // add/remove property
        System.out.println();
        System.out.println("*** new Json After add/remove property");
        details.addProperty("school", "Tsing-Hua");
        details.remove("verified");
        String jsonOutput2 = gson.toJson(details);
        System.out.println(jsonOutput2);
    }
}

Jackson

package json;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonPrimitive;

public class JacksonTest {
    public static void main(String args[]) {
        try {
            String jsonString =
                    "{\"name\":\"John Lin\", \"age\":21,\"verified\":false,\"geo\": [100.11,90.85]}";
            ObjectMapper mapper = new ObjectMapper();
            JsonNode jsonObject = mapper.readTree(jsonString);

            //************/
            // pretty print json string
//            {
//                "name" : "John Lin",
//                    "age" : 21,
//                    "verified" : false,
//                    "geo" : [ 100.11, 90.85 ]
//            }
            System.out.println();
            System.out.println("*** original Json");
            String prettyJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject);
            System.out.println(prettyJson);

            //************/
            // parsing json
            System.out.println();
            System.out.println("*** parsing Json");
            JsonNode jsonNodeName = jsonObject.get("name");
            System.out.println("Name: " +jsonNodeName.asText() );

            JsonNode jsonNodeAge = jsonObject.get("age");
            System.out.println("Age: " + jsonNodeAge.asInt());

            JsonNode jsonNodeVerified = jsonObject.get("verified");
            System.out.println("Verified: " + (jsonNodeVerified.asBoolean() ? "Yes":"No"));

            JsonNode jsonNodeGeo = jsonObject.get("geo");
            System.out.print("geo: ");
            for (int i = 0; i < jsonNodeGeo.size(); i++) {
                double value = jsonNodeGeo.get(i).asDouble();
                System.out.print(value + " ");
            }
            System.out.println();

            //************/
            // add/remove property
            System.out.println();
            System.out.println("*** new Json After add/remove property");
            ObjectNode jsonObject2 = ((ObjectNode) jsonObject).put("school", "Tsing-Hua");
            JsonNode removedNode = jsonObject2.remove("verified" );
            String prettyJson2 = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonObject2);
            System.out.println(prettyJson2);

        } catch (JsonProcessingException e1) {

        }
    }
}

結果

*** original Json
{
  "name" : "John Lin",
  "age" : 21,
  "verified" : false,
  "geo" : [ 100.11, 90.85 ]
}

*** parsing Json
Name: John Lin
Age: 21
Verified: No
geo: 100.11 90.85 

*** new Json After add/remove property
{
  "name" : "John Lin",
  "age" : 21,
  "geo" : [ 100.11, 90.85 ],
  "school" : "Tsing-Hua"
}

References

Gson - Tree Model

Working with Tree Model Nodes in Jackson | Baeldung

Jackson - Marshall String to JsonNode | Baeldung

Pretty-Print a JSON in Java | Baeldung

2024/10/14

TopoJSON, GeoJSON

GeoJSON 是一種用 JSON 文件格式描述地圖的格式,2016 年 IETF 於 RFC 7946 規範了 GeoJSON 的規格。GeoJSON 的幾何物件有點:表示地理位置、線:表示街道公路邊界、多邊形:表示國家鄉鎮市界。

TopoJSON 是 GeoJSON 的擴充,TopoJSON 以一連串的點組合成的 Arcs 描述,line 與 polygon 都改用 arcs 描述,如果是邊界,在 TopoJSON 裡面的 arc 只會定義一次,這樣可有效減少文件的大小。

要將 TopoJSON 與 GeoJSON 文件互相轉換,可使用 node module

npm install topojson
npm install geojson

安裝後切換到 node_modules/topojson/node_modules/topojson-server/bin 這個目錄,可看到 geo2topo 指令

以下指令可將 GeoJSON 檔案轉換為 TopoJSON

./geo2topo towns-09007.geo.json > towns-09007.topo.json

切換到 node_modules/topojson/node_modules/topojson-client/bin 這個目錄,可看到 topo2geo 指令

這個指令可查詢 TopoJSON 裡面的地圖名稱

./topo2geo -l < towns-090007.topo.json
# towns-09007.geo

這邊會查詢到名稱為 towns-09007.geo

用以下指令將 TopoJSON 轉為 GeoJSON

./topo2geo towns-09007.geo=towns-090007-2.geo.json < towns-090007.topo.json

java jts library

以下節錄 GeoJSON 文件結構

{
    "type": "FeatureCollection",
    "features": [
        {
            "type": "Feature",
            "properties": {
                "id": "10005160",
                "name": "三灣鄉"
            },
            "geometry": {
                "type": "Polygon",
                "coordinates": [
                    [
                        [
                            120.97453105516638,
                            24.583295428280817
                        ],
                        [
                            120.96669830509721,
                            24.586708627549427
                        ],
                        ......
                    ]
                ]
            },
            ......
        }
    ]
}

這邊使用了兩個 library: jts, jackson,jackson 是處理 JSON 文件,jts 是處理向量圖形的 library

        <!-- https://github.com/locationtech/jts -->
        <!-- https://mvnrepository.com/artifact/org.locationtech.jts/jts-core -->
        <dependency>
            <groupId>org.locationtech.jts</groupId>
            <artifactId>jts-core</artifactId>
            <version>1.19.0</version>
        </dependency>
        <dependency>
            <groupId>org.locationtech.jts.io</groupId>
            <artifactId>jts-io-common</artifactId>
            <version>1.19.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.17.0</version>
        </dependency>

透過 Jackson,將 FeatureCollection 裡面的 features array 分開,每一個獨立去查詢,GPS 點跟 每一個 feature 的 Polygon// 測試每一個 point 跟 polygon 的關係。

相關的 methods 有這些:

  • 相等(Equals):幾何形狀拓撲上相等。

  • 脫節(Disjoint):幾何形狀沒有共有的點。

  • 相交(Intersects):幾何形狀至少有一個共有點(區別於脫節)

  • 接觸(Touches):幾何形狀有至少一個公共的邊界點,但是沒有內部點。

  • 交叉(Crosses):幾何形狀共享一些但不是所有的內部點。

  • 內含(Within):幾何形狀A的線都在幾何形狀B內部。

  • 包含(Contains):幾何形狀B的線都在幾何形狀A內部(區別於內含)

  • 重疊(Overlaps):幾何形狀共享一部分但不是所有的公共點,而且相交處有他們自己相同的區域。

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.locationtech.jts.geom.*;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.geojson.GeoJsonReader;

import java.io.File;
import java.io.IOException;

public class PointInsidePolygon {

    public static void main(String[] args) throws IOException {
        // https://blog.csdn.net/qq_36427942/article/details/129123733
        // jackson lib to read json document
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode geoJsonObject = (ObjectNode) mapper.readTree(new File("towns-10005.geo.json"));

//        System.out.println("geoJsonObject.toString()="+geoJsonObject.toString());

        // 透過 Jackson,將 FeatureCollection 裡面的 features array 分開
        // 每一個獨立去查詢,GPS 點跟 每一個 feature 的 Polygon
        // 測試每一個 point 跟 polygon 的關係
        Coordinate GPSPint = new Coordinate(120.97453105516638,24.583295428280817);
        JsonNode node2 = geoJsonObject.get("features");
//        System.out.println("node2="+node2);
        for(JsonNode node3: node2) {
//            System.out.println("node3="+node3);
            JsonNodeType node3Type = node3.getNodeType();
            JsonNode node3PropertiesNode = node3.get("properties");
            JsonNode node3PropertiesId = node3PropertiesNode.get("id");
            JsonNode node3PropertiesName = node3PropertiesNode.get("name");

            System.out.println("");
            System.out.println("node3PropertiesId="+node3PropertiesId+", node3PropertiesName="+node3PropertiesName);

            GeoJsonReader reader = new GeoJsonReader();
            Geometry geometry = null;
            try {
                geometry = reader.read(node3.toString());
            } catch (ParseException e) {
                throw new RuntimeException(e);
            }

            String geometryType = geometry.getGeometryType();
            // geometryType=GeometryCollection
            System.out.println("geometryType="+geometryType+", length="+ geometry.getLength());

//        Coordinate[] cors = geometry.getCoordinates();
//        System.out.println("cors length="+cors.length);
//        for(Coordinate c: cors) {
//            System.out.println("c ="+c.toString() );
//        }
            // get ExteriorRing
            if (geometry instanceof Polygon) {
                geometry = ((Polygon) geometry).getExteriorRing();
            } else {
                System.err.println("Invalid Polygon");
                return;
            }

            // JTS Geometry
            GeometryFactory geometryFactory = new GeometryFactory();
            Coordinate[] coordinates = geometry.getCoordinates();
            Coordinate[] jtsCoordinates = new Coordinate[coordinates.length];
            for (int i = 0; i < coordinates.length; i++) {
                jtsCoordinates[i] = new Coordinate(coordinates[i].x, coordinates[i].y);
            }
            Polygon polygon = geometryFactory.createPolygon(jtsCoordinates);

            // GPS Point
            Point gpsPoint = geometryFactory.createPoint(GPSPint);

//            相等(Equals):幾何形狀拓撲上相等。
//            脫節(Disjoint):幾何形狀沒有共有的點。
//            相交(Intersects):幾何形狀至少有一個共有點(區別於脫節)
//            接觸(Touches):幾何形狀有至少一個公共的邊界點,但是沒有內部點。
//            交叉(Crosses):幾何形狀共享一些但不是所有的內部點。
//            內含(Within):幾何形狀A的線都在幾何形狀B內部。
//            包含(Contains):幾何形狀B的線都在幾何形狀A內部(區別於內含)
//            重疊(Overlaps):幾何形狀共享一部分但不是所有的公共點,而且相交處有他們自己相同的區域。
            boolean isInside = polygon.contains(gpsPoint);
            boolean isWithin = polygon.within(gpsPoint);
            boolean intersects = polygon.intersects(gpsPoint);
            boolean overlaps = polygon.overlaps(gpsPoint);
            boolean crosses = polygon.crosses(gpsPoint);
            boolean touches = polygon.touches(gpsPoint);
            boolean disjoint = polygon.disjoint(gpsPoint);

            System.out.println("gps "+gpsPoint);
            System.out.println(" contains=" + isInside+", within=" + isWithin+", intersects="+intersects+". overlaps="+overlaps+", crosses="+crosses+", touches="+ touches+", disjoint="+disjoint);
        }
    }
}

執行結果如下:

node3PropertiesId="10005160", node3PropertiesName="三灣鄉"
geometryType=Polygon, length=0.4571892567423512
gps POINT (120.97453105516638 24.583295428280817)
 contains=false, within=false, intersects=true. overlaps=false, crosses=false, touches=true, disjoint=false

node3PropertiesId="10005110", node3PropertiesName="南庄鄉"
geometryType=Polygon, length=0.6073270143853203
gps POINT (120.97453105516638 24.583295428280817)
 contains=false, within=false, intersects=true. overlaps=false, crosses=false, touches=true, disjoint=false

node3PropertiesId="10005010", node3PropertiesName="苗栗市"
geometryType=Polygon, length=0.26286982385854196
gps POINT (120.97453105516638 24.583295428280817)
 contains=false, within=false, intersects=false. overlaps=false, crosses=false, touches=false, disjoint=true

References

D3.js應用

「GIS教程」将GeoJSON转换成TopoJSON的方法 | 麻辣GIS

GeoJSON - Wikipedia

2024/09/30

RTSP

RTSP是1996年由 RealNetworks, Netscape, 哥倫比亞大學開發,提交草案給 IETF,1998年發布為 RFC2326,2016年RTSP 2.0 發布於 RFC 7826。RTSP 是串流媒體伺服器的控制協定,可建立與控制終端設備跟伺服器之間的多媒體 session。

RTSP 協定本身看起來跟 HTTP 類似,但 HTTP 本身是 stateless,而RTSP 是 stateful,故需要追蹤 session。RTSP 是用 TCP 連線,而多媒體本身,是用 RTP 傳輸,可以用 TCP 或 UDP,常見狀況是為求傳輸速度快,使用 UDP。

節錄一個 RTSP client 取得 RTSP 的封包過程

  1. Options

    查詢 RTSP server 支援的 command

    Client -> Server

    Request: OPTIONS rtsp://192.168.1.11:8554/mystream RTSP/1.0\r\n
    CSeq: 2\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 2\r\n
    Public: DESCRIBE, ANNOUNCE, SETUP, PLAY, RECORD, PAUSE, GET_PARAMETER, TEARDOWN\r\n
    Server: gortsplib\r\n
    \r\n
  2. Describe

    查詢可處理的多媒體資料格式,server 以 SDP 方式回覆

    Client -> Server

    Request: DESCRIBE rtsp://192.168.1.11:8554/mystream RTSP/1.0\r\n
    CSeq: 3\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Accept: application/sdp\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 3\r\n
    Content-Base: rtsp://192.168.1.11:8554/mystream/\r\n
    Content-length: 560
    Content-type: application/sdp
    Server: gortsplib\r\n
    \r\n
    Session Description Protocol Version (v): 0
    Owner/Creator, Session Id (o): - 0 0 IN IP4 127.0.0.1
    Session Name (s): test
    Connection Information (c): IN IP4 0.0.0.0
    Time Description, active time (t): 0 0
    Media Description, name and address (m): video 0 RTP/AVP 96
    Media Attribute (a): control:rtsp://192.168.1.11:8554/mystream/trackID=0
    Media Attribute (a): rtpmap:96 H264/90000
    Media Attribute (a): fmtp:96 packetization-mode=1; profile-level-id=640032; sprop-parameter-sets=Z2QAMqzIUB4AiflwEQAAAwPpAAC7gA8YMZY=,aOk4XLIs
    Media Description, name and address (m): audio 0 RTP/AVP 97
    Media Attribute (a): control:rtsp://192.168.1.11:8554/mystream/trackID=1
    Media Attribute (a): rtpmap:97 mpeg4-generic/48000/2
    Media Attribute (a): fmtp:97 config=1190; indexdeltalength=3; indexlength=3; mode=AAC-hbr; profile-level-id=1; sizelength=13; streamtype=5
  3. Setup

    要求 server 設定傳送某一個 media stream,setup 要在 play 之前完成

    Client -> Server

    Request: SETUP rtsp://192.168.1.11:8554/mystream/trackID=0 RTSP/1.0\r\n
    CSeq: 4\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Transport: RTP/AVP/TCP;unicast;interleaved=0-1
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 4\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Transport: RTP/AVP/TCP;unicast;interleaved=0-1;ssrc=03DAD41B
    \r\n
  4. Setup

    client 透過 setup 跟 server 確認 stream session

    Client -> Server

    Request: SETUP rtsp://192.168.1.11:8554/mystream/trackID=1 RTSP/1.0\r\n
    CSeq: 5\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Transport: RTP/AVP/TCP;unicast;interleaved=2-3
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 5\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Transport: RTP/AVP/TCP;unicast;interleaved=2-3;ssrc=294122AE
    \r\n
  5. Play

    在 setup 設定的 session 裡面,開始播放 media

    Client -> Server

    Request: PLAY rtsp://192.168.1.11:8554/mystream/ RTSP/1.0\r\n
    CSeq: 6\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    Range: npt=0.000-\r\n
    \r\n

    Server -> Client

    Response: RTSP/1.0 200 OK\r\n
    CSeq: 6\r\n
    RTP-Info: url=rtsp://192.168.1.11:8554/mystream/trackID=0;seq=10453;rtptime=1024864644,url=rtsp://192.168.1.11:8554/mystream/trackID=1;seq=4824;rtptime=3779844790\r\n
    Server: gortsplib\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n
  6. RTP

    用 RTP 的格式,傳送 media 內容

    Server -> Client

    10.. .... = Version: RFC 1889 Version (2)
    ..0. .... = Padding: False
    ...0 .... = Extension: False
    .... 0000 = Contributing source identifiers count: 0
    0... .... = Marker: False
    Payload type: DynamicRTP-Type-96 (96)
    Sequence number: 10453
    Timestamp: 1024871144
    Synchronization Source identifier: 0x03dad41b (64672795)
    Payload: 09f0
  7. Teardown

    client 通知 server 停止播放 media

    Client -> Server

    Request: TEARDOWN rtsp://192.168.1.11:8554/mystream/ RTSP/1.0\r\n
    CSeq: 7\r\n
    User-Agent: LibVLC/3.0.20 (LIVE555 Streaming Media v2016.11.28)\r\n
    Session: 262553a7d5084e8fb57f9d8f485c89f4
    \r\n

References

即時串流協定 - 維基百科,自由的百科全書

2024/09/23

Secure Reliable Transport (SRT)

Secure Reliable Transport (SRT) 是開放的 video transport protocol,由 Halvision 提出,目的是要提供一個加密、低延遲的視訊串流傳輸協定,過去比較常見的協定是 RTMP, RTSP, HLS, WebRTC。SRT 是在 2013 年由 Haivision 發表,後來 Haivision 在 2017 年將 protocol 開放,交給 SRT Alliance,然後慢慢有更多廠商支援這個協定。

在這麼長久的網路視訊串流發展歷史中,RTMP 常見於網路影片直播,尤其是行動網路的直播,RTSP 常見於網路攝影機。RTMP 是以 TCP 為基礎,因為發展當時的網路頻寬不大,必須要用 TCP 本身的連線穩定度,封包傳送機制,來確保網路直播的可用性。

SRT 則是完全使用 UDP,在 Haivision 的文件中,提出 SRT 的延遲比 RTMP 少 2.5~3.2 倍。SRT 跟 RTP 的差異是,SRT 借鑒了 RTMP 控制機制,傳輸中除了 video 資料封包,還有控制封包,控制封包可根據網路延遲及品質,動態調整發送端的 video 發送速度,也能有限制地決定要不要重傳遺失的封包。

SRT 可套用加密機制,使用最常見的 AES-128/256 加密方法。

SRT 只是一種影片切割與包裝的方法,因此能適用於任何一種影片 codec。

Server

GitHub - Edward-Wu/srt-live-server: srt live server for low latency 這是一個 SRT streaming server。

安裝前,必須要先安裝 GitHub - Haivision/srt: Secure, Reliable, Transport SRT library,我們在 CentOS 測試,根據這個文件說明,依照以下步驟安裝

sudo yum install tcl pkgconfig openssl-devel cmake gcc gcc-c++ make automake
./configure
make
make install

安裝 SRT library 後,可安裝 server,下載 srt-live-server-master.zip,解壓縮後,直接 make 即可

sudo make

執行

cd bin
./sls -c ../sls.conf

Client

測試 SRT 可使用 OBS Studio

發布的網址為

srt://192.168.1.11:8080?streamid=uplive.sls.com/live/test

接收部分,可用 VLC video player 測試,網址為

srt://192.168.1.11:8080?streamid=live.sls.com/live/test

實際上實測,OBS 發佈到 VLC 接收,大約有 4~5 秒的延遲

iOS app

可安裝 Haivision Play Pro - Haivision iOS APP,這個 app 也可以接收 SRT streaming

設定方式如下

References

什麼是SRT 安全可靠傳輸協議

Secure Reliable Transport - Wikipedia

【ProAV Lab】SRT,互聯網上的最佳視訊串流協定 | Lumens

RTMP vs. SRT: Comparing Latency and Maximum Bandwidth - Haivision

2024/09/09

InfiniBand IB

作為網路互連的技術方案,Ethernet 跟 InfiniBand (IB) 兩者的發展目標不同,導致現在應用的領域跟範圍都不同。

  • 使用場景

一般人比較常聽到 Ethernet,Ethernet 用在區域網路中,可將多個網路設備連接起來,以光纖連接 Internet,以無線網路連接手持設備。InfiniteBand 主要用在HPC 與 data center,這類對高頻寬低延遲的網路連接需求較高的應用場景

  • 頻寬

Ethernet 的應用發展,並不是以高速頻寬為主要發展的重點,他的重點在讓多種異質網路終端能夠互相連接起來,所以頻寬並不是發展的重點。通常 Ethernet 是在 1Gbps 到 100 Gbps,而InfiniBand 可到 100 Gbps 或 200 Gbps,像這份報導的說明,InfiniBand進入400Gb/s世代,Nvidia新款網路交換器揭開序幕 | iThome 已經到了 400Gbps 的時代。

  • 應用範圍

Ethneret 面向一般終端的消費者,用來做個人使用的資料傳輸並連接 Internet。InfiniBand 是用在大量的資料運算,在現在最熱門的 AI 時代,要建構一個 AI cluster,會使用 InfiniBand 作為 cluster 節點的連接技術。

InfiniBand 另一個應用是在超級電腦裡面,因為超級電腦叢集也需要這種高速低延遲的資料傳輸。

  • 成本

InfiniBnad 的效能高,相對成本就很高,沒有特殊的需求,一般是不會使用 InfiniBand

  • 網路模式

InfiniBand 比 Ethernet 容易管理,每一個 end node 會透過一個 layer 2 switch 配置網路節點 ID,再往上以 Router 統計計算網路資料轉發路徑。

Ethernet 是用硬體的 MAC 網路模式,往上使用 IP 搭配 ARP 建構網路,且網路本身沒有學習機制,容易產生環狀網路,這又要透過 STP 協定解決,增加了網路的複雜度

Socket Direct Protocol (SDP)

Trail: Sockets Direct Protocol (The Java™ Tutorials)

Java 7 Sockets Direct Protocol – Write Once, Run Everywhere …. and Run (Some Places) Blazingly - InfoQ

SDP

在 JDK 7 裡面,就提供了一種不同於 TCP/IP 傳統的 Socket 網路的 Socket Direct Protocol,SDP 能夠透過 InifiBand 的 Remote Direct Memory Access (RDMA) ,以低延遲的方法,不透過 OS,遠端存取其他電腦的記憶體。

透過這張圖的說明,可以了解為什麼 SDP 能夠提供高速的通訊,傳統的 TCP/IP 需要一層一層接過 Application Layer -> Transport Layer -> Network Layer -> Physical Layer,SDP 則是從 Application 一步直接穿到 RDMA enabled channel adapter card

參考這個連結的圖片: SDP

References

InfiniBand - 維基百科,自由的百科全書

InfiniBand與以太網:它們是什麼? | 飛速(FS)社區

InfiniBand之技術架構介紹

2024/09/02

Quartz

Quartz 是 java 的 job-scheduling framwork,可使用類似 linux 的 crontab 方式設定定時啟動某個工作。

pom.xml

        <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.3.2</version>
        </dependency>

API

quartz API 的核心是 scheduler,啟動後,Scheduler 會產生多個 worker threads,也就是利用 ThreadPool,執行 Jobs。

主要的 API interface

  • Scheduler

  • Job

  • JobDetail

    • 定義 job instances
  • Trigger

    • 決定 scheduler 的定時機制

    • SimpleTrigger 跟 CronTrigger 兩種

  • JobBuilder

    • 用來產生 JobDetail instances
  • TriggerBuilder

    • 用來產生 Trigger instances

Example

SimpleJob.java

package quartz;

import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

// 要實作 Job interface
public class SimpleJob implements Job {
    // trigger 會透過 scheduler 的 worker 呼叫 execute 執行 job
    // JobExecutionContext 可提供 runtime environment 資訊
    public void execute(JobExecutionContext context) throws JobExecutionException {
        // 在 產生 JobDetail 時,JobData 會透過 JobDataMap 傳進 Job
        // Job 利用 JobDataMap 取得 JobData
        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
        String jobSays = dataMap.getString("jobSays");
        float myFloatValue = dataMap.getFloat("myFloatValue");

        System.out.println("Job says: " + jobSays + ", and val is: " + myFloatValue);
    }
}

JobA.java

@DisallowConcurrentExecution 可限制 Job 不會同時被執行兩次

package quartz;

import org.quartz.*;

@DisallowConcurrentExecution
public class JobA implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        TriggerKey triggerKey= context.getTrigger().getKey();
        System.out.println("job A. triggerKey="+triggerKey.getName()+", group="+triggerKey.getGroup() + ", fireTime="+context.getFireTime());
    }

}

JobB.java

package quartz;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.TriggerKey;

public class JobB implements Job {

    public void execute(JobExecutionContext context) throws JobExecutionException {
        TriggerKey triggerKey= context.getTrigger().getKey();
        System.out.println("job B. triggerKey="+triggerKey.getName()+", group="+triggerKey.getGroup() + ", fireTime="+context.getFireTime());
    }

}

QuartzExample.java

package quartz;

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzExample {

    public static void main(String args[]) {
        try {
            // 透過 SchedulerFactory 產生 Scheduler
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();
            // 使用 start 啟動,使用 shutdown 停掉這個 scheduler
            // 要先將 job, trigger 綁定 scheduler 後,再啟動 scheduler
            // sched.start();
            // sched.shutdown();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail job = JobBuilder.newJob(SimpleJob.class)
                                      .withIdentity("myJob", "group1")
                                      .usingJobData("jobSays", "Hello World!")
                                      .usingJobData("myFloatValue", 3.141f)
                                      .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                            .build();

            ///////
            JobDetail jobA = JobBuilder.newJob(JobA.class)
                                       .withIdentity("jobA", "group2")
                                       .build();
            JobDetail jobB = JobBuilder.newJob(JobB.class)
                                       .withIdentity("jobB", "group2")
                                       .build();

            // triggerA 比 triggerB 有較高的 priority,比較早先被執行
            // 每 40s 啟動一次
            Trigger triggerA = TriggerBuilder.newTrigger()
                                             .withIdentity("triggerA", "group2")
                                             .startNow()
                                             .withPriority(15)
                                             .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                             .build();

            // 每 20s 啟動一次
            Trigger triggerB = TriggerBuilder.newTrigger()
                                             .withIdentity("triggerB", "group2")
                                             .startNow()
                                             .withPriority(10)
                                             .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(20).repeatForever())
                                             .build();

            sched.scheduleJob(job, trigger);
            sched.scheduleJob(jobA, triggerA);
            sched.scheduleJob(jobB, triggerB);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

執行結果

Job says: Hello World!, and val is: 3.141
job A. triggerKey=triggerA, group=group2, fireTime=Wed Aug 28 16:16:37 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:16:37 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:16:57 CST 2024
Job says: Hello World!, and val is: 3.141
job A. triggerKey=triggerA, group=group2, fireTime=Wed Aug 28 16:17:17 CST 2024
job B. triggerKey=triggerB, group=group2, fireTime=Wed Aug 28 16:17:17 CST 2024

QuartzExample2.java

package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

public class QuartzExample2 {

    public static void main(String args[]) {
        try {
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail job = JobBuilder.newJob(SimpleJob.class)
                                      .withIdentity("myJob", "group1")
                                      .usingJobData("jobSays", "Hello World!")
                                      .usingJobData("myFloatValue", 3.141f)
                                      .build();

            Trigger trigger = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                            .build();

            // 跟上面的 Scheduler 一樣
            // 如果 worker thread 不足,job 可能不會被執行
            // 當 quartz 發現有這種狀況時,會使用 .withMisfireHandlingInstructionFireNow() 這個規則
            // 在 misfire 時,馬上執行一次
            Trigger trigger2 = TriggerBuilder.newTrigger()
                                            .withIdentity("myTrigger", "group1")
                                            .startNow()
                                            .withSchedule(SimpleScheduleBuilder.simpleSchedule().withMisfireHandlingInstructionFireNow().withIntervalInSeconds(40).repeatForever())
                                            .build();

            sched.scheduleJob(job, trigger);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

QuartzExample3.java

package quartz;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.util.Date;

public class QuartzExample3 {

    public static void main(String args[]) {
        try {
            SchedulerFactory schedFact = new StdSchedulerFactory();
            Scheduler sched = schedFact.getScheduler();

            // 產生 SimpleJob,利用 JobData 傳入資料到 Job
            JobDetail jobA = JobBuilder.newJob(JobA.class)
                                       .withIdentity("jobA", "group2")
                                       .build();

            // SimpleTrigger 可設定在某個特定時間開始
            // 3 seconds 後啟動 trigger
            // 同樣可加上 SimpleScheduleBuilder 定時每 40s 啟動一次
            java.util.Calendar calendar = java.util.Calendar.getInstance();
            calendar.add(java.util.Calendar.SECOND, 3);
            Date myStartTime = calendar.getTime();
            SimpleTrigger trigger =
                    (SimpleTrigger) TriggerBuilder.newTrigger()
                                                  .withIdentity("trigger1", "group1")
                                                  .startAt(myStartTime)
                                                  .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(40).repeatForever())
                                                  .build();

            JobDetail jobB = JobBuilder.newJob(JobB.class)
                                       .withIdentity("jobB", "group1")
                                       .build();
            // 透過 CronScheduleBuilder 產生 scheduler
            // "0 0/1 * * * ?" 代表每分鐘執行一次
            // 然後產生 CronTrigger
            CronTrigger trigger2 = TriggerBuilder.newTrigger()
                                                 .withIdentity("trigger2", "group1")
                                                 .withSchedule(CronScheduleBuilder.cronSchedule("0 0/1 * * * ?"))
                                                 .build();

            sched.scheduleJob(jobA, trigger);
            sched.scheduleJob(jobB, trigger2);
            sched.start();

        } catch (SchedulerException e) {
            e.printStackTrace();
        }
    }

}

執行結果

job B. triggerKey=trigger2, group=group1, fireTime=Wed Aug 28 16:11:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:11:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:11:40 CST 2024
job B. triggerKey=trigger2, group=group1, fireTime=Wed Aug 28 16:12:00 CST 2024
job A. triggerKey=trigger1, group=group1, fireTime=Wed Aug 28 16:12:20 CST 2024

References

Introduction to Quartz | Baeldung