2025/01/13

Netty in Java 2

如何處理 Stream-based Transport

在 TCP/IP 的 stream-based transport 中,接收資料後,會存放在 socket receive buffer,但資料是一連串的 bytes,這代表說,即使用兩個訊息,以獨立的資料封包傳送,在 OS 也只會認為是一連串的 bytes,不代表解讀時也是這樣。

假設 OS TCP/IP 收到這三個資料包要傳送

ABC   DEF   GHI

在接收端可能會讀取到

AB    CDEFG    H   I

因此接收端必須自己整理 bytes 資料,並恢復為原本的狀態,才能正確解讀資料

ABC   DEF   GHI

方案1

在 Time client 的例子中,雖然 32 bits 資料很少,不太可能分片,但也有可能因為 traffic 增加,而出現這種狀況。

最簡單的方式,就是建立一個內部 buffer,累積到 4 bytes,就能繼續處理。

TimeClientHandler1.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler1 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 在 channel 產生時,建立一個 4 bytes buffer
        buf = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // 當 channel 被移除時,就 release buffer
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        // 一次讀取一個 byte,寫入 ByteBuf
        buf.writeBytes(m);
        m.release();

        // 檢查是否已經累積到 4 bytes
        if (buf.readableBytes() >= 4) {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

方案2

將訊息組合的部份拆開,移到 Decoder

TimeDecoder.java

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// 處理碎片問題
public class TimeDecoder extends ByteToMessageDecoder {

    // 每次收到資料時,都會用內部的 buffer,呼叫這個 method
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 當 in 不足 4 bytes 時,就不寫入到 out
        if (in.readableBytes() < 4) {
            return;
        }
        // 當 in 有 4 bytes
        out.add(in.readBytes(4));
    }
}

TimeDecoder1.java

TimeDecoder 可用另一種簡化的方式實作

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

// 改用 ReplayingDecoder,可簡化 TimeDecoder 的寫法
public class TimeDecoder1 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

TimeClient.java

修改 ChannelInitializer 的部分,加上 Decoder

package netty.time.stream;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // 要在 TimeClientHandler 之前,使用 TimeDecoder
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

程式不變

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到的資料是 ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

POJO

改用 POJO java class,讓 protocol 解讀更清楚

UnixTime.java

package netty.time.pojo;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

修改 TimeDecoder

TimeDecoder.java

package netty.time.pojo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// 處理碎片問題
public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }

        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

TimeClientHandler.java

package netty.time.pojo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient.java

package netty.time.pojo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // 要在 TimeClientHandler 之前,使用 TimeDecoder
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

POJO Server

UnixTime.java

package netty.time.pojoserver;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeServerHandler.java

package netty.time.pojoserver;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeEncoder.java

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        // 將 ChannelPromise 傳給 Netty,讓 netty 可決定 success or failure
        ctx.write(encoded, promise);
    }
}

TimeEncoder1.java

簡化寫法

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder1 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

TimeServer.java

package netty.time.pojoserver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

References

Netty.docs: User guide for 4.x

2025/01/06

Netty in Java 1

Netty是一個 non-blokcing I/O socket framework,主要用於開發網路應用程式。非同步事件驅動的框架和工具可簡化程式開發。Netty最初由JBoss開發,現在由Netty項目社區開發和維護。Netty還支援了HTTP、HTTP2、DNS及其他協定,WebSockets、Google Protocol Buffers、支援 SSL/TLS 以及支援用於SPDY協定和訊息壓縮。

早期 Java Socket framework 會使用 Mina 這個 library,但因為 netty 跟 mina 作者相同,且 mina 已經很久都沒有在維護了,所以目前大部分的文章都建議要使用 netty。不過討論中有提到一個最大的不同點,在 UDP 的處理部分,mina 跟 netty 有不同的作法,mina 有高階的封裝,可讓 connection less 的 UDP 連線,使用起來很像有連線的狀況,netty 是比較貼近原本的 UDP,保持了 connection less 的特性。

另外在使用 netty 之前,要注意使用了哪一個版本的 netty。根據 Remove master branch · Issue #4466 · netty/netty · GitHub 的討論,由於 netty 5 開發時,發現新的作法增加了城市的複雜度,但卻沒有帶來明顯的效能提升,所以 netty 5 目前是被放棄的狀態,建議還是要使用 4.1 版,4.1 版的 user guide 在 Netty.docs: User guide for 4.x

netty 的 libary 切割為以下這些部分

