2025/02/17

Queue in Java

Queue 是 java.util package 裡面的 Collection framwork 中的其中一個介面,主要是定義 First-in-First-out FIFO queue 這種介面。除了基本的 Collection 操作方法以外,Queue 還提供了專屬的 insert/get/inspect method。

throw Exception return false/null
insert add(e) offer(e)
remove remove() poll()
examine element() peek()
    @Test
    public void queue_test1() {
        Queue<String> queue = new LinkedList<>();
        queue.add("one");
        queue.add("two");
        queue.add("three");
        assertEquals("[one, two, three]", queue.toString());

        queue.remove("two");
        assertEquals("[one, three]", queue.toString());

        String element = queue.element();
        assertEquals("one", element);
        assertEquals("[one, three]", queue.toString());

        // To empty the queue
        queue.clear();
        queue.offer("one");
        queue.offer("two");
        queue.offer("three");
        assertEquals("[one, two, three]", queue.toString());

        // poll 是取得 queue 的第一個 element
        String pollElement = queue.poll();
        assertEquals("one", pollElement);
        assertEquals("[two, three]", queue.toString());

        // peek 是取得 queue 的第一個 element,但只是偷看,不會從 queue 移除該 element
        String peakElement = queue.peek();
        assertEquals("two", peakElement);
        assertEquals("[two, three]", queue.toString());
    }

    @Test
    public void queue_test2() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        queue.add("one");
        queue.add("two");
        // offer 在 insert 超過 Queue 容量時,會產生 exception
        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
            queue.add("three");
        });

        // offer 在 insert 超過 Queue 容量時,不會產生 exception
        // 只是回傳一個 true/false flag 代表這個 insert 有沒有成功
        queue.clear();
        queue.offer("one");
        assertTrue( queue.offer("two") );
        assertFalse( queue.offer("three") );
        assertEquals("[one, two]", queue.toString());

        queue.clear();
        // remove, element 在 Queue 沒有任何資料時,會產生 exception
        NoSuchElementException exception2 = assertThrows(NoSuchElementException.class, () -> {
            queue.remove();
        });
        NoSuchElementException exception3 = assertThrows(NoSuchElementException.class, () -> {
            queue.element();
        });
        // poll, peek 會在 Queue 為空的時候,回傳 null
        assertNull(queue.poll());
        assertNull(queue.peek());
    }

sub interfaces

Queue 有三個主要的子介面: Blocking Queue, Transfer Queue, Deque

Blocking Queue

增加 methods,可強制 threads 等待 queue,例如在取得 queue 的元素時,可一直等待 queue 裡面有元素才回傳。或是可以等待 queue 清空後,再新增元素時。

Blocking Queue 的實作包含了 LinkedBlockingQueue, SynchronousQueue 及ArrayBlockingQueue

除了既有的 add(), offer() 以外,另外還有

  • put()

    insert 一個元素,等待 queue 有空間才能 put 進去

  • offer(E e, long timeout, TimeUnit unit)

    insert 一個元素,等待 queue 有空間才能 put 進去,等待的時間有 timeout 機制

remove 部分除了既有的 remove(), poll() 以外,還有

  • take()

    取得第一個元素,當 queue 為空的時候,會 blocking thread,等待 queue 有元素可以取得

  • poll(long timeout, TimeUnit int)

    取得第一個元素,當 queue 為空的時候,會 blocking thread,等待 queue 有元素可以取得,等待的時間有 timeout 機制

import java.util.Random;
import java.util.concurrent.BlockingQueue;

class Producer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    private int limit;

    Producer(BlockingQueue<Integer> blockingQueue, int limit) {
        this.blockingQueue = blockingQueue;
        this.limit = limit;
    }

    public void run() {
        Random random = new Random();
        for(int i = 1; i <= limit; i++) {
            try {
                // random 放入 1/2 個 integer
                int randomProducer = random.nextInt(2);
//                System.out.println("randomProducer=" + randomProducer);
                for(int j = 0; j <= randomProducer; j++) {
                    System.out.println("Producer put " + (i+j));
                    blockingQueue.put((i+j)); // to produce data
                }
                i = i+randomProducer;
                // produce data with an interval of 0.5 sec
                Thread.sleep(500);
            } catch (InterruptedException exp) {
                System.out.println("An interruption occurred at Producer");
            }
        }
    }
}

Consumer.java

import java.util.concurrent.BlockingQueue;

