2019/07/01

WebSocket Support in Jetty

WebSocket 是在 http protocol 進行雙向通訊傳輸的協定,可以用 UTF-8 Text 或 Binary format。message 沒有長度限制,但 framing 有限制長度。可發送無限個訊息。訊息必須依照順序傳送,無法支援 interleaved messages。

WebSocket connection state

有四種

State Description
Connecting 當 HTTP upgrade 到 Websocket
Open socket is open, ready to read/write
Closing 啟動 WebSocket Close Handshake
Closed websocket is closed

WebSocket Events

Event Description
on Connect 成功連線,會收到 org.eclipse.jetty.websocket.api.Session object reference,這是該 socket 的 session
on Close 會有 Status Code
on Error websocket 發生 error
on Message 代表收到完整的 message,可以是 UTF-8 Text 或是 raw BINARY message

Jetty 提供的 WebSocket Spec

  • RFC-6455
    目前支援 WebSocket Protocol version 13

  • JSR-356
    Java WebScoket API (javax.webscoket),這是處理 websocket 的標準 java API

目前還不穩定的功能

  • perframe-compression
    Per Frame Compression Extension

    這是 Google/Chromium team 提供的 frame compression,但還在 early draft,Jetty 支援 draft-04 spec,目前已經被 permessage-compression 取代

  • permessage-compression
    Per Message Compression Extension

    將壓縮改為整個 message,而不是每一個 frame

WebSocket Session

websocket Session 物件有以下的使用方式

  1. 檢查 connection state (opened or not)

    if(session.isOpen()) {
    }
  2. 檢查 secure

    if(session.isSecure()) {
      // connection is using 'wss://'
    }
  3. 有哪些在 Upgrade Request and Response

    UpgradeRequest req = session.getUpgradeRequest();
    String channelName = req.getParameterMap().get("channelName");
    
    UpgradeResponse resp = session.getUpgradeResponse();
    String subprotocol = resp.getAcceptedSubProtocol();
  4. 取得 Local and Remote Address

    InetSocketAddress remoteAddr = session.getRemoteAddress();
  5. 存取 idle timeout

    session.setIdleTimeout(2000); // 2 second timeout

Jetty WebSocket API

同時支援 server 及 client

要開發 Jetty Websocket 程式,首先要在 Maven POM 加上 library,因測試同時要支援 RFC-6455 及 JSR-356,故同時加上了兩種 library

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tw.com.maxkit</groupId>
    <artifactId>test</artifactId>
    <version>0.1</version>

    <properties>
        <jetty.version>9.4.12.v20180830</jetty.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.eclipse.jetty</groupId>
                <artifactId>jetty-maven-plugin</artifactId>
                <version>${jetty.version}</version>
                <configuration>
                    <scanIntervalSeconds>2</scanIntervalSeconds>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--Jetty dependencies start here -->
        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-server</artifactId>
            <version>${jetty.version}</version>
        </dependency>

        <dependency>
            <groupId>org.eclipse.jetty</groupId>
            <artifactId>jetty-servlet</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty dependencies end here -->

        <!--Jetty Websocket server side dependencies start here -->
        <!--Jetty JSR-356 Websocket server side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>javax-websocket-server-impl</artifactId>
            <version>${jetty.version}</version>
        </dependency>

        <!--Jetty Websocket API server side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-server</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket server dependencies end here -->


        <!--Jetty Websocket client side dependencies start here -->
        <!--JSR-356 Websocket client side depencency  -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>javax-websocket-client-impl</artifactId>
            <version>${jetty.version}</version>
        </dependency>

        <!--Jetty Websocket API client side dependency -->
        <dependency>
            <groupId>org.eclipse.jetty.websocket</groupId>
            <artifactId>websocket-client</artifactId>
            <version>${jetty.version}</version>
        </dependency>
        <!--Jetty Websocket client side  dependencies end here -->

    </dependencies>

</project>
RFC-6455 websocket Server

首先要將 Jetty path 透過 WebSocketServlet 跟 WebSocket class 綁定。

