1) BlockingQueue Intro
? ? ?A queue is a data structure with two fundamental operations: to add an element to the tail of the queue and remove an element from the head.
? ? ?That is, the queue is FIFO(First In First Out) displine.
???? A blocking queue is a queue that blocks when you try to dequeue from it and the queue is empty, or when you try to enqueue to it and the queue is full.?
?
2) Simple ArrayBlockQueue Demo
class="java" name="code">package edu.xmu.thread; import java.util.ArrayList; import java.util.List; public class MyArrayBlockingQueue<E> { private List<E> queue; private int capacity; public MyArrayBlockingQueue(int capacity) { queue = new ArrayList<E>(capacity); this.capacity = capacity; } public synchronized void enqueue(E item) throws InterruptedException { System.out.println(String.format( "Thread: [%s] attempts to enqueue. Current queue size: [%d]", Thread.currentThread(), queue.size())); while (queue.size() == capacity) { System.out.println(String.format( "Thread [%s] is waiting for enqueue.", Thread.currentThread())); wait(); //When current thread is notified, and jumped out of while loop, it still have to attempt to acquire the lock again } if (queue.isEmpty()) { notifyAll(); } queue.add(item); System.out.println(String.format("Thread [%s] executed enqueue.", Thread.currentThread())); } public synchronized E dequeue() throws InterruptedException { System.out.println(String.format( "Thread: [%s] attempts to dequeue. Current queue size: [%d]", Thread.currentThread(), queue.size())); while (queue.size() == 0) { System.out.println(String.format( "Thread [%s] is waiting for dequeue.", Thread.currentThread())); wait(); } if (queue.size() == capacity) { notifyAll(); } E item = queue.remove(0); System.out.println(String.format("Thread [%s] executed dequeue.", Thread.currentThread())); return item; } }
?
3) Producer & Consumer Using BlockingQueue
package edu.xmu.thread; public class BlockingQueueTest { private static final int INITIAL_CAPACITY = 10; public static void main(String[] args) { MyArrayBlockingQueue<Food> foodQueue = new MyArrayBlockingQueue<>( INITIAL_CAPACITY); Thread producer1 = new Thread(new FoodProducer(foodQueue)); Thread producer2 = new Thread(new FoodProducer(foodQueue)); Thread producer3 = new Thread(new FoodProducer(foodQueue)); Thread consumer1 = new Thread(new FoodConsumer(foodQueue)); Thread consumer2 = new Thread(new FoodConsumer(foodQueue)); Thread consumer3 = new Thread(new FoodConsumer(foodQueue)); Thread consumer4 = new Thread(new FoodConsumer(foodQueue)); Thread consumer5 = new Thread(new FoodConsumer(foodQueue)); Thread consumer6 = new Thread(new FoodConsumer(foodQueue)); producer1.start(); producer2.start(); producer3.start(); consumer1.start(); consumer2.start(); consumer3.start(); consumer4.start(); consumer5.start(); consumer6.start(); } } class Food { } class FoodProducer implements Runnable { MyArrayBlockingQueue<Food> foodQueue; public FoodProducer(MyArrayBlockingQueue<Food> foodQueue) { super(); this.foodQueue = foodQueue; } @Override public void run() { while (true) { try { Thread.sleep((long) (Math.random() * 1000)); Food food = new Food(); foodQueue.enqueue(food); System.out.println(String.format( "Thread: [%s] produces food: [%s]", Thread.currentThread(), food)); } catch (InterruptedException e) { e.printStackTrace(); } } } } class FoodConsumer implements Runnable { MyArrayBlockingQueue<Food> foodQueue; public FoodConsumer(MyArrayBlockingQueue<Food> foodQueue) { super(); this.foodQueue = foodQueue; } @Override public void run() { while (true) { try { Thread.sleep(1000L); Food food = foodQueue.dequeue(); Thread.sleep(1000L); System.out.println(String.format( "Thread: [%s] consumes food: [%s]", Thread.currentThread(), food)); } catch (InterruptedException e) { e.printStackTrace(); } } } }
?
4) BlockingQueue provided by Java:
? ? 1> ArrayBlockingQueue
? ? 2> LinkedBlockingQueue
? ? 3> PriorityBlockingQueue
? ? Operations: Will dig into the realization of these BlockingQueues.
?
Reference Links:
1) http://stackoverflow.com/questions/16760513/how-does-wait-get-the-lock-back-in-java
2) http://tutorials.jenkov.com/java-concurrency/blocking-queues.html
3) Core Java Volume II