class Consumer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    Consumer(BlockingQueue<Integer> blockingQueue) { // constructor
        this.blockingQueue = blockingQueue;
    }
    public void run() { // overriding run method
        try {
            while (true) {
                Integer elem = blockingQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        }
        // to handle exception
        catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

CPTest.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CPTest {
    public static void main(String[] args) throws InterruptedException {
        // create an object of BlockingQueue
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);

        // passing object of BlockingQueue as arguments
        Producer threadProd = new Producer(blockingQueue, 20);
        Consumer threadCon = new Consumer(blockingQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 5 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}

執行結果

Producer put 1
Producer put 2
Consumer take 1
Consumer take 2
Producer put 3
Consumer take 3
Producer put 4
Consumer take 4
Producer put 5
Consumer take 5

Transfer Queue

extends BlockingQueue 介面,並套用 producer-consumer pattern,可控制 producer 到 consumer 資料流動的速度。

Transfer Queue 的實作包含了 LinkedTrasferQueue。

Producer.java

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class Producer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    private int limit;

    Producer(TransferQueue<Integer> transferQueue, int limit) {
        this.transferQueue = transferQueue;
        this.limit = limit;
    }

    public void run() {
        for(int i = 1; i <= limit; i++) {
            try {
                System.out.println("Producer put " + i);
                boolean added = transferQueue.tryTransfer(i, 4000, TimeUnit.MILLISECONDS);
                if( !added ) {
                    i = i-1;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer.java

import java.util.concurrent.TransferQueue;

class Consumer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    Consumer(TransferQueue<Integer> transferQueue) { // constructor
        this.transferQueue = transferQueue;
    }
    public void run() {
        try {
            while (true) {
                Integer elem = transferQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        } catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

TransferQueueTest.java

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueTest {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();

        // passing object of BlockingQueue as arguments
        Producer threadProd = new Producer(transferQueue, 5);
        Consumer threadCon = new Consumer(transferQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 5 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}

Deque

Deque 是 Double-Ended Queue 的縮寫,也就是雙向的 Queue,頭尾都可以 insert/get 資料

Deque 的實作包含了 ArrayDeque。

Operation Method Method throwing Exception
Insertion from Head offerFirst(e) addFirst(e)
Removal from Head pollFirst() removeFirst()
Retrieval from Head peekFirst() getFirst()
Insertion from Tail offerLast(e) addLast(e)
Removal from Tail pollLast() removeLast()
Retrieval from Tail peekLast() getLast()

測試程式

    @Test
    public void deque_test() {
        // Deque as Stack
        Deque<String> stack = new ArrayDeque<>();
        stack.push("one");
        stack.push("two");
        assertEquals("two", stack.getFirst());
        assertEquals("two", stack.pop());
        stack.pop();
        NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> {
            stack.pop();
        });

        // Deque as Queue
        Deque<String> queue = new ArrayDeque<>();
        queue.offer("one");
        queue.offer("two");
        assertEquals("two", queue.getLast());
        assertEquals("one", queue.poll());
        queue.poll();
        assertNull(queue.poll());
    }

Priority Queue

新的元素要加入 PriorityQueue 時,會立刻以 natural order 或是已經定義的 Comparator 排序

    @Test
    public void priority_queue_test() {
        PriorityQueue<String> integerQueue = new PriorityQueue<>();

        integerQueue.add("one");
        integerQueue.add("two");
        integerQueue.add("three");

        String first = integerQueue.poll();
        String second = integerQueue.poll();
        String third = integerQueue.poll();

        assertEquals("one", first);
        assertEquals("three", second);
        assertEquals("two", third);
    }

Reference

Guide to the Java Queue Interface | Baeldung

Java Queue – Queue in Java | DigitalOcean

2025/01/20

afick

afick: another file integrity checker Files 是檔案檢查工具,可監控是否有檔案被異動。

install

afick 試用 perl 開發的,故安裝前要先安裝 perl

dnf -y intall perl

根據 [install 文件](afick installation) 的說明,可以到 another file integrity checker - Browse /afick/3.8.0 at SourceForge.net 下載套件或 source code。

Rocky Linux 安裝方式,是安裝 rpm file

rpm -ivh afick-3.8.0-1.noarch.rpm

也可以直接從 source code 編譯安裝。參考 source code 裡面的 INSTALL 文件的說明,編譯安裝步驟為

tar xvfz afick*.tgz
cd  afick*
perl Makefile.pl
make install

如果要直接安裝成 service,就改為以下步驟。會有一個 cronjob 放在 /etc/cron.daily 目錄

tar xvfz afick*.tgz
cd  afick*
perl Makefile.pl Makefile_sys.in
make install

設定

設定檔在 /etc/afick.conf,在設定檔最後面加上要監控的目錄,例如

/var/www/html DIR

使用

# 初始化 afick
afick -c /etc/afick.conf -init

# 監控並檢查檔案
afick -c /etc/afick.conf -k

# 檢查檔案並更新資料庫
afick -c afick.conf --update

在執行初始化時,花了不少時間,25 萬個檔案,大約用了 10 分鐘

afick -c /etc/afick.conf -init

# #################################################################
# MD5 hash of /var/lib/afick/afick => LXVxgXA/BosPirqMhDpowg

# Hash database created successfully. 251267 files entered.
# user time : 88.79; system time : 32.05; real time : 625

修改兩個檔案內容後,測試檢查檔案。

檢查時也將 afick.conf 納入檢查範圍。因預設 exclude_suffix 把 html, htm 排除了,所以用 css file 測試。

afick -c /etc/afick.conf -k

# archive:=/var/lib/afick/archive
# database:=/var/lib/afick/afick
# exclude_suffix:=log LOG html htm HTM txt TXT xml hlp pod chm tmp old bak fon ttf TTF bmp BMP jpg JPG gif png ico wav WAV mp3 avi pyc
# history:=/var/lib/afick/history
# max_checksum_size:=10000000
# running_files:=1
# timing:=1
# dbm:=Storable
# last run on 2024/08/12 10:17:35 with afick version 3.8.0
WARNING: (control) afick internal change : /etc/afick.conf (see below)

# summary changes
deleted directory : /
    number of deleted files         : 1
changed file : /etc/afick.conf
changed file : /var/www/html/index.css

# detailed changes
deleted directory : /
    parent_date         : Thu Aug  1 14:23:12 2024
    number of deleted files         : 1
changed file : /etc/afick.conf
    md5         : 877b96dc1be6083fd4589a96a2767006    f604ce2893a4bda0750b6564c84020b9
    filesize         : 7268    7269
changed file : /var/www/html/index.css
    inode         : 402693690    402705600
# #################################################################
# MD5 hash of /var/lib/afick/afick => ddgOifAlUpJfbRakzwY9tQ

# Hash database : 251265 files scanned, 4 changed (new : 0; delete : 2; changed : 2; dangling : 16; exclude_suffix : 26665; exclude_prefix : 0; exclude_re : 0; masked : 0; degraded : 244)
# user time : 104.85; system time : 20.72; real time : 383

afick_cron

在 /etc/cron.daily/afick_cron 裡面,ACTION 參數決定,cronjob 檢查檔案後,是否要更新資料庫

# the default action is "update" (-u), you can also use "compare" (-k)
ACTION="-u"

References

不只是資安: [工具介紹] Linux 下的檔案完整性偵測工具 - afick

CentOS 7 安裝 AFICK – 檔案安全監控 (更新內容 – 2018/12/12) – Ken Wu

2025/01/13

Netty in Java 2

如何處理 Stream-based Transport

在 TCP/IP 的 stream-based transport 中,接收資料後,會存放在 socket receive buffer,但資料是一連串的 bytes,這代表說,即使用兩個訊息,以獨立的資料封包傳送,在 OS 也只會認為是一連串的 bytes,不代表解讀時也是這樣。

假設 OS TCP/IP 收到這三個資料包要傳送

ABC   DEF   GHI

在接收端可能會讀取到

AB    CDEFG    H   I

因此接收端必須自己整理 bytes 資料,並恢復為原本的狀態,才能正確解讀資料

ABC   DEF   GHI

方案1

在 Time client 的例子中,雖然 32 bits 資料很少,不太可能分片,但也有可能因為 traffic 增加,而出現這種狀況。

最簡單的方式,就是建立一個內部 buffer,累積到 4 bytes,就能繼續處理。

TimeClientHandler1.java

package netty.time;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler1 extends ChannelInboundHandlerAdapter {
    private ByteBuf buf;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 在 channel 產生時,建立一個 4 bytes buffer
        buf = ctx.alloc().buffer(4);
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // 當 channel 被移除時,就 release buffer
        buf.release();
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf m = (ByteBuf) msg;
        // 一次讀取一個 byte,寫入 ByteBuf
        buf.writeBytes(m);
        m.release();

        // 檢查是否已經累積到 4 bytes
        if (buf.readableBytes() >= 4) {
            long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

方案2

將訊息組合的部份拆開,移到 Decoder

TimeDecoder.java

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// 處理碎片問題
public class TimeDecoder extends ByteToMessageDecoder {

    // 每次收到資料時,都會用內部的 buffer,呼叫這個 method
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 當 in 不足 4 bytes 時,就不寫入到 out
        if (in.readableBytes() < 4) {
            return;
        }
        // 當 in 有 4 bytes
        out.add(in.readBytes(4));
    }
}

TimeDecoder1.java

TimeDecoder 可用另一種簡化的方式實作

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;

import java.util.List;

// 改用 ReplayingDecoder,可簡化 TimeDecoder 的寫法
public class TimeDecoder1 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(
            ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

TimeClient.java

修改 ChannelInitializer 的部分,加上 Decoder

package netty.time.stream;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // 要在 TimeClientHandler 之前,使用 TimeDecoder
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

TimeClientHandler.java

程式不變

package netty.time.stream;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Date;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 收到的資料是 ByteBuf
        ByteBuf m = (ByteBuf) msg;
        try {
            long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
            System.out.println(new Date(currentTimeMillis));
            ctx.close();
        } finally {
            m.release();
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

POJO

改用 POJO java class,讓 protocol 解讀更清楚

UnixTime.java

package netty.time.pojo;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

修改 TimeDecoder

TimeDecoder.java

package netty.time.pojo;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

// 處理碎片問題
public class TimeDecoder extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) {
            return;
        }

        out.add(new UnixTime(in.readUnsignedInt()));
    }
}

TimeClientHandler.java

package netty.time.pojo;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        UnixTime m = (UnixTime) msg;
        System.out.println(m);
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeClient.java

package netty.time.pojo;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class TimeClient {
    public static void main(String[] args) throws Exception {

        String host = "localhost";
        int port = 8080;
        if (args.length > 0) {
            host = args[0];
            port = Integer.parseInt(args[1]);
        }
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // client 要使用 Bootstrap,跟 ServerBootstrap 不同
            Bootstrap b = new Bootstrap();
            // client 只需要一個 worker EventLoopGroup
            b.group(workerGroup);
            // client 要改用 NioSocketChannel,跟 NioServerSocketChannel 不同
            b.channel(NioSocketChannel.class);
            // 不要使用 childOption
            b.option(ChannelOption.SO_KEEPALIVE, true);
//            b.handler(new ChannelInitializer<SocketChannel>() {
//                @Override
//                public void initChannel(SocketChannel ch) throws Exception {
//                    ch.pipeline().addLast(new TimeClientHandler());
//                }
//            });
            // 要在 TimeClientHandler 之前,使用 TimeDecoder
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
                }
            });

            // Start the client.
            // 呼叫 connect 而不是 bind
            ChannelFuture f = b.connect(host, port).sync();

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

POJO Server

UnixTime.java

package netty.time.pojoserver;

import java.util.Date;

public class UnixTime {

    private final long value;

    public UnixTime() {
        this(System.currentTimeMillis() / 1000L + 2208988800L);
    }

    public UnixTime(long value) {
        this.value = value;
    }

    public long value() {
        return value;
    }

    @Override
    public String toString() {
        return new Date((value() - 2208988800L) * 1000L).toString();
    }
}

TimeServerHandler.java

package netty.time.pojoserver;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

TimeEncoder.java

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class TimeEncoder extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        UnixTime m = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)m.value());
        // 將 ChannelPromise 傳給 Netty,讓 netty 可決定 success or failure
        ctx.write(encoded, promise);
    }
}

TimeEncoder1.java

簡化寫法

package netty.time.pojoserver;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class TimeEncoder1 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
        out.writeInt((int)msg.value());
    }
}

TimeServer.java

package netty.time.pojoserver;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {

    private int port;

    public TimeServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // NioEventLoopGroup 是 multithread event loop,處理 IO operation
        // 第一個 NioEventLoopGroup 處理 incoming connection,稱為 boss
        // 第二個 NioEventLoopGroup 處理已接收連線的 traffic,稱為 worker
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            // ServerBootstrap 是 helper class,可設定 server
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 接受 incoming connection
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new TimeEncoder(), new TimeServerHandler());
                        }
                    })
                    // 設定 ChannelOption, ChannelConfig 可取得所有可設定的參數
                    // option 是設定 NioServerSocketChannel 的參數
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // childOption 是設定 NioSocketChannel 的參數
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // Bind and start to accept incoming connections.
            // 綁定 TCP Port
            ChannelFuture f = b.bind(port).sync(); // (7)

            // 這邊開始會等待所有 server socket 關閉
            // 但這個例子不會發生這種狀況
            // 如果要以正常方式關閉 server,可呼叫以下 method
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        }

        new TimeServer(port).run();
    }
}

References

Netty.docs: User guide for 4.x