以下是 ToUpperWebSocketServlet 的 servlet,會處理 /toUpper 這個 url,因為在 IDE 裡面,通常會將 webapp 對應到某個 context,假設是 test,那麼 websocket 服務的 url,應該是 ws://localhost:8080/test/toUpper

ToUpperWebSocketServlet.java

package tw.com.maxkit.jetty.server;

import javax.servlet.annotation.WebServlet;

import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

@WebServlet(name = "ToUpper WebSocket Servlet", urlPatterns="/toUpper")
public class ToUpperWebSocketServlet  extends WebSocketServlet{

    @Override
    public void configure(WebSocketServletFactory factory) {
        // set a 10 second timeout
        factory.getPolicy().setIdleTimeout(10000);

//      factory.register(ToUpperWebSocket.class);
//      factory.register(ToUpperWebSocketListener.class);
        factory.register(ToUpperWebSocketAdapter.class);
    }

}

程式裡面設定了 ide timeout 的時間為 10s,另外有三種真正實作 websocket 訊息的方式,如果要使用某一種實作方式,只要調整 register 的 implementation class 即可。

//      factory.register(ToUpperWebSocket.class);
//      factory.register(ToUpperWebSocketListener.class);
        factory.register(ToUpperWebSocketAdapter.class);
  • WebSocket annotation
annotation description
@WebSocket 將這個 POJO 標記為 WebSocket,class 不能是 abstract and public
@OnWebSocketClose (optional) 收到 onClose event
@OnWebSocketMessage (optional) 有兩個 method,分別是 TEXT 與 BINARY message
@OnWebSocketError (optional) 收到 error event
@OnWebSocketFrame (optional) 收到 frame event

ToUppderWebSocket.java

package tw.com.maxkit.jetty.server;

import java.io.IOException;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket
public class ToUpperWebSocket {

    @OnWebSocketMessage
    public void onText(Session session, String message) throws IOException {
        System.out.println("ToUpperWebSocket received:" + message);
        if (session.isOpen()) {
            String response = message.toUpperCase();
            session.getRemote().sendString(response);
        }
    }

    @OnWebSocketConnect
    public void onConnect(Session session) throws IOException {
        System.out.println( session.getRemoteAddress().getHostName() + " connected!");
    }

    @OnWebSocketClose
    public void onClose(Session session, int status, String reason) {
        System.out.println(session.getRemoteAddress().getHostName() + " closed!");
    }

}
  • WebSocketListener

ToUpperWebSocketListener.java

package tw.com.maxkit.jetty.server;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketListener;

public class ToUpperWebSocketListener implements WebSocketListener {
    private Session outbound;

    public void onWebSocketBinary(byte[] payload, int offset, int len) {
        /* only interested in text messages */
    }

    public void onWebSocketClose(int statusCode, String reason) {
        this.outbound = null;
    }

    public void onWebSocketConnect(Session session) {
        this.outbound = session;
    }

    public void onWebSocketError(Throwable cause) {
        cause.printStackTrace(System.err);
    }

    public void onWebSocketText(String message) {
        if ((outbound != null) && (outbound.isOpen())) {
            System.out.printf("ToUpperWebSocketListener [%s]%n", message);
            // echo the message back
            outbound.getRemote().sendString(message.toUpperCase(), null);
        }
    }
}
  • WebSocketAdpapter

比 listener 簡單,提供檢查 session state 的 methods

ToUpperWebSocketAdapter.java

package tw.com.maxkit.jetty.server;


import org.eclipse.jetty.websocket.api.WebSocketAdapter;

import java.io.IOException;

public class ToUpperWebSocketAdapter extends WebSocketAdapter
{
    @Override
    public void onWebSocketText(String message)
    {
        if (isConnected())
        {
            try
            {
                System.out.printf("ToUpperWebSocketAdapter received: [%s]%n",message);
                // echo the message back
                getRemote().sendString(message.toUpperCase());
            }
            catch (IOException e)
            {
                e.printStackTrace(System.err);
            }
        }
    }
}
JSR-356 websocket Server

在網址 ws://localhost:8008/test/jsr356toUpper 提供服務

ToUpper356Socket.java

package tw.com.maxkit.jsr356.server;

import java.io.IOException;

