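WebSocket is a protocol for two-way communication over a connection that starts as HTTP. Payloads can be UTF-8 Text or Binary. A message has no length limit, but each frame does. Any number of messages can be sent. Messages must be sent in order; interleaved messages are not supported.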
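To make the frame length limit concrete, here is a minimal sketch of how RFC 6455 encodes the payload length in a frame header: 7 bits for lengths up to 125, a 16-bit extended length up to 65535, and a 64-bit extended length beyond that. The names FrameHeaderSketch and encodeHeader are illustrative and are not part of Jetty; the sketch builds only the header of an unmasked frame and ignores masking and extensions.

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not a Jetty class.
class FrameHeaderSketch {
    static final int OPCODE_TEXT = 0x1;

    // Builds the header bytes of an unmasked frame (no mask bit, no masking key).
    static byte[] encodeHeader(boolean fin, int opcode, long payloadLength) {
        if (payloadLength < 0) {
            throw new IllegalArgumentException("payloadLength must not be negative");
        }
        ByteBuffer buf = ByteBuffer.allocate(10); // big-endian by default
        // First byte: FIN flag in the top bit, opcode in the low 4 bits.
        buf.put((byte) ((fin ? 0x80 : 0x00) | (opcode & 0x0F)));
        if (payloadLength <= 125) {
            buf.put((byte) payloadLength);        // length fits in 7 bits
        } else if (payloadLength <= 0xFFFF) {
            buf.put((byte) 126);                  // marker: 16-bit length follows
            buf.putShort((short) payloadLength);
        } else {
            buf.put((byte) 127);                  // marker: 64-bit length follows
            buf.putLong(payloadLength);
        }
        byte[] header = new byte[buf.position()];
        buf.flip();
        buf.get(header);
        return header;
    }
}
```

A message larger than the frame limit of a given implementation is carried as several frames, with only the final frame having the FIN bit set.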
WebSocket connection state
A connection is in one of four states:
| State | Description | 
|---|---|
| Connecting | The HTTP connection is being upgraded to WebSocket | 
| Open | The socket is open and ready to read/write | 
| Closing | The WebSocket Close Handshake has started | 
| Closed | The WebSocket is closed | 
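The four states can be modelled as a small state machine. The following is a simplified sketch, not Jetty's internal implementation: the enum name ConnectionState and the allowed transitions (including direct jumps to CLOSED on a failed handshake or an abnormal disconnect) are assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the connection life cycle; not a Jetty type.
enum ConnectionState {
    CONNECTING, OPEN, CLOSING, CLOSED;

    // States that may follow the current one in this simplified model.
    Set<ConnectionState> nextStates() {
        switch (this) {
            case CONNECTING:
                return EnumSet.of(OPEN, CLOSED);    // upgrade succeeded or failed
            case OPEN:
                return EnumSet.of(CLOSING, CLOSED); // close handshake or abrupt drop
            case CLOSING:
                return EnumSet.of(CLOSED);          // close handshake finished
            default:
                return EnumSet.noneOf(ConnectionState.class); // CLOSED is terminal
        }
    }

    boolean canTransitionTo(ConnectionState next) {
        return nextStates().contains(next);
    }
}
```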
WebSocket Events
| Event | Description | 
|---|---|
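| on Connect | The connection was established; the handler receives an org.eclipse.jetty.websocket.api.Session object reference, which is the session for that socket | 
| on Close | The connection was closed; a Status Code is provided | 
| on Error | An error occurred on the WebSocket | 
| on Message | A complete message was received, either UTF-8 Text or a raw BINARY message | 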
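The Status Code delivered with the close event is one of the numeric codes defined in RFC 6455. As a rough aid for logging, the sketch below maps the common codes to short descriptions. The class name CloseCodes and the wording of the descriptions are my own, not Jetty's.

```java
// Hypothetical lookup helper for illustration; not a Jetty class.
class CloseCodes {
    // Returns a short description for a WebSocket close status code (RFC 6455).
    static String describe(int statusCode) {
        switch (statusCode) {
            case 1000: return "Normal Closure";
            case 1001: return "Going Away";
            case 1002: return "Protocol Error";
            case 1003: return "Unsupported Data";
            case 1006: return "Abnormal Closure"; // reported locally, never sent on the wire
            case 1007: return "Invalid Payload Data";
            case 1008: return "Policy Violation";
            case 1009: return "Message Too Big";
            case 1010: return "Mandatory Extension";
            case 1011: return "Internal Error";
            default:   return "Unknown (" + statusCode + ")";
        }
    }
}
```

A close handler could, for example, log CloseCodes.describe(status) next to the reason string it receives.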
WebSocket specs provided by Jetty
- RFC-6455 
 Currently supports WebSocket Protocol version 13
- JSR-356 
 Java WebSocket API (javax.websocket), the standard Java API for working with WebSockets
Features that are not yet stable
- perframe-compression 
 Per Frame Compression Extension: a frame compression extension from the Google/Chromium team. It was still an early draft; Jetty supports the draft-04 spec. It has since been superseded by permessage-compression 
- permessage-compression 
 Per Message Compression Extension: compresses the whole message rather than each individual frame 
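To illustrate the idea of compressing a whole message in one pass, here is a minimal sketch using raw DEFLATE from the JDK. It is not the wire format of the extension: a real implementation negotiates the extension during the handshake and applies further framing rules. The class MessageCompressionSketch and its methods are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Hypothetical sketch: compress a complete message payload with raw DEFLATE.
class MessageCompressionSketch {
    static byte[] compress(byte[] message) {
        Deflater deflater = new Deflater(Deflater.DEFAULT_COMPRESSION, true); // true = raw DEFLATE
        deflater.setInput(message);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater(true); // true = raw DEFLATE
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    break; // no more data can be produced from the given input
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not valid DEFLATE data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

Compressing per message lets the compressor see the whole payload at once, which usually gives a better ratio than compressing each small frame separately.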
WebSocket Session
The WebSocket Session object can be used in the following ways
- Check the connection state (open or not) - if(session.isOpen()) { }
- Check whether the connection is secure - if(session.isSecure()) { // connection is using 'wss://' }
- Inspect the Upgrade Request and Response - UpgradeRequest req = session.getUpgradeRequest(); List<String> channelNames = req.getParameterMap().get("channelName"); UpgradeResponse resp = session.getUpgradeResponse(); String subprotocol = resp.getAcceptedSubProtocol();
- Get the Local and Remote Address - InetSocketAddress remoteAddr = session.getRemoteAddress();
- Get or set the idle timeout - session.setIdleTimeout(2000); // 2 second timeout
Jetty WebSocket API
The API supports both server and client.
To develop a Jetty WebSocket program, first add the libraries to the Maven POM. Because this test covers both RFC-6455 and JSR-356, both sets of libraries are included.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>tw.com.maxkit</groupId>
    <artifactId>test</artifactId>
    <version>0.1</version>
    <properties>
        <jetty.version>9.4.12.v20180830</jetty.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <version>${jetty.version}</version>
                <configuration>
                    <scanIntervalSeconds>2</scanIntervalSeconds>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--Jetty dependencies start here -->
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty dependencies end here -->
        <!--Jetty Websocket server side dependencies start here -->
        <!--Jetty JSR-356 Websocket server side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>javax-websocket-server-impl</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket API server side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-server</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket server dependencies end here -->
        <!--Jetty Websocket client side dependencies start here -->
        <!--JSR-356 Websocket client side depencency  -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>javax-websocket-client-impl</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket API client side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-client</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket client side  dependencies end here -->
    </dependencies>
</project>
RFC-6455 WebSocket Server
First, bind a Jetty path to a WebSocket class through a WebSocketServlet.
The following ToUpperWebSocketServlet handles the /toUpper URL. In an IDE, the webapp is usually mapped to a context; assuming the context is test, the WebSocket service URL is ws://localhost:8080/test/toUpper
ToUpperWebSocketServlet.java
package tw.com.maxkit.jetty.server;
import javax.servlet.annotation.WebServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
@WebServlet(name = "ToUpper WebSocket Servlet", urlPatterns="/toUpper")
public class ToUpperWebSocketServlet  extends WebSocketServlet{
    @Override
    public void configure(WebSocketServletFactory factory) {
        // set a 10 second timeout
        factory.getPolicy().setIdleTimeout(10000);
//      factory.register(ToUpperWebSocket.class);
//      factory.register(ToUpperWebSocketListener.class);
        factory.register(ToUpperWebSocketAdapter.class);
    }
}
The code sets the idle timeout to 10 seconds. There are three ways to implement the actual WebSocket message handling; to switch between them, change the implementation class passed to register.
//      factory.register(ToUpperWebSocket.class);
//      factory.register(ToUpperWebSocketListener.class);
        factory.register(ToUpperWebSocketAdapter.class);
- WebSocket annotation
| annotation | description | 
|---|---|
| @WebSocket | Marks this POJO as a WebSocket; the class must be public and not abstract | 
| @OnWebSocketClose | (optional) Receives the onClose event | 
| @OnWebSocketMessage | (optional) Up to two methods, one for TEXT messages and one for BINARY messages | 
| @OnWebSocketError | (optional) Receives the error event | 
| @OnWebSocketFrame | (optional) Receives the frame event | 
ToUpperWebSocket.java
package tw.com.maxkit.jetty.server;
import java.io.IOException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket
public class ToUpperWebSocket {
    @OnWebSocketMessage
    public void onText(Session session, String message) throws IOException {
        System.out.println("ToUpperWebSocket received:" + message);
        if (session.isOpen()) {
            String response = message.toUpperCase();
            session.getRemote().sendString(response);
        }
    }
    @OnWebSocketConnect
    public void onConnect(Session session) throws IOException {
        System.out.println( session.getRemoteAddress().getHostName() + " connected!");
    }
    @OnWebSocketClose
    public void onClose(Session session, int status, String reason) {
        System.out.println(session.getRemoteAddress().getHostName() + " closed!");
    }
}
- WebSocketListener
ToUpperWebSocketListener.java
package tw.com.maxkit.jetty.server;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;
public class ToUpperWebSocketListener implements WebSocketListener {
    private Session outbound;
    public void onWebSocketBinary(byte[] payload, int offset, int len) {
        /* only interested in text messages */
    }
    public void onWebSocketClose(int statusCode, String reason) {
        this.outbound = null;
    }
    public void onWebSocketConnect(Session session) {
        this.outbound = session;
    }
    public void onWebSocketError(Throwable cause) {
        cause.printStackTrace(System.err);
    }
    public void onWebSocketText(String message) {
        if ((outbound != null) && (outbound.isOpen())) {
            System.out.printf("ToUpperWebSocketListener [%s]%n", message);
            // echo the message back
            outbound.getRemote().sendString(message.toUpperCase(), null);
        }
    }
}
- WebSocketAdapter
Simpler than the listener; the adapter also provides methods for checking the session state
ToUpperWebSocketAdapter.java
package tw.com.maxkit.jetty.server;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import java.io.IOException;
public class ToUpperWebSocketAdapter extends WebSocketAdapter
{
    @Override
    public void onWebSocketText(String message)
    {
        if (isConnected())
        {
            try
            {
                System.out.printf("ToUpperWebSocketAdapter received: [%s]%n",message);
                // echo the message back
                getRemote().sendString(message.toUpperCase());
            }
            catch (IOException e)
            {
                e.printStackTrace(System.err);
            }
        }
    }
}
JSR-356 WebSocket Server
The service is available at ws://localhost:8080/test/jsr356toUpper
ToUpper356Socket.java
package tw.com.maxkit.jsr356.server;
import java.io.IOException;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint("/jsr356toUpper")
public class ToUpper356Socket {
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("WebSocket opened: " + session.getId());
    }
    @OnMessage
    public void onMessage(String txt, Session session) throws IOException {
        System.out.println("Message received: " + txt);
        session.getBasicRemote().sendText(txt.toUpperCase());
    }
    @OnClose
    public void onClose(CloseReason reason, Session session) {
        System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());
    }
}
Test page
websocketecho.html
<html>
<body>
    <div>
        <input type="text" id="input" />
    </div>
    <div>
        <input type="button" id="connectBtn" value="CONNECT"
            onclick="connect()" /> <input type="button" id="sendBtn"
            value="SEND" onclick="send()" disabled="true" />
    </div>
    <div id="output">
        <p>Output</p>
    </div>
</body>
<script type="text/javascript">
    var webSocket;
    var output = document.getElementById("output");
    var connectBtn = document.getElementById("connectBtn");
    var sendBtn = document.getElementById("sendBtn");
    function connect() {
        // open the connection only if one is not already open
        if (webSocket !== undefined
                && webSocket.readyState !== WebSocket.CLOSED) {
            return;
        }
        // Create a websocket
        webSocket = new WebSocket("ws://localhost:8080/test/toUpper");
        webSocket.onopen = function(event) {
            updateOutput("Connected!");
            connectBtn.disabled = true;
            sendBtn.disabled = false;
        };
        webSocket.onmessage = function(event) {
            updateOutput(event.data);
        };
        webSocket.onclose = function(event) {
            updateOutput("Connection Closed");
            connectBtn.disabled = false;
            sendBtn.disabled = true;
        };
    }
    function send() {
        var text = document.getElementById("input").value;
        webSocket.send(text);
    }
    function closeSocket() {
        webSocket.close();
    }
    function updateOutput(text) {
        output.innerHTML += "<br/>" + text;
    }
</script>
</html>
WebSocket Client
As with the server, there are two kinds of client: RFC-6455 and JSR-356
RFC-6455
WebSocketClientMain.java
package tw.com.maxkit.jetty.client;
import java.net.URI;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
public class WebSocketClientMain {
    public static void main(String[] args) {
        String dest = "ws://localhost:8080/test/toUpper";
        WebSocketClient client = new WebSocketClient();
        try {
            
            ToUpperClientSocket socket = new ToUpperClientSocket();
            client.start();
            URI echoUri = new URI(dest);
            ClientUpgradeRequest request = new ClientUpgradeRequest();
            client.connect(socket, echoUri, request);
            socket.getLatch().await();
            socket.sendMessage("echo");
            socket.sendMessage("test");
            Thread.sleep(10000L);
        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            try {
                client.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
ToUpperClientSocket.java
package tw.com.maxkit.jetty.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@WebSocket
public class ToUpperClientSocket {
    private Session session;
    
    CountDownLatch latch= new CountDownLatch(1);
    @OnWebSocketMessage
    public void onText(Session session, String message) throws IOException {
        System.out.println("Message received from server:" + message);
    }
    @OnWebSocketConnect
    public void onConnect(Session session) {
        System.out.println("Connected to server");
        this.session=session;
        latch.countDown();
    }
    
    public void sendMessage(String str) {
        try {
            session.getRemote().sendString(str);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public CountDownLatch getLatch() {
        return latch;
    }
}
JSR-356 Client
WebSocket356ClientMain.java
package tw.com.maxkit.jsr356.client;
import java.net.URI;
import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;
public class WebSocket356ClientMain {
    public static void main(String[] args) {
    
        try {
            String dest = "ws://localhost:8080/test/jsr356toUpper";
            ToUpper356ClientSocket socket = new ToUpper356ClientSocket();
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(socket, new URI(dest));
            socket.getLatch().await();
            socket.sendMessage("echo356");
            socket.sendMessage("test356");
            Thread.sleep(10000L);
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}
ToUpper356ClientSocket.java
package tw.com.maxkit.jsr356.client;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
@ClientEndpoint
public class ToUpper356ClientSocket {
    CountDownLatch latch = new CountDownLatch(1);
    private Session session;
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("Connected to server");
        this.session = session;
        latch.countDown();
    }
    @OnMessage
    public void onText(String message, Session session) {
        System.out.println("Message received from server:" + message);
    }
    @OnClose
    public void onClose(CloseReason reason, Session session) {
        System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());
    }
    public CountDownLatch getLatch() {
        return latch;
    }
    public void sendMessage(String str) {
        try {
            session.getBasicRemote().sendText(str);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Sending Message to Remote Endpoint
There are several ways to send a message
Blocking Send Message
The method returns only after the message has been sent
Sending a binary message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
    remote.sendBytes(buf);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
Sending a text message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a TEXT message to remote endpoint
try
{
    remote.sendString("Hello World");
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
Sending Partial Messages
To split a large message into several parts, use the partial message sending methods; the last part is sent with isLast == true
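The splitting itself can be sketched independently of Jetty. In the sketch below, PartialSendSketch and sendInParts are hypothetical names, and the partSender callback stands in for a call such as remote.sendPartialString(part, isLast); only the final part is flagged as last.

```java
import java.util.function.BiConsumer;

// Hypothetical helper for illustration; the callback stands in for the real send call.
class PartialSendSketch {
    // Splits message into chunks of at most partSize chars and hands each one
    // to partSender together with an isLast flag. Returns the number of parts sent.
    // Note: splitting by char index can cut a surrogate pair in half; a real
    // implementation should split on code point boundaries.
    static int sendInParts(String message, int partSize, BiConsumer<String, Boolean> partSender) {
        if (partSize <= 0) {
            throw new IllegalArgumentException("partSize must be positive");
        }
        int sent = 0;
        int start = 0;
        do {
            int end = Math.min(start + partSize, message.length());
            boolean isLast = end == message.length();
            partSender.accept(message.substring(start, end), isLast);
            sent++;
            start = end;
        } while (start < message.length());
        return sent;
    }
}
```

With Jetty, the callback would need to catch the IOException thrown by the partial send method and rethrow it as an unchecked exception, since BiConsumer does not allow checked exceptions.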
binary message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a BINARY message to remote endpoint
// Part 1
ByteBuffer buf1 = ByteBuffer.wrap(new byte[] { 0x11, 0x22 });
// Part 2 (last part)
ByteBuffer buf2 = ByteBuffer.wrap(new byte[] { 0x33, 0x44 });
try
{
    remote.sendPartialBytes(buf1,false);
    remote.sendPartialBytes(buf2,true); // isLast is true
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
text message
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a TEXT message to remote endpoint
String part1 = "Hello";
String part2 = " World";
try
{
    remote.sendPartialString(part1,false);
    remote.sendPartialString(part2,true); // last part
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
Sending Ping / Pong Control Frames
PING
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a PING to remote endpoint
String data = "You There?";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
    remote.sendPing(payload);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
PONG
RemoteEndpoint remote = session.getRemote();
// Blocking Send of a PONG to remote endpoint
String data = "Yup, I'm here";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
    remote.sendPong(payload);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
Sending Messages Asynchronously
There are two async send message methods
- RemoteEndpoint.sendBytesByFuture(ByteBuffer message)
- RemoteEndpoint.sendStringByFuture(String message)
Both return a java.util.concurrent.Future
binary
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
remote.sendBytesByFuture(buf);
Call get on the returned Future to wait until the send completes
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
    Future<Void> fut = remote.sendBytesByFuture(buf);
    // wait for completion (forever)
    fut.get();
}
catch (ExecutionException | InterruptedException e)
{
    // Send failed
    e.printStackTrace();
}
A timeout can be passed to get
RemoteEndpoint remote = session.getRemote();
// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
Future<Void> fut = null;
try
{
    fut = remote.sendBytesByFuture(buf);
    // wait for completion (timeout)
    fut.get(2,TimeUnit.SECONDS);
}
catch (ExecutionException | InterruptedException e)
{
    // Send failed
    e.printStackTrace();
}
catch (TimeoutException e)
{
    // timeout
    e.printStackTrace();
    if (fut != null)
    {
        // cancel the message
        fut.cancel(true);
    }
}
Text messages work the same way as binary ones; just replace sendBytesByFuture with sendStringByFuture
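The wait-with-timeout pattern can be wrapped in a small helper so that text and binary sends share it. This is a sketch under assumptions: AsyncSendSketch and sendWithTimeout are hypothetical names, and the sender function stands in for a method that returns a Future<Void>, such as sendStringByFuture.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical helper for illustration; sender stands in for the async send method.
class AsyncSendSketch {
    // Starts an async send and waits up to the given timeout.
    // Returns true if the send completed, false if it failed or timed out.
    static boolean sendWithTimeout(Function<String, Future<Void>> sender,
                                   String message, long timeout, TimeUnit unit) {
        Future<Void> fut = sender.apply(message);
        try {
            fut.get(timeout, unit);
            return true;
        } catch (TimeoutException e) {
            fut.cancel(true); // give up on the pending message
            return false;
        } catch (ExecutionException e) {
            return false;     // the send itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            fut.cancel(true);
            return false;
        }
    }
}
```

With a Jetty RemoteEndpoint named remote, a call might look like AsyncSendSketch.sendWithTimeout(remote::sendStringByFuture, "Hello World", 2, TimeUnit.SECONDS).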
 
 