2025/02/17

Queue in Java

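Queue is one of the interfaces of the Collections Framework in the java.util package. It defines a queue that typically orders its elements first-in-first-out (FIFO). Besides the basic Collection operations, Queue provides its own insert/remove/examine methods, each in two forms: one throws an exception on failure, the other returns a special value (false or null).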

Operation   Throws exception   Returns false/null
Insert      add(e)             offer(e)
Remove      remove()           poll()
Examine     element()          peek()

    @Test
    public void queue_test1() {
        Queue<String> queue = new LinkedList<>();
        queue.add("one");
        queue.add("two");
        queue.add("three");
        assertEquals("[one, two, three]", queue.toString());

        queue.remove("two");
        assertEquals("[one, three]", queue.toString());

        String element = queue.element();
        assertEquals("one", element);
        assertEquals("[one, three]", queue.toString());

        // To empty the queue
        queue.clear();
        queue.offer("one");
        queue.offer("two");
        queue.offer("three");
        assertEquals("[one, two, three]", queue.toString());

        // poll retrieves and removes the first element of the queue
        String pollElement = queue.poll();
        assertEquals("one", pollElement);
        assertEquals("[two, three]", queue.toString());

        // peek also retrieves the first element, but only looks at it: the element stays in the queue
        String peekElement = queue.peek();
        assertEquals("two", peekElement);
        assertEquals("[two, three]", queue.toString());
    }

    @Test
    public void queue_test2() {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
        queue.add("one");
        queue.add("two");
        // add throws an exception when the insert would exceed the queue's capacity
        IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
            queue.add("three");
        });

        // offer does not throw when the insert would exceed the queue's capacity;
        // it only returns true/false to indicate whether the insert succeeded
        queue.clear();
        queue.offer("one");
        assertTrue( queue.offer("two") );
        assertFalse( queue.offer("three") );
        assertEquals("[one, two]", queue.toString());

        queue.clear();
        // remove and element throw an exception when the queue is empty
        NoSuchElementException exception2 = assertThrows(NoSuchElementException.class, () -> {
            queue.remove();
        });
        NoSuchElementException exception3 = assertThrows(NoSuchElementException.class, () -> {
            queue.element();
        });
        // poll and peek return null when the queue is empty
        assertNull(queue.poll());
        assertNull(queue.peek());
    }

Subinterfaces

Queue has three main subinterfaces: BlockingQueue, TransferQueue, and Deque.

Blocking Queue

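BlockingQueue adds methods that make a thread wait on the queue: a retrieval can block until the queue contains an element, and an insertion can block until the queue has free space.

BlockingQueue implementations include LinkedBlockingQueue, SynchronousQueue, and ArrayBlockingQueue.

For insertion, besides the existing add() and offer(), there are also

  • put(E e)

    Inserts an element, blocking until the queue has space for it.

  • offer(E e, long timeout, TimeUnit unit)

    Inserts an element, waiting for space in the queue for at most the given timeout. Returns false if the timeout elapses first.

For removal, besides the existing remove() and poll(), there are also

  • take()

    Retrieves and removes the first element. When the queue is empty, the calling thread blocks until an element becomes available.

  • poll(long timeout, TimeUnit unit)

    Retrieves and removes the first element. When the queue is empty, the calling thread blocks for at most the given timeout and gets null if no element arrives in time.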
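The timed variants are easy to try out in a single thread. The following is a minimal sketch, not part of the original tests: the class name TimedQueueSketch, the capacity of 1, and the 100 ms timeout are all assumptions picked for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: shows what the timed offer/poll return on timeout.
class TimedQueueSketch {
    // Fills a capacity-1 queue, then attempts a timed insert that cannot succeed.
    static boolean offerToFullQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            queue.put("one"); // returns immediately: the queue still has space
            // Nobody removes "one", so this waits about 100 ms and returns false.
            return queue.offer("two", 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    // Attempts a timed retrieval from a queue that stays empty.
    static String pollFromEmptyQueue() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        try {
            // Nobody inserts anything, so this waits about 100 ms and returns null.
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(offerToFullQueue());   // false
        System.out.println(pollFromEmptyQueue()); // null
    }
}
```

By contrast, put() and take() would wait indefinitely in the same two situations, which is why they are normally used with a second thread on the other side of the queue.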

Producer.java

import java.util.Random;
import java.util.concurrent.BlockingQueue;

class Producer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    private int limit;

    Producer(BlockingQueue<Integer> blockingQueue, int limit) {
        this.blockingQueue = blockingQueue;
        this.limit = limit;
    }

    public void run() {
        Random random = new Random();
        for(int i = 1; i <= limit; i++) {
            try {
                // randomly put 1 or 2 integers in this iteration
                int randomProducer = random.nextInt(2);
                for(int j = 0; j <= randomProducer; j++) {
                    System.out.println("Producer put " + (i+j));
                    blockingQueue.put((i+j)); // to produce data
                }
                i = i+randomProducer;
                // produce data with an interval of 0.5 sec
                Thread.sleep(500);
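            } catch (InterruptedException exp) {
                System.out.println("An interruption occurred at Producer");
                Thread.currentThread().interrupt(); // restore the interrupt flag
                return; // stop producing once interrupted
            }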
        }
    }
}

Consumer.java

import java.util.concurrent.BlockingQueue;

class Consumer extends Thread {
    protected BlockingQueue<Integer> blockingQueue;
    Consumer(BlockingQueue<Integer> blockingQueue) { // constructor
        this.blockingQueue = blockingQueue;
    }
    public void run() { // overriding run method
        try {
            while (true) {
                Integer elem = blockingQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        }
        // to handle exception
        catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

CPTest.java

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CPTest {
    public static void main(String[] args) throws InterruptedException {
        // create an object of BlockingQueue
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);

        // passing object of BlockingQueue as arguments
        Producer threadProd = new Producer(blockingQueue, 20);
        Consumer threadCon = new Consumer(blockingQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 2 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}

Sample output (the exact sequence varies between runs)

Producer put 1
Producer put 2
Consumer take 1
Consumer take 2
Producer put 3
Consumer take 3
Producer put 4
Consumer take 4
Producer put 5
Consumer take 5

Transfer Queue

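TransferQueue extends the BlockingQueue interface and applies the producer-consumer pattern: a producer can wait until a consumer has actually received an element, which makes it possible to control how fast data flows from producer to consumer.

TransferQueue implementations include LinkedTransferQueue.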

Producer.java

import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

class Producer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    private int limit;

    Producer(TransferQueue<Integer> transferQueue, int limit) {
        this.transferQueue = transferQueue;
        this.limit = limit;
    }

    public void run() {
        for(int i = 1; i <= limit; i++) {
            try {
                System.out.println("Producer put " + i);
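                // hand i directly to a consumer, waiting up to 4 seconds for one to take it
                boolean added = transferQueue.tryTransfer(i, 4000, TimeUnit.MILLISECONDS);
                if( !added ) {
                    // timed out without a consumer: retry the same value in the next iteration
                    i = i-1;
                }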
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Consumer.java

import java.util.concurrent.TransferQueue;

class Consumer extends Thread {
    protected TransferQueue<Integer> transferQueue;
    Consumer(TransferQueue<Integer> transferQueue) { // constructor
        this.transferQueue = transferQueue;
    }
    public void run() {
        try {
            while (true) {
                Integer elem = transferQueue.take(); // to consume data
                System.out.println("Consumer take " + elem);
            }
        } catch (InterruptedException exp) {
            System.out.println("An interruption occurred at Consumer");
        }
    }
}

TransferQueueTest.java

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueueTest {
    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();

        // passing object of TransferQueue as arguments
        Producer threadProd = new Producer(transferQueue, 5);
        Consumer threadCon = new Consumer(transferQueue);

        // to start the process
        threadProd.start();
        threadCon.start();

        // to exit the process after 2 sec
        Thread.sleep(2000);
        System.exit(0);
    }
}
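To see the hand-off semantics in isolation, here is a small sketch that contrasts the untimed tryTransfer(e) with the blocking transfer(e). The class name TransferSketch and the values 1 and 42 are made up for this illustration.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class: contrasts tryTransfer(e) with transfer(e).
class TransferSketch {
    // With no consumer waiting, the untimed tryTransfer gives up at once.
    static boolean tryTransferWithoutConsumer() {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        return queue.tryTransfer(1); // false: no thread is waiting to receive
    }

    // transfer() blocks until a consumer has received the element.
    static int transferToConsumer() {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();
        int[] received = new int[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        try {
            queue.transfer(42); // returns only after the consumer took 42
            consumer.join();    // wait for the consumer so its write is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(tryTransferWithoutConsumer()); // false
        System.out.println(transferToConsumer());         // 42
    }
}
```

When the untimed tryTransfer(e) fails, the element is not left in the queue, so the producer has to decide what to do with it, for example retry as the Producer above does.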

Deque

Deque is short for double-ended queue: elements can be inserted and removed at both the head and the tail.

Deque implementations include ArrayDeque and LinkedList.

Operation             Method (returns false/null)   Method throwing exception
Insertion from head   offerFirst(e)                 addFirst(e)
Removal from head     pollFirst()                   removeFirst()
Retrieval from head   peekFirst()                   getFirst()
Insertion from tail   offerLast(e)                  addLast(e)
Removal from tail     pollLast()                    removeLast()
Retrieval from tail   peekLast()                    getLast()

Test code

    @Test
    public void deque_test() {
        // Deque as Stack
        Deque<String> stack = new ArrayDeque<>();
        stack.push("one");
        stack.push("two");
        assertEquals("two", stack.getFirst());
        assertEquals("two", stack.pop());
        stack.pop();
        NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> {
            stack.pop();
        });

        // Deque as Queue
        Deque<String> queue = new ArrayDeque<>();
        queue.offer("one");
        queue.offer("two");
        assertEquals("two", queue.getLast());
        assertEquals("one", queue.poll());
        queue.poll();
        assertNull(queue.poll());
    }
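The test above goes through push/pop and offer/poll. As a complement, this sketch calls the head and tail methods from the table directly. The class name DequeEndsSketch and its method names are assumptions for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical demo class: exercises the First/Last methods of Deque.
class DequeEndsSketch {
    static String demo() {
        Deque<String> deque = new ArrayDeque<>();
        deque.offerLast("two");    // [two]
        deque.offerFirst("one");   // [one, two]
        deque.offerLast("three");  // [one, two, three]

        String head = deque.peekFirst(); // "one", stays in the deque
        String tail = deque.pollLast();  // "three", removed from the tail
        String next = deque.pollFirst(); // "one", removed from the head
        return head + "," + tail + "," + next + "," + deque;
    }

    // On an empty deque the two method families behave differently.
    static boolean emptyBehaviour() {
        Deque<String> deque = new ArrayDeque<>();
        boolean specialValues = deque.pollFirst() == null && deque.peekLast() == null;
        boolean removeThrows = false;
        try {
            deque.removeFirst();
        } catch (NoSuchElementException e) {
            removeThrows = true;
        }
        return specialValues && removeThrows;
    }

    public static void main(String[] args) {
        System.out.println(demo());           // one,three,one,[two]
        System.out.println(emptyBehaviour()); // true
    }
}
```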

Priority Queue

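When a new element is added to a PriorityQueue, it is placed according to its natural ordering or the Comparator supplied when the queue was created, so poll() always returns the smallest remaining element.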

    @Test
    public void priority_queue_test() {
        PriorityQueue<String> stringQueue = new PriorityQueue<>();

        stringQueue.add("one");
        stringQueue.add("two");
        stringQueue.add("three");

        // Strings are polled in alphabetical (natural) order
        String first = stringQueue.poll();
        String second = stringQueue.poll();
        String third = stringQueue.poll();

        assertEquals("one", first);
        assertEquals("three", second);
        assertEquals("two", third);
    }
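The test above relies on natural ordering. A sketch of the Comparator variant could look like this. The class name ComparatorQueueSketch is hypothetical, and Comparator.reverseOrder() was picked only because it makes the difference from the natural order easy to see.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class: a PriorityQueue ordered by an explicit Comparator.
class ComparatorQueueSketch {
    static List<String> drainInReverseOrder() {
        Comparator<String> reverse = Comparator.reverseOrder();
        PriorityQueue<String> queue = new PriorityQueue<>(reverse);
        queue.add("one");
        queue.add("two");
        queue.add("three");

        // poll() returns the least element according to the comparator,
        // which here is the alphabetically last string.
        List<String> polled = new ArrayList<>();
        while (!queue.isEmpty()) {
            polled.add(queue.poll());
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println(drainInReverseOrder()); // [two, three, one]
    }
}
```

Only the order in which poll() hands elements out is guaranteed. Iterating over a PriorityQueue or printing it with toString() does not necessarily show the elements in sorted order.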