import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/jsr356toUpper")
public class ToUpper356Socket {

    @OnOpen
    public void onOpen(Session session) {
        System.out.println("WebSocket opened: " + session.getId());
    }
    @OnMessage
    public void onMessage(String txt, Session session) throws IOException {
        System.out.println("Message received: " + txt);
        session.getBasicRemote().sendText(txt.toUpperCase());
    }

    @OnClose
    public void onClose(CloseReason reason, Session session) {
        System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());

    }
}

測試網頁

websocketecho.html

<html>
<body>
    <div>
        <input type="text" id="input" />
    </div>
    <div>
        <input type="button" id="connectBtn" value="CONNECT"
            onclick="connect()" /> <input type="button" id="sendBtn"
            value="SEND" onclick="send()" disabled="true" />
    </div>
    <div id="output">
        <p>Output</p>
    </div>
</body>

<script type="text/javascript">
    var webSocket;
    var output = document.getElementById("output");
    var connectBtn = document.getElementById("connectBtn");
    var sendBtn = document.getElementById("sendBtn");

    function connect() {
        // oprn the connection if one does not exist
        if (webSocket !== undefined
                && webSocket.readyState !== WebSocket.CLOSED) {
            return;
        }
        // Create a websocket
        webSocket = new WebSocket("ws://localhost:8080/test/toUpper");

        webSocket.onopen = function(event) {
            updateOutput("Connected!");
            connectBtn.disabled = true;
            sendBtn.disabled = false;

        };

        webSocket.onmessage = function(event) {
            updateOutput(event.data);
        };

        webSocket.onclose = function(event) {
            updateOutput("Connection Closed");
            connectBtn.disabled = false;
            sendBtn.disabled = true;
        };
    }

    function send() {
        var text = document.getElementById("input").value;
        webSocket.send(text);
    }

    function closeSocket() {
        webSocket.close();
    }

    function updateOutput(text) {
        output.innerHTML += "<br/>" + text;
    }
</script>
</html>

WebSocket Client

client 同樣分 RFC-6455 與 JSR-356 兩種

RFC-6455

WebSocketClientMain.java

package tw.com.maxkit.jetty.client;

import java.net.URI;

import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;

public class WebSocketClientMain {