Core 是核心,Protocol Support 是在 socket 的上層的通訊協定,Transport Support 則是資料傳輸,也就是 Socket/Datagram、HTTP Tunnel 或 In-VM Pipe,這幾個都是實際傳輸資料的實作。

maven

使用netty最簡單的方式是引用所有 netty 的 libary,可引用 netty-all

<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.111.Final</version>
</dependency>

DISCARD

網路協定最簡單的是 DISCARD,server side 不管 client 送什麼資料,都會直接丟棄,不做任何回應。

DiscardServer.java

package netty.discard;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Discards any incoming data.
 */
public class DiscardServer {

    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // ChannelInitializer 用來設定新的 Channel
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync();

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new DiscardServer(port).run();
    }
}

DiscardServerHandler.java

package netty.discard;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

// ChannelInboundHandlerAdapter 實作了 ChannelInboundHandler 介面所有 methods
// DiscardServerHandler 只需要 繼承 ChannelInboundHandlerAdapter
// 就可以只 override 必要的 methods
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {

    // channelRead 會在收到 message 時被呼叫
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // Discard the received data silently.
//        ((ByteBuf) msg).release(); // msg 必須要呼叫 release 釋放記憶體
//
//        channelRead 通常會用以下的方式實作內容
//        try {
//            // Do something with msg
//        } finally {
//            ReferenceCountUtil.release(msg);
//        }
        ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }

        // 上面的 while loop 可替代為以下寫法
        System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
        in.release();
    }

//    // 上面的 while loop 可替代為以下寫法
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ByteBuf in = (ByteBuf) msg;
//        System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
//        in.release();
//    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        // 發生 IO error 或 handler 實作有問題時,會產生 exception,由這個 method 處理
        cause.printStackTrace();
        ctx.close();
    }
}

測試

啟動 DiscardServer 後,可用 telnet/nc 測試

# 等同 telnet 127.0.0.1 8080
nc -nvv 127.0.0.1 8080

ECHO

echo 協定基於 discard 做一些修改,在 echo server 收到 client 發送的資料後,會直接將收到的資料回傳給 client。

EchoServer.java

package netty.echo;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class EchoServer {

    private int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new EchoServer(port).run();
    }
}

EchoServerHandler.java

package netty.echo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

/**
 * Handles a server-side channel.
 */
public class EchoServerHandler extends ChannelInboundHandlerAdapter { // (1)

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // ChannelHandlerContext ctx 可驅動 I/O
        // 這邊將收到的 msg 透過 ctx 寫回 channel,但不需要 release msg
        // 因為 netty 會在寫入時,自動 release msg
        ctx.write(msg);
        ctx.flush();
        // ctx.write 不是直接寫到網路上,而是先放到 buffer,然後再 flush
    }

//    // 可改用 writeAndFlush
//    @Override
//    public void channelRead(ChannelHandlerContext ctx, Object msg) {
//        ctx.writeAndFlush(msg);
//    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}

測試

一樣用 nc 去測試

nc -nvv 127.0.0.1 8080

TIME

TIME protocol 的 server 會在 client 連線後發送一個 32 bits 整數,然後就直接關閉連線。

因為 server 必須忽略 client 發送的所有資料,所以不能用 channelRead() 要改用 channelActive()

TimeServer.java

package netty.time;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

TimeServerHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(final ChannelHandlerContext ctx) {
        // channelActive 會在 connection 建立後,就被呼叫
        // 先取得一塊 32 bits integer (4 bytes) 的 ByteBuf,然後將現在的時間填進去
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        // 這邊不需要跟 NIO 的 ByteBuffer 一樣,在發送前,呼叫 java.nio.ByteBuffer.flip()
        // 因為 ByteBuf 已經自動處理了 read/write,會使用兩種不同的 pointer (index)
        final ChannelFuture f = ctx.writeAndFlush(time);
        // ChannelHandlerContext.write() 跟 writeAndFlush 會回傳 ChannelFuture
        // 代表 Netty 的 IO operation 是非同步的
        // 必須用非同步的方式,確認 future 已經完成,才能將 context 關閉
        f.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                assert f == future;
                ctx.close();
            }
        });  // 這邊可以簡化為這種寫法
        // f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient.java

package netty.time;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到的資料是 ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

References

Netty.docs: User guide for 4.x

netty/example/src/main/java/io/netty/example at 4.1 · netty/netty · GitHub

2024/12/23

Mockito: mock static final method

