Netty是一個 non-blokcing I/O socket framework,主要用於開發網路應用程式。非同步事件驅動的框架和工具可簡化程式開發。Netty最初由JBoss開發,現在由Netty項目社區開發和維護。Netty還支援了HTTP、HTTP2、DNS及其他協定,WebSockets、Google Protocol Buffers、支援 SSL/TLS 以及支援用於SPDY協定和訊息壓縮。
早期 Java Socket framework 會使用 Mina 這個 library,但因為 netty 跟 mina 作者相同,且 mina 已經很久都沒有在維護了,所以目前大部分的文章都建議要使用 netty。不過討論中有提到一個最大的不同點,在 UDP 的處理部分,mina 跟 netty 有不同的作法,mina 有高階的封裝,可讓 connection less 的 UDP 連線,使用起來很像有連線的狀況,netty 是比較貼近原本的 UDP,保持了 connection less 的特性。
另外在使用 netty 之前,要注意使用了哪一個版本的 netty。根據 Remove master branch · Issue #4466 · netty/netty · GitHub 的討論,由於 netty 5 開發時,發現新的作法增加了城市的複雜度,但卻沒有帶來明顯的效能提升,所以 netty 5 目前是被放棄的狀態,建議還是要使用 4.1 版,4.1 版的 user guide 在 Netty.docs: User guide for 4.x
netty 的 libary 切割為以下這些部分
Core 是核心,Protocol Support 是在 socket 的上層的通訊協定,Transport Support 則是資料傳輸,也就是 Socket/Datagram、HTTP Tunnel 或 In-VM Pipe,這幾個都是實際傳輸資料的實作。
maven
使用netty最簡單的方式是引用所有 netty 的 libary,可引用 netty-all
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.111.Final</version>
</dependency>
DISCARD
網路協定最簡單的是 DISCARD,server side 不管 client 送什麼資料,都會直接丟棄,不做任何回應。
DiscardServer.java
package netty.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
// NioEventLoopGroup 是 multithread event loop,處理 IO operation
// 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
EventLoopGroup bossGroup = new NioEventLoopGroup();
// 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 是 helper class,可設定 server
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 接受 incoming connection
.childHandler(new ChannelInitializer<SocketChannel>() {
// ChannelInitializer 用來設定新的 Channel
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
// 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
// option 是設定 NioServerSocketChannel 的參數
.option(ChannelOption.SO_BACKLOG, 128)
// childOption 是設定 NioSocketChannel 的參數
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
// 綁定 TCP Port
ChannelFuture f = b.bind(port).sync();
// 這邊開始會等待所有 server socket 關閉
// 但這個例子不會發生這種狀況
// 如果要以正常方式關閉 server,可呼叫以下 method
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
DiscardServerHandler.java
package netty.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
// ChannelInboundHandlerAdapter 實作了 ChannelInboundHandler 介面所有 methods
// DiscardServerHandler 只需要 繼承 ChannelInboundHandlerAdapter
// 就可以只 override 必要的 methods
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
// channelRead 會在收到 message 時被呼叫
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Discard the received data silently.
// ((ByteBuf) msg).release(); // msg 必須要呼叫 release 釋放記憶體
//
// channelRead 通常會用以下的方式實作內容
// try {
// // Do something with msg
// } finally {
// ReferenceCountUtil.release(msg);
// }
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) {
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}
// 上面的 while loop 可替代為以下寫法
System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
in.release();
}
// // 上面的 while loop 可替代為以下寫法
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ByteBuf in = (ByteBuf) msg;
// System.out.print(in.toString(io.netty.util.CharsetUtil.US_ASCII));
// in.release();
// }
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
// 發生 IO error 或 handler 實作有問題時,會產生 exception,由這個 method 處理
cause.printStackTrace();
ctx.close();
}
}
測試
啟動 DiscardServer 後,可用 telnet/nc 測試
# 等同 telnet 127.0.0.1 8080
nc -nvv 127.0.0.1 8080
ECHO
echo 協定基於 discard 做一些修改,在 echo server 收到 client 發送的資料後,會直接將收到的資料回傳給 client。
EchoServer.java
package netty.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class EchoServer {
private int port;
public EchoServer(int port) {
this.port = port;
}
public void run() throws Exception {
// NioEventLoopGroup 是 multithread event loop,處理 IO operation
// 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
// 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 是 helper class,可設定 server
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 接受 incoming connection
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
// 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
// option 是設定 NioServerSocketChannel 的參數
.option(ChannelOption.SO_BACKLOG, 128)
// childOption 是設定 NioSocketChannel 的參數
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
// 綁定 TCP Port
ChannelFuture f = b.bind(port).sync(); // (7)
// 這邊開始會等待所有 server socket 關閉
// 但這個例子不會發生這種狀況
// 如果要以正常方式關閉 server,可呼叫以下 method
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new EchoServer(port).run();
}
}
EchoServerHandler.java
package netty.echo;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* Handles a server-side channel.
*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ChannelHandlerContext ctx 可驅動 I/O
// 這邊將收到的 msg 透過 ctx 寫回 channel,但不需要 release msg
// 因為 netty 會在寫入時,自動 release msg
ctx.write(msg);
ctx.flush();
// ctx.write 不是直接寫到網路上,而是先放到 buffer,然後再 flush
}
// // 可改用 writeAndFlush
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) {
// ctx.writeAndFlush(msg);
// }
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
測試
一樣用 nc 去測試
nc -nvv 127.0.0.1 8080
TIME
TIME protocol 的 server 會在 client 連線後發送一個 32 bits 整數,然後就直接關閉連線。
因為 server 必須忽略 client 發送的所有資料,所以不能用 channelRead() 要改用 channelActive()
TimeServer.java
package netty.time;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class TimeServer {
private int port;
public TimeServer(int port) {
this.port = port;
}
public void run() throws Exception {
// NioEventLoopGroup 是 multithread event loop,處理 IO operation
// 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
// 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// ServerBootstrap 是 helper class,可設定 server
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用 NioServerSocketChannel 接受 incoming connection
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
// 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
// option 是設定 NioServerSocketChannel 的參數
.option(ChannelOption.SO_BACKLOG, 128)
// childOption 是設定 NioSocketChannel 的參數
.childOption(ChannelOption.SO_KEEPALIVE, true);
// Bind and start to accept incoming connections.
// 綁定 TCP Port
ChannelFuture f = b.bind(port).sync(); // (7)
// 這邊開始會等待所有 server socket 關閉
// 但這個例子不會發生這種狀況
// 如果要以正常方式關閉 server,可呼叫以下 method
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TimeServer(port).run();
}
}
TimeServerHandler.java
package netty.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) {
// channelActive 會在 connection 建立後,就被呼叫
// 先取得一塊 32 bits integer (4 bytes) 的 ByteBuf,然後將現在的時間填進去
final ByteBuf time = ctx.alloc().buffer(4);
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
// 這邊不需要跟 NIO 的 ByteBuffer 一樣,在發送前,呼叫 java.nio.ByteBuffer.flip()
// 因為 ByteBuf 已經自動處理了 read/write,會使用兩種不同的 pointer (index)
final ChannelFuture f = ctx.writeAndFlush(time);
// ChannelHandlerContext.write() 跟 writeAndFlush 會回傳 ChannelFuture
// 代表 Netty 的 IO operation 是非同步的
// 必須用非同步的方式,確認 future 已經完成,才能將 context 關閉
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // 這邊可以簡化為這種寫法
// f.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
TimeClient.java
package netty.time;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = "localhost";
int port = 8080;
if (args.length > 0) {
host = args[0];
port = Integer.parseInt(args[1]);
}
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// client 要使用 Bootstrap,跟 ServerBootstrap 不同
Bootstrap b = new Bootstrap();
// client 只需要一個 worker EventLoopGroup
b.group(workerGroup);
// client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
b.channel(NioSocketChannel.class);
// 不要使用 childOption
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
// 呼叫 connect 而不是 bind
ChannelFuture f = b.connect(host, port).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
TimeClientHandler.java
package netty.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 收到的資料是 ByteBuf
ByteBuf m = (ByteBuf) msg;
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
References
Netty.docs: User guide for 4.x
netty/example/src/main/java/io/netty/example at 4.1 · netty/netty · GitHub