    public static void main(String[] args) {
        String dest = "ws://localhost:8080/test/toUpper";
        WebSocketClient client = new WebSocketClient();
        try {
            
            ToUpperClientSocket socket = new ToUpperClientSocket();
            client.start();
            URI echoUri = new URI(dest);
            ClientUpgradeRequest request = new ClientUpgradeRequest();
            client.connect(socket, echoUri, request);
            socket.getLatch().await();
            socket.sendMessage("echo");
            socket.sendMessage("test");
            Thread.sleep(10000l);

        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            try {
                client.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

ToUpperClientSocket.java

package tw.com.maxkit.jetty.client;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;

@WebSocket
public class ToUpperClientSocket {

    private Session session;
    
    CountDownLatch latch= new CountDownLatch(1);

    @OnWebSocketMessage
    public void onText(Session session, String message) throws IOException {
        System.out.println("Message received from server:" + message);
    }

    @OnWebSocketConnect
    public void onConnect(Session session) {
        System.out.println("Connected to server");
        this.session=session;
        latch.countDown();
    }
    
    public void sendMessage(String str) {
        try {
            session.getRemote().sendString(str);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    public CountDownLatch getLatch() {
        return latch;
    }

}
JSR-356 Client

WebSocket356ClientMain.java

package tw.com.maxkit.jsr356.client;

import java.net.URI;

import javax.websocket.ContainerProvider;
import javax.websocket.WebSocketContainer;

public class WebSocket356ClientMain {

    public static void main(String[] args) {
    
        try {

            String dest = "ws://localhost:8080/test/jsr356toUpper";
            ToUpper356ClientSocket socket = new ToUpper356ClientSocket();
            WebSocketContainer container = ContainerProvider.getWebSocketContainer();
            container.connectToServer(socket, new URI(dest));

            socket.getLatch().await();
            socket.sendMessage("echo356");
            socket.sendMessage("test356");
            Thread.sleep(10000l);

        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
}

ToUpper356ClientSocket.java

package tw.com.maxkit.jsr356.client;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import javax.websocket.ClientEndpoint;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;

@ClientEndpoint
public class ToUpper356ClientSocket {

    CountDownLatch latch = new CountDownLatch(1);
    private Session session;

    @OnOpen
    public void onOpen(Session session) {
        System.out.println("Connected to server");
        this.session = session;
        latch.countDown();
    }

    @OnMessage
    public void onText(String message, Session session) {
        System.out.println("Message received from server:" + message);
    }

    @OnClose
    public void onClose(CloseReason reason, Session session) {
        System.out.println("Closing a WebSocket due to " + reason.getReasonPhrase());
    }

    public CountDownLatch getLatch() {
        return latch;
    }

    public void sendMessage(String str) {
        try {
            session.getBasicRemote().sendText(str);
        } catch (IOException e) {

            e.printStackTrace();
        }
    }
}

Sending Message to Remote Endpoint

發送訊息有幾種方式

Blocking Send Message

在完成訊息發送後,該 method 才會 return

這是發送 binary message

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
    remote.sendBytes(buf);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}

這是發送 text message

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a TEXT message to remote endpoint
try
{
    remote.sendString("Hello World");
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
發送 Partial Message

如果有個大訊息,希望切割成多個部分,可利用 partial message sending methods,最後一個的 isLast == true

binary message

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a BINARY message to remote endpoint
// Part 1
ByteBuffer buf1 = ByteBuffer.wrap(new byte[] { 0x11, 0x22 });
// Part 2 (last part)
ByteBuffer buf2 = ByteBuffer.wrap(new byte[] { 0x33, 0x44 });
try
{
    remote.sendPartialBytes(buf1,false);
    remote.sendPartialBytes(buf2,true); // isLast is true
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}

text message

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a TEXT message to remote endpoint
String part1 = "Hello";
String part2 = " World";
try
{
    remote.sendPartialString(part1,false);
    remote.sendPartialString(part2,true); // last part
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
發送 Ping / Pong Control Frame

PING

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a PING to remote endpoint
String data = "You There?";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
    remote.sendPing(payload);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}

PONG

RemoteEndpoint remote = session.getRemote();

// Blocking Send of a PONG to remote endpoint
String data = "Yup, I'm here";
ByteBuffer payload = ByteBuffer.wrap(data.getBytes());
try
{
    remote.sendPong(payload);
}
catch (IOException e)
{
    e.printStackTrace(System.err);
}
發非同步訊息發送

有兩個 async send message methods

  • RemoteEndpoint.sendBytesByFuture(ByteBuffer message)
  • RemoteEndpoint.sendStringByFuture(String message)

會回傳 java.util.concurrent.Future,可用來測試是否有發送成功

binary

RemoteEndpoint remote = session.getRemote();

// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
remote.sendBytesByFuture(buf);

可利用 get 等待發送是否完成

RemoteEndpoint remote = session.getRemote();

// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
try
{
    Future<Void> fut = remote.sendBytesByFuture(buf);
    // wait for completion (forever)
    fut.get();
}
catch (ExecutionException | InterruptedException e)
{
    // Send failed
    e.printStackTrace();
}

可在 get 加上 timeout 時間

RemoteEndpoint remote = session.getRemote();

// Async Send of a BINARY message to remote endpoint
ByteBuffer buf = ByteBuffer.wrap(new byte[] { 0x11, 0x22, 0x33, 0x44 });
Future<Void> fut = null;
try
{
    fut = remote.sendBytesByFuture(buf);
    // wait for completion (timeout)
    fut.get(2,TimeUnit.SECONDS);
}
catch (ExecutionException | InterruptedException e)
{
    // Send failed
    e.printStackTrace();
}
catch (TimeoutException e)
{
    // timeout
    e.printStackTrace();
    if (fut != null)
    {
        // cancel the message
        fut.cancel(true);
    }
}

text 訊息跟 binary 類似,只是將 sendBytesByFuture 換成 sendStringByFuture

References

Jetty WebSocket Example

Chapter 27. Jetty Websocket API

沒有留言:

張貼留言