以下記錄 Mockito 在處理 static method,以及 final class/method 的方法。

準備

先準備一個要被 mock 的類別

import java.util.UUID;

public class DataUtils {
    public static String getUuid() {
//        return UUID.randomUUID().toString();
        return "UUID";
    }

    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public final int finalMethod() {
        return 1;
    }
    public String multiply(String a, int times) {
        return a.repeat( times );
    }
}

mock static method

  • 沒有參數的 static method

Mockito.mockStatic(Class classToMock)

注意 MockedStatic 的 scope 問題,必須要放在 try-with-resources 裡面

    @Test
    // mock 沒有參數的 static method 的方法
    public void mock_test1() {
        assertEquals("UUID", DataUtils.getUuid());

        // Mockito.mockStatic(Class<T> classToMock)
        // 注意 MockedStatic 的 scope 問題,必須要放在 try-with-resources 裡面
        // MockedStatic 必須要在 block 結束時,自動回收
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(DataUtils::getUuid).thenReturn("Custom UUID");
            assertEquals("Custom UUID", DataUtils.getUuid());
        }

        assertEquals("UUID", DataUtils.getUuid());
    }
  • mock 有參數的 static method
    @Test
    // mock 有參數的 static method
    public void mock_test2() {
        assertEquals("A:B", DataUtils.concat("A", "B"));

        // Mockito.mockStatic(Class<T> classToMock)
        try (MockedStatic<DataUtils> utils = mockStatic(DataUtils.class)) {
            utils.when(() -> DataUtils.concat("A", "B"))
                    .thenReturn( "A-B" );
            assertEquals("A-B", DataUtils.concat("A", "B"));
        }

        assertEquals("A:B", DataUtils.concat("A", "B"));
    }
  • MockitoException

static mocking is already registered in the current thread 的 exception

    @Test
    public void mock_test3() {
        // 將 utils 再一次 建立一個 mockStatic 物件時,會發生 exception
//        org.mockito.exceptions.base.MockitoException:
//        For mock.DataUtils, static mocking is already registered in the current thread
//        To create a new mock, the existing static mock registration must be deregistered
        assertThrows(MockitoException.class, () -> {
            MockedStatic<DataUtils> utils = mockStatic(DataUtils.class);
            utils = mockStatic(DataUtils.class);
        });
    }
  • JUnit @Before @After

利用 JUnit 的 @Before @After,在每一次執行 @Test 的前後,處理 MockedStatic

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.junit.Assert.*;
import static org.mockito.Mockito.mockStatic;

public class DataUtilsMockitoTest2 {
    private MockedStatic<DataUtils> mockStatic;

    // @Before 會在每一次 @Test 之前被執行
    @Before
    public void before() {
        // Registering a static mock for UserService before each test
        mockStatic = mockStatic(DataUtils.class);
    }

    // @After 會在每一次 @Test 之後被執行
    @After
    public void after() {
        // Closing the mockStatic after each test
        mockStatic.close();
    }

    @Test
    public void mock_test4_1() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A-B" );
        assertEquals("A-B", DataUtils.concat("A", "B"));
    }

    @Test
    public void mock_test4_2() {
        assertTrue(Mockito.mockingDetails(DataUtils.class).isMock());
        mockStatic.when(() -> DataUtils.concat("A", "B"))
                .thenReturn( "A*B" );
        assertEquals("A*B", DataUtils.concat("A", "B"));
    }
}

mock final class/method

先準備一個 final class

public final class DataUtilsFinal extends DataUtils {
    public static String concat(String a, String b) {
        return a+":"+b;
    }

    public String multiply(String a, int times) {
        return a.repeat( times-1 );
    }
}
  • final method
    @Test
    public void mock_test1() {
        // 直接用 mock 就可以使用 finalMethod
        DataUtils dataUtils = new DataUtils();
        DataUtils dataUtilsMock = mock(DataUtils.class);
        when(dataUtilsMock.finalMethod()).thenReturn(0);

        assertEquals(1, dataUtils.finalMethod());
        assertEquals(0, dataUtilsMock.finalMethod());
    }
  • final class
    @Test
    public void mock_test2() {
        // final class
        DataUtilsFinal mock = mock(DataUtilsFinal.class);
        when(mock.multiply("a", 2)).thenReturn("a");

        assertEquals("a", mock.multiply("a", 2));
    }

References

Mocking Static Methods With Mockito | Baeldung

Mock Final Classes and Methods with Mockito | Baeldung