Reactive Programming
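Reactive programming is a development model built around data streams. Popular frameworks such as Vue and React follow a similar idea: computed results and data shown on screen update automatically whenever the underlying data changes.
Traditional programming uses the imperative programming approach, for example: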
b:=2;
c:=3;
a:=b+c
// a is 5
b:=3;
// after b changes, b+c must be evaluated again before a is updated
Excel is an example of reactive programming: when cell C1 is set to C1=A1+B1, the value of C1 updates automatically whenever A1 or B1 changes.
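The spreadsheet behaviour can be sketched in a few lines of plain JavaScript. This is only an illustration of the idea, not how Excel, Vue, or React are implemented; the `Cell` class and the `computed` helper are hypothetical names introduced for this example.

```javascript
// Minimal spreadsheet-style cell: holds a value and notifies dependents on change.
class Cell {
  constructor(value) {
    this.value = value;
    this.listeners = [];
  }
  set(value) {
    this.value = value;
    // Push the change to every derived cell that depends on this one.
    this.listeners.forEach((listener) => listener());
  }
  get() {
    return this.value;
  }
}

// Hypothetical helper: builds a cell that is recomputed whenever any input changes.
function computed(inputs, formula) {
  const evaluate = () => formula(...inputs.map((cell) => cell.get()));
  const result = new Cell(evaluate());
  inputs.forEach((cell) => cell.listeners.push(() => result.set(evaluate())));
  return result;
}

const a1 = new Cell(2);
const b1 = new Cell(3);
const c1 = computed([a1, b1], (x, y) => x + y); // C1 = A1 + B1

const before = c1.get(); // 5
a1.set(3);               // no explicit recalculation of c1 is needed
const after = c1.get();  // 6
```

Unlike the imperative snippet, nothing re-evaluates the sum by hand after `a1` changes; the dependency itself triggers the update.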
RSocket
RSocket is a message-passing protocol similar to gRPC. It supports TCP, WebSocket, and Aeron (UDP) transports and defines four interaction models:
Interaction Model | Behavior |
---|---|
fire-and-forget | No response is expected |
request-and-response | one-to-one, the traditional pattern: one request, one response, repeated as needed |
request-response-stream | one-to-many: a single request can be answered by a continuous stream of responses |
channel | many-to-many bi-directional stream |
Sample
ref: RSocket + WebSocket + Spring Boot = Real Time Application
The referenced article uses Spring Boot to build a sample server with two handlers, one for stream and one for channel.
The client uses rsocket-js and is started through Node.js.
Server
Written in Java with Spring Boot.
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.vinsguru</groupId>
    <artifactId>rsocket-websocket</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rsocket-websocket</name>
    <description>Demo project for Spring Boot, RSocket WebSocket</description>
    <properties>
        <java.version>11</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
resources/application.properties
spring.rsocket.server.port=6565
spring.rsocket.server.mapping-path=/rsocket
spring.rsocket.server.transport=websocket
RsocketWebsocketApplication
package com.vinsguru.rsocketwebsocket;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RsocketWebsocketApplication {

    public static void main(String[] args) {
        SpringApplication.run(RsocketWebsocketApplication.class, args);
    }

}
RSocketController
package com.vinsguru.rsocketwebsocket.controller;

import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;

import java.time.Duration;

@Controller
public class RSocketController {

    // Emits 1..number, one element per second
    @MessageMapping("number.stream")
    public Flux<Integer> responseStream(Integer number) {
        return Flux.range(1, number)
                .delayElements(Duration.ofSeconds(1));
    }

    // Squares every incoming value; emits -1 if the stream fails
    @MessageMapping("number.channel")
    public Flux<Long> biDirectionalStream(Flux<Long> numberFlux) {
        return numberFlux
                .map(n -> n * n)
                .onErrorReturn(-1L);
    }

}
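- number.stream is the request-response-stream interaction: the client sends one Integer to the server, and the server emits 1, 2, ... at one-second intervals until it reaches that Integer.
- number.channel is a many-to-many exchange: the client can send a continuous series of Long values, and the server replies with the square of each one.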
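To make the two behaviours concrete, here is a plain-JavaScript stand-in for what each handler computes. It is a sketch only: it uses synchronous generators instead of RSocket or Reactor, it omits the one-second delay, and the function names `numberStream` and `numberChannel` are hypothetical.

```javascript
// Stand-in for number.stream: one request value in, the sequence 1..n out.
function* numberStream(n) {
  for (let i = 1; i <= n; i++) {
    yield i;
  }
}

// Stand-in for number.channel: many values in, one squared value out per input.
function* numberChannel(numbers) {
  for (const n of numbers) {
    yield n * n;
  }
}

const streamed = [...numberStream(5)];         // [1, 2, 3, 4, 5]
const squared = [...numberChannel([2, 3, 4])]; // [4, 9, 16]
```

The real handlers differ in that values arrive and leave over time on a single connection, rather than being collected into arrays.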
Client
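The main file is index.js.
It has two parts, one for number.stream and one for number.channel. In the listing below the number.stream part is commented out, so only number.channel is active.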
import { RSocketClient, JsonSerializer, IdentitySerializer } from 'rsocket-core';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import {FlowableProcessor} from 'rsocket-flowable';
// backend ws endpoint
const wsURL = 'ws://localhost:6565/rsocket';
// rsocket client
const client = new RSocketClient({
    serializers: {
        data: JsonSerializer,
        metadata: IdentitySerializer
    },
    setup: {
        keepAlive: 60000,
        lifetime: 180000,
        dataMimeType: 'application/json',
        metadataMimeType: 'message/x.rsocket.routing.v0',
    },
    transport: new RSocketWebSocketClient({
        url: wsURL
    })
});
// error handler
const errorHanlder = (e) => console.log(e);
// response handler
const responseHanlder = (payload) => {
    const li = document.createElement('li');
    li.innerText = payload.data;
    li.classList.add('list-group-item', 'small');
    document.getElementById('result').appendChild(li);
}
/////////
// number.stream
////////
/*
const numberRequester = (socket, value) => {
    socket.requestStream({
        data: value,
        metadata: String.fromCharCode('number.stream'.length) + 'number.stream'
    }).subscribe({
        onError: errorHanlder,
        onNext: responseHanlder,
        onSubscribe: subscription => {
            subscription.request(100); // set it to some max value
        }
    })
}
client.connect().then(sock => {
    document.getElementById('n').addEventListener('change', ({srcElement}) => {
        numberRequester(sock, parseInt(srcElement.value));
    })
}, errorHanlder);
*/
/////////
// number.channel
////////
// reactive stream processor
const processor = new FlowableProcessor(sub => {});
const numberRequester = (socket, processor) => {
    socket.requestChannel(processor.map(i => {
        return {
            data: i,
            metadata: String.fromCharCode('number.channel'.length) + 'number.channel'
        }
    })).subscribe({
        onError: errorHanlder,
        onNext: responseHanlder,
        onSubscribe: subscription => {
            subscription.request(100); // set it to some max value
        }
    })
}
client.connect().then(sock => {
    numberRequester(sock, processor);
    document.getElementById('n').addEventListener('keyup', ({srcElement}) => {
        if (srcElement.value.length > 0) {
            processor.onNext(parseInt(srcElement.value))
        }
    })
}, errorHanlder);
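Both requesters build the routing metadata the same way: the route name is prefixed with a single character whose code is the route's length, which matches the one-byte length prefix expected by the message/x.rsocket.routing.v0 MIME type. A small helper can make that intent explicit. The names `encodeRoute` and `decodeRoute` are hypothetical, and the sketch assumes ASCII route names, so that string length equals byte length.

```javascript
// Hypothetical helper: prefixes a route name with its length as a single character,
// mirroring the String.fromCharCode(...) expressions used in index.js.
function encodeRoute(route) {
  if (route.length > 255) {
    throw new RangeError('route must fit in a one-byte length prefix');
  }
  return String.fromCharCode(route.length) + route;
}

// Hypothetical inverse, useful for checking what was encoded.
function decodeRoute(metadata) {
  const length = metadata.charCodeAt(0);
  return metadata.slice(1, 1 + length);
}

const streamRoute = encodeRoute('number.stream');
const channelRoute = encodeRoute('number.channel');
```

With such a helper, the `metadata` fields in the requesters could be written as `encodeRoute('number.stream')` and `encodeRoute('number.channel')`.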
Install node_modules
npm install
Build
npm run build
Start the client
npm run serve
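Open question
I have not found a way to use rsocket-js on its own, that is, without relying on Node.js to serve the web page. Many JavaScript libraries today only ship samples that run under Node.js, yet a traditional web page should be able to use a library simply by loading the JavaScript file from a web server. If that worked here, the HTML and JavaScript could be bundled into the Spring Boot application, and there would be no need for two service ports.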