如何處理 Stream-based Transport
在 TCP/IP 這類 stream-based transport 中,接收到的資料會存放在 socket receive buffer,但 buffer 內的資料只是一連串的 bytes,而不是一個個的封包。也就是說,即使傳送端將兩個訊息分別以獨立的資料封包送出,OS 也只會把它們視為一連串的 bytes,因此無法保證接收端讀取到的內容,與傳送端寫入時的訊息邊界一致。
假設 OS 的 TCP/IP stack 收到了以下三個資料封包:
ABC DEF GHI
在接收端可能會讀取到
AB CDEFG H I
因此接收端必須自己整理 bytes 資料,並恢復為原本的狀態,才能正確解讀資料
ABC DEF GHI
方案1
在 Time client 的例子中,雖然 32 bits 資料很少,不太可能分片,但也有可能因為 traffic 增加,而出現這種狀況。
最簡單的方式,就是建立一個內部 buffer,累積到 4 bytes,就能繼續處理。
TimeClientHandler1.java
package netty.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
 * Time client handler that copes with fragmented reads by accumulating inbound bytes
 * in its own buffer and only decoding once at least 4 bytes are available.
 */
public class TimeClientHandler1 extends ChannelInboundHandlerAdapter {

    /** Accumulation buffer; allocated in {@link #handlerAdded} and released in {@link #handlerRemoved}. */
    private ByteBuf buf;

    /**
     * Allocates the accumulation buffer (initial capacity 4 bytes) when this handler is added.
     *
     * @param ctx context used to obtain the buffer allocator
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        buf = ctx.alloc().buffer(4);
    }

    /**
     * Releases the accumulation buffer when this handler is removed.
     *
     * @param ctx context of the handler being removed (unused)
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // buf is still null if handlerAdded never assigned it; skip the release in that case.
        if (buf != null) {
            buf.release();
            buf = null;
        }
    }

    /**
     * Appends the received bytes to the accumulation buffer and, once 4 bytes are readable,
     * prints the decoded date and closes the connection.
     *
     * @param ctx context used to close the connection
     * @param msg inbound message; cast to {@link ByteBuf} and always released here
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        try {
            // Copy every readable byte of the inbound message into the accumulation buffer.
            buf.writeBytes(m);
        } finally {
            // Release the inbound message even when the copy above throws.
            m.release();
        }

        // Decode only after a full 32-bit value has been accumulated.
        if (buf.readableBytes() >= 4) {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context used to close the connection
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
方案2
將訊息組合的部份拆開,移到 Decoder
TimeDecoder.java
package netty.time.stream;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Decoder that deals with fragmentation: it forwards data only in 4-byte units.
 */
public class TimeDecoder extends ByteToMessageDecoder {

    /**
     * Called with the decoder's internal buffer whenever data is received.
     * Adds a new buffer holding the next 4 bytes of {@code in} to {@code out};
     * adds nothing while fewer than 4 bytes are readable.
     *
     * @param ctx channel handler context (unused)
     * @param in  buffer holding the bytes received so far
     * @param out list that receives the decoded 4-byte buffer
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        final boolean hasWholeValue = in.readableBytes() >= 4;
        if (hasWholeValue) {
            // A complete 4-byte value is available: hand it to the next handler.
            out.add(in.readBytes(4));
        }
    }
}
TimeDecoder1.java
TimeDecoder 可用另一種簡化的方式實作
package netty.time.stream;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
/**
 * Variant of TimeDecoder built on {@link ReplayingDecoder}, which lets the decode
 * method be written without an explicit readable-bytes check.
 */
public class TimeDecoder1 extends ReplayingDecoder<Void> {

    /**
     * Reads 4 bytes from {@code in} and adds the resulting buffer to {@code out}.
     *
     * @param ctx channel handler context (unused)
     * @param in  buffer to read from
     * @param out list that receives the decoded 4-byte buffer
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        ByteBuf frame = in.readBytes(4);
        out.add(frame);
    }
}
TimeClient.java
修改 ChannelInitializer 的部分,加上 Decoder
package netty.time.stream;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Time client that installs {@link TimeDecoder} ahead of {@link TimeClientHandler},
 * connects to the server, and blocks until the connection is closed.
 */
public class TimeClient {

