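Queue is an interface in the Collections Framework of the java.util package. It mainly defines a first-in-first-out (FIFO) queue. In addition to the basic Collection operations, Queue provides its own insert/remove/examine methods. Each operation comes in two forms: one throws an exception on failure, the other returns a special value (false or null).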
Operation | throw Exception | return false/null |
---|---|---|
insert | add(e) | offer(e) |
remove | remove() | poll() |
examine | element() | peek() |
@Test
public void queue_test1() {
Queue<String> queue = new LinkedList<>();
queue.add("one");
queue.add("two");
queue.add("three");
assertEquals("[one, two, three]", queue.toString());
queue.remove("two");
assertEquals("[one, three]", queue.toString());
String element = queue.element();
assertEquals("one", element);
assertEquals("[one, three]", queue.toString());
// To empty the queue
queue.clear();
queue.offer("one");
queue.offer("two");
queue.offer("three");
assertEquals("[one, two, three]", queue.toString());
// poll retrieves and removes the first element of the queue
String pollElement = queue.poll();
assertEquals("one", pollElement);
assertEquals("[two, three]", queue.toString());
// peek retrieves the first element of the queue but only looks at it; the element is not removed
String peekElement = queue.peek();
assertEquals("two", peekElement);
assertEquals("[two, three]", queue.toString());
}
@Test
public void queue_test2() {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
queue.add("one");
queue.add("two");
// add throws an exception when the insert exceeds the queue's capacity
IllegalStateException exception = assertThrows(IllegalStateException.class, () -> {
queue.add("three");
});
// offer does not throw an exception when the insert exceeds the queue's capacity;
// it only returns a true/false flag indicating whether the insert succeeded
queue.clear();
queue.offer("one");
assertTrue( queue.offer("two") );
assertFalse( queue.offer("three") );
assertEquals("[one, two]", queue.toString());
queue.clear();
// remove and element throw an exception when the queue is empty
NoSuchElementException exception2 = assertThrows(NoSuchElementException.class, () -> {
queue.remove();
});
NoSuchElementException exception3 = assertThrows(NoSuchElementException.class, () -> {
queue.element();
});
// poll and peek return null when the queue is empty
assertNull(queue.poll());
assertNull(queue.peek());
}
Sub-interfaces
Queue has three main sub-interfaces: Blocking Queue, Transfer Queue, and Deque.
Blocking Queue
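Adds methods that make a thread wait on the queue. For example, a thread retrieving an element can wait until the queue has one to return, and a thread adding an element can wait until the queue has free space.
Implementations of Blocking Queue include LinkedBlockingQueue, SynchronousQueue, and ArrayBlockingQueue.
For insertion, besides the existing add() and offer(), there are also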
put()
Inserts an element, waiting until the queue has space for it.
offer(E e, long timeout, TimeUnit unit)
Inserts an element, waiting until the queue has space for it, but gives up once the timeout elapses.
For removal, besides the existing remove() and poll(), there are also
take()
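Retrieves and removes the first element. When the queue is empty, it blocks the thread until an element is available.
poll(long timeout, TimeUnit unit)
Retrieves and removes the first element. When the queue is empty, it blocks the thread until an element is available, but gives up once the timeout elapses.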
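The timed variants are easy to confuse with put() and take(), so here is a minimal sketch of how they behave on a full and on an empty queue. The class name `TimedQueueSketch`, the helper names, and the 100 ms timeout are made up for illustration; only the `offer`/`poll` overloads come from BlockingQueue itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, not part of the original examples.
class TimedQueueSketch {
    // Waits up to 100 ms for free space; returns false if the queue stayed full.
    static boolean offerWithTimeout(BlockingQueue<String> queue, String value) {
        try {
            return queue.offer(value, 100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Waits up to 100 ms for an element; returns null if the queue stayed empty.
    static String pollWithTimeout(BlockingQueue<String> queue) {
        try {
            return queue.poll(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        System.out.println(offerWithTimeout(queue, "one")); // true: space was available
        System.out.println(offerWithTimeout(queue, "two")); // false: still full after the timeout
        System.out.println(pollWithTimeout(queue));         // one
        System.out.println(pollWithTimeout(queue));         // null: still empty after the timeout
    }
}
```

Unlike put() and take(), neither call can block forever, which is handy when a thread must keep checking a shutdown flag.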
Producer.java
import java.util.Random;
import java.util.concurrent.BlockingQueue;
class Producer extends Thread {
protected BlockingQueue<Integer> blockingQueue;
private int limit;
Producer(BlockingQueue<Integer> blockingQueue, int limit) {
this.blockingQueue = blockingQueue;
this.limit = limit;
}
public void run() {
Random random = new Random();
for(int i = 1; i <= limit; i++) {
try {
// randomly put 1 or 2 integers
int randomProducer = random.nextInt(2);
// System.out.println("randomProducer=" + randomProducer);
for(int j = 0; j <= randomProducer; j++) {
System.out.println("Producer put " + (i+j));
blockingQueue.put((i+j)); // to produce data
}
i = i+randomProducer;
// produce data with an interval of 0.5 sec
Thread.sleep(500);
} catch (InterruptedException exp) {
System.out.println("An interruption occurred at Producer");
}
}
}
}
Consumer.java
import java.util.concurrent.BlockingQueue;
class Consumer extends Thread {
protected BlockingQueue<Integer> blockingQueue;
Consumer(BlockingQueue<Integer> blockingQueue) { // constructor
this.blockingQueue = blockingQueue;
}
public void run() { // overriding run method
try {
while (true) {
Integer elem = blockingQueue.take(); // to consume data
System.out.println("Consumer take " + elem);
}
}
// to handle exception
catch (InterruptedException exp) {
System.out.println("An interruption occurred at Consumer");
}
}
}
CPTest.java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class CPTest {
public static void main(String[] args) throws InterruptedException {
// create an object of BlockingQueue
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(5);
// passing object of BlockingQueue as arguments
Producer threadProd = new Producer(blockingQueue, 20);
Consumer threadCon = new Consumer(blockingQueue);
// to start the process
threadProd.start();
threadCon.start();
// to exit the process after 2 sec
Thread.sleep(2000);
System.exit(0);
}
}
Sample output (the exact sequence varies between runs, since the producer is random)
Producer put 1
Producer put 2
Consumer take 1
Consumer take 2
Producer put 3
Consumer take 3
Producer put 4
Consumer take 4
Producer put 5
Consumer take 5
Transfer Queue
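Extends the BlockingQueue interface and applies the producer-consumer pattern: a producer can wait until a consumer receives an element, which lets it control the rate at which data flows from producer to consumer.
Implementations of Transfer Queue include LinkedTransferQueue.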
Producer.java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
class Producer extends Thread {
protected TransferQueue<Integer> transferQueue;
private int limit;
Producer(TransferQueue<Integer> transferQueue, int limit) {
this.transferQueue = transferQueue;
this.limit = limit;
}
public void run() {
for(int i = 1; i <= limit; i++) {
try {
System.out.println("Producer put " + i);
boolean added = transferQueue.tryTransfer(i, 4000, TimeUnit.MILLISECONDS);
if( !added ) {
i = i-1;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer.java
import java.util.concurrent.TransferQueue;
class Consumer extends Thread {
protected TransferQueue<Integer> transferQueue;
Consumer(TransferQueue<Integer> transferQueue) { // constructor
this.transferQueue = transferQueue;
}
public void run() {
try {
while (true) {
Integer elem = transferQueue.take(); // to consume data
System.out.println("Consumer take " + elem);
}
} catch (InterruptedException exp) {
System.out.println("An interruption occurred at Consumer");
}
}
}
TransferQueueTest.java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;
public class TransferQueueTest {
public static void main(String[] args) throws InterruptedException {
TransferQueue<Integer> transferQueue = new LinkedTransferQueue<>();
// passing the TransferQueue object as an argument
Producer threadProd = new Producer(transferQueue, 5);
Consumer threadCon = new Consumer(transferQueue);
// to start the process
threadProd.start();
threadCon.start();
// to exit the process after 2 sec
Thread.sleep(2000);
System.exit(0);
}
}
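To make the hand-off semantics more concrete, here is a small sketch comparing the no-argument tryTransfer() with transfer(). The class name `TransferSketch` and the helper `handOffNow` are hypothetical; the sketch assumes a LinkedTransferQueue, as in the example above.

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

// Hypothetical demo class, not part of the original examples.
class TransferSketch {
    // Returns true only if a consumer was already waiting to receive the value.
    static boolean handOffNow(TransferQueue<Integer> queue, int value) {
        return queue.tryTransfer(value);
    }

    public static void main(String[] args) throws InterruptedException {
        TransferQueue<Integer> queue = new LinkedTransferQueue<>();

        // No consumer is waiting, so the element is neither handed off nor enqueued.
        System.out.println(handOffNow(queue, 1)); // false
        System.out.println(queue.isEmpty());      // true

        // Start a consumer that blocks in take() until an element arrives.
        Thread consumer = new Thread(() -> {
            try {
                System.out.println("Consumer take " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // transfer() returns only after the consumer has received the element.
        queue.transfer(2);
        consumer.join();
        System.out.println(queue.isEmpty());      // true
    }
}
```

The timed tryTransfer(e, timeout, unit) used in Producer.java sits between the two: it waits for a consumer, but only up to the timeout.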
Deque
Deque is short for Double-Ended Queue, that is, a queue where elements can be inserted and retrieved at both the head and the tail.
Implementations of Deque include ArrayDeque.
Operation | Method | Method throwing Exception |
---|---|---|
Insertion from Head | offerFirst(e) | addFirst(e) |
Removal from Head | pollFirst() | removeFirst() |
Retrieval from Head | peekFirst() | getFirst() |
Insertion from Tail | offerLast(e) | addLast(e) |
Removal from Tail | pollLast() | removeLast() |
Retrieval from Tail | peekLast() | getLast() |
Test code
@Test
public void deque_test() {
// Deque as Stack
Deque<String> stack = new ArrayDeque<>();
stack.push("one");
stack.push("two");
assertEquals("two", stack.getFirst());
assertEquals("two", stack.pop());
stack.pop();
NoSuchElementException exception = assertThrows(NoSuchElementException.class, () -> {
stack.pop();
});
// Deque as Queue
Deque<String> queue = new ArrayDeque<>();
queue.offer("one");
queue.offer("two");
assertEquals("two", queue.getLast());
assertEquals("one", queue.poll());
queue.poll();
assertNull(queue.poll());
}
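The test above goes through push/pop and offer/poll; the explicit First/Last methods from the table can be sketched the same way. The class name `DequeEndsSketch` and the helper `build` are made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.NoSuchElementException;

// Hypothetical demo class, not part of the original examples.
class DequeEndsSketch {
    // Builds [one, two, three] by inserting at both ends.
    static Deque<String> build() {
        Deque<String> deque = new ArrayDeque<>();
        deque.offerLast("two");    // [two]
        deque.offerFirst("one");   // [one, two]
        deque.offerLast("three");  // [one, two, three]
        return deque;
    }

    public static void main(String[] args) {
        Deque<String> deque = build();
        System.out.println(deque.peekFirst()); // one   (not removed)
        System.out.println(deque.peekLast());  // three (not removed)
        System.out.println(deque.pollLast());  // three
        System.out.println(deque.pollFirst()); // one
        System.out.println(deque);             // [two]

        deque.clear();
        // On an empty deque the poll/peek family returns null ...
        System.out.println(deque.pollFirst()); // null
        // ... while the remove/get family throws.
        try {
            deque.removeFirst();
        } catch (NoSuchElementException e) {
            System.out.println("removeFirst threw NoSuchElementException");
        }
    }
}
```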
Priority Queue
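When a new element is added to a PriorityQueue, it is immediately positioned according to its natural order or the Comparator supplied to the queue, so poll() always returns the smallest remaining element.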
@Test
public void priority_queue_test() {
// Strings use their natural (lexicographic) order: "one" < "three" < "two"
PriorityQueue<String> stringQueue = new PriorityQueue<>();
stringQueue.add("one");
stringQueue.add("two");
stringQueue.add("three");
String first = stringQueue.poll();
String second = stringQueue.poll();
String third = stringQueue.poll();
assertEquals("one", first);
assertEquals("three", second);
assertEquals("two", third);
}
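The test above only covers natural order, so here is a minimal sketch of the Comparator case. It assumes `Comparator.reverseOrder()` as the custom ordering; the class name `PriorityComparatorSketch` and its helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical demo class, not part of the original examples.
class PriorityComparatorSketch {
    // Builds a queue whose head is the lexicographically largest string.
    static PriorityQueue<String> reverseQueue() {
        PriorityQueue<String> queue = new PriorityQueue<>(Comparator.reverseOrder());
        queue.add("one");
        queue.add("two");
        queue.add("three");
        return queue;
    }

    // Drains the queue with poll(), which always returns the current head.
    static List<String> drain(PriorityQueue<String> queue) {
        List<String> result = new ArrayList<>();
        while (!queue.isEmpty()) {
            result.add(queue.poll());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(drain(reverseQueue())); // [two, three, one]
    }
}
```

Note that only poll(), peek(), and similar head operations follow the ordering; iterating over a PriorityQueue or printing it with toString() is not guaranteed to show the elements in sorted order.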