    /**
     * Connects to the given host and port and waits for the channel to close.
     *
     * @param args optional {@code [host [port]]}; host defaults to {@code localhost} and
     *             port to {@code 8080} when the corresponding argument is omitted
     * @throws NumberFormatException if the port argument is not a valid integer
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        // Read each argument only when it is present, so that passing just a host
        // keeps the default port instead of indexing past the end of args.
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8080;

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // A client uses Bootstrap, not ServerBootstrap.
            Bootstrap b = new Bootstrap();
            // A client needs only a single worker EventLoopGroup.
            b.group(workerGroup);
            // A client uses NioSocketChannel, not NioServerSocketChannel.
            b.channel(NioSocketChannel.class);
            // Use option(), not childOption(), on the client side.
            b.option(ChannelOption.SO_KEEPALIVE, true);

            // TimeDecoder must come before TimeClientHandler in the pipeline.
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client: call connect() rather than bind().
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
TimeClientHandler.java
程式不變
package netty.time.stream;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
 * Client handler that reads one unsigned 32-bit value from the received buffer,
 * prints it as a date, and closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Decodes the received buffer, prints the resulting date, and closes the connection.
     * The buffer is released whether or not decoding succeeds.
     *
     * @param ctx context used to close the connection
     * @param msg received message; cast to {@link ByteBuf}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf received = (ByteBuf) msg;
        try {
            long rawSeconds = received.readUnsignedInt();
            // Subtract the 2208988800 offset and scale to the millisecond value Date expects.
            long currentTimeMillis = (rawSeconds - 2208988800L) * 1000L;
            Date decoded = new Date(currentTimeMillis);
            System.out.println(decoded);
            ctx.close();
        } finally {
            received.release();
        }
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context used to close the connection
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
POJO
改用 POJO java class,讓 protocol 解讀更清楚
UnixTime.java
package netty.time.pojo;
import java.util.Date;
/**
 * Immutable holder for a time value expressed in seconds and shifted by
 * 2,208,988,800 relative to the Unix epoch.
 */
public class UnixTime {

    /** Offset, in seconds, between the stored value and the Unix epoch. */
    private static final long EPOCH_OFFSET_SECONDS = 2208988800L;

    private final long value;

    /** Creates an instance for the current system time. */
    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + EPOCH_OFFSET_SECONDS);
    }

    /**
     * Creates an instance holding the given value.
     *
     * @param value the offset seconds value to store
     */
    public UnixTime(long value) {
        this.value = value;
    }

    /**
     * Returns the stored value.
     *
     * @return the offset seconds value given at construction
     */
    public long value() {
        return value;
    }

    /**
     * Returns the {@link Date#toString()} form of the stored value.
     *
     * @return the date string for this time
     */
    @Override
    public String toString() {
        long epochMillis = (value() - EPOCH_OFFSET_SECONDS) * 1000L;
        return new Date(epochMillis).toString();
    }
}
修改 TimeDecoder
TimeDecoder.java
package netty.time.pojo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Decoder that deals with fragmentation and produces {@link UnixTime} objects
 * instead of raw buffers.
 */
public class TimeDecoder extends ByteToMessageDecoder {

    /**
     * Reads one unsigned 32-bit value from {@code in} and adds it to {@code out} as a
     * {@link UnixTime}; adds nothing while fewer than 4 bytes are readable.
     *
     * @param ctx channel handler context (unused)
     * @param in  buffer holding the bytes received so far
     * @param out list that receives the decoded {@link UnixTime}
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() >= 4) {
            long rawValue = in.readUnsignedInt();
            out.add(new UnixTime(rawValue));
        }
    }
}
TimeClientHandler.java
package netty.time.pojo;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Client handler that prints each received {@link UnixTime} and then closes the connection.
 */
public class TimeClientHandler extends ChannelInboundHandlerAdapter {

    /**
     * Prints the received time and closes the connection.
     *
     * @param ctx context used to close the connection
     * @param msg received message; cast to {@link UnixTime}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime serverTime = (UnixTime) msg;
        System.out.println(serverTime);
        ctx.close();
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context used to close the connection
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
TimeClient.java
package netty.time.pojo;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
 * Time client for the POJO variant: installs {@link TimeDecoder} ahead of
 * {@link TimeClientHandler}, connects to the server, and blocks until the
 * connection is closed.
 */
public class TimeClient {

    /**
     * Connects to the given host and port and waits for the channel to close.
     *
     * @param args optional {@code [host [port]]}; host defaults to {@code localhost} and
     *             port to {@code 8080} when the corresponding argument is omitted
     * @throws NumberFormatException if the port argument is not a valid integer
     * @throws Exception if connecting or waiting on the channel fails
     */
    public static void main(String[] args) throws Exception {
        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
        }
        // The port is a separate, optional argument: only read args[1] when it exists,
        // otherwise a host-only invocation would index past the end of args.
        if (args.length > 1) {
            port = Integer.parseInt(args[1]);
        }

        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // A client uses Bootstrap, not ServerBootstrap.
            Bootstrap b = new Bootstrap();
            // A client needs only a single worker EventLoopGroup.
            b.group(workerGroup);
            // A client uses NioSocketChannel, not NioServerSocketChannel.
            b.channel(NioSocketChannel.class);
            // Use option(), not childOption(), on the client side.
            b.option(ChannelOption.SO_KEEPALIVE, true);

            // TimeDecoder must come before TimeClientHandler in the pipeline.
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client: call connect() rather than bind().
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
POJO Server
UnixTime.java
package netty.time.pojoserver;
import java.util.Date;
/**
 * Immutable time value: a count of seconds that is shifted by 2,208,988,800
 * relative to the Unix epoch.
 */
public class UnixTime {

    private final long value;

    /** Creates an instance for the current system time. */
    public UnixTime() {
        this(currentValue());
    }

    /**
     * Creates an instance holding the given value.
     *
     * @param value the offset seconds value to store
     */
    public UnixTime(long value) {
        this.value = value;
    }

    /** Converts the current system time to whole seconds and applies the offset. */
    private static long currentValue() {
        long epochSeconds = System.currentTimeMillis() / 1000L;
        return epochSeconds + 2208988800L;
    }

    /**
     * Returns the stored value.
     *
     * @return the offset seconds value given at construction
     */
    public long value() {
        return value;
    }

    /**
     * Returns the {@link Date#toString()} form of the stored value.
     *
     * @return the date string for this time
     */
    @Override
    public String toString() {
        long epochSeconds = value() - 2208988800L;
        Date asDate = new Date(epochSeconds * 1000L);
        return asDate.toString();
    }
}
TimeServerHandler.java
package netty.time.pojoserver;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
 * Server handler that sends the current time as a {@link UnixTime} as soon as a
 * connection becomes active, then closes the connection once the write completes.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Writes and flushes a new {@link UnixTime}, registering a listener that closes
     * the channel when the write future completes.
     *
     * @param ctx context used to write the message
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(new UnixTime())
                .addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * Prints the stack trace of the failure and closes the connection.
     *
     * @param ctx   context used to close the connection
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
TimeEncoder.java
package netty.time.pojoserver;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
/**
 * Outbound handler that converts a {@link UnixTime} into a 4-byte buffer before
 * passing it on.
 */
public class TimeEncoder extends ChannelOutboundHandlerAdapter {

    /**
     * Encodes the message as a single 32-bit integer and forwards the resulting buffer.
     *
     * @param ctx     context used to allocate the buffer and forward the write
     * @param msg     outbound message; cast to {@link UnixTime}
     * @param promise promise passed along with the write
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        final UnixTime time = (UnixTime) msg;
        final ByteBuf payload = ctx.alloc().buffer(4);
        // The long value is narrowed to int so that exactly 4 bytes are written.
        final int wireValue = (int) time.value();
        payload.writeInt(wireValue);
        // Hand the original promise to Netty so it can mark the write as succeeded or failed.
        ctx.write(payload, promise);
    }
}
TimeEncoder1.java
簡化寫法
package netty.time.pojoserver;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * Shorter form of the time encoder, built on {@link MessageToByteEncoder}.
 */
public class TimeEncoder1 extends MessageToByteEncoder<UnixTime> {

    /**
     * Writes the message's value to {@code out} as a single 32-bit integer.
     *
     * @param ctx channel handler context (unused)
     * @param msg the time to encode
     * @param out buffer that receives the encoded bytes
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        // The long value is narrowed to int so that exactly 4 bytes are written.
        int wireValue = (int) msg.value();
        out.writeInt(wireValue);
    }
}
TimeServer.java
package netty.time.pojoserver;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Time server for the POJO variant: each accepted connection gets a
 * {@link TimeEncoder} and a {@link TimeServerHandler} in its pipeline.
 */
public class TimeServer {

    private int port;

    /**
     * Creates a server for the given port.
     *
     * @param port TCP port to bind
     */
    public TimeServer(int port) {
        this.port = port;
    }

    /**
     * Binds the configured port, then blocks until the server channel is closed.
     * Both event loop groups are shut down on exit.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void run() throws Exception {
        // NioEventLoopGroup is a multithreaded event loop that handles I/O operations.
        // The first group ("boss") handles incoming connections;
        // the second group ("worker") handles the traffic of accepted connections.
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // ServerBootstrap is a helper class for setting up a server.
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);

            // NioServerSocketChannel is used to accept incoming connections.
            bootstrap.channel(NioServerSocketChannel.class);

            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                }
            });

            // option() configures the NioServerSocketChannel;
            // childOption() configures the accepted NioSocketChannels.
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind the TCP port and start accepting incoming connections.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();

            // Wait until the server socket is closed. That does not happen in this
            // example, but this is where a graceful server shutdown would be awaited.
            bindFuture.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    /**
     * Starts the server.
     *
     * @param args optional {@code [port]}; defaults to {@code 8080} when omitted
     * @throws Exception if the server fails to start or is interrupted while running
     */
    public static void main(String[] args) throws Exception {
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
        new TimeServer(port).run();
    }
}
References
Netty.docs: User guide for 4.x