JavaSE: Producers&Consumers By Using BlockingQueue

2014/5/27 12:15:24  DavyJones2010

1) BlockingQueue Intro

A queue is a data structure with two fundamental operations: adding an element to the tail of the queue and removing an element from the head.

That is, a queue follows the FIFO (First In, First Out) discipline.

A blocking queue is a queue that blocks the calling thread when it tries to dequeue from an empty queue, or when it tries to enqueue to a full queue.
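The blocking behaviour described above can be observed with the JDK's own `java.util.concurrent.ArrayBlockingQueue`. The following is only a minimal sketch: the class name `BlockingQueueIntro`, the capacity of 2 and the 100 ms timeouts are illustrative assumptions. It uses the timed `offer` and `poll` variants so the demo gives up instead of blocking forever.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueIntro {

    /** Returns "offerAccepted,first,second,third" for a queue of capacity 2. */
    public static String demo() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        try {
            queue.put("a");
            queue.put("b");
            // The queue is full: the timed offer waits up to 100 ms, then gives up.
            boolean accepted = queue.offer("c", 100, TimeUnit.MILLISECONDS);
            String first = queue.take();   // head of the queue (FIFO)
            String second = queue.take();
            // The queue is empty: the timed poll waits up to 100 ms, then returns null.
            String third = queue.poll(100, TimeUnit.MILLISECONDS);
            return accepted + "," + first + "," + second + "," + third;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "false,a,b,null"
    }
}
```

The untimed `put` and `take` would block indefinitely in the same two situations, which is the behaviour the rest of this post implements by hand.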


2) Simple ArrayBlockingQueue Demo

package edu.xmu.thread;

import java.util.ArrayList;
import java.util.List;

public class MyArrayBlockingQueue<E> {
	private List<E> queue;
	private int capacity;

	public MyArrayBlockingQueue(int capacity) {
		queue = new ArrayList<E>(capacity);
		this.capacity = capacity;
	}

	public synchronized void enqueue(E item) throws InterruptedException {
		System.out.println(String.format(
				"Thread: [%s] attempts to enqueue. Current queue size: [%d]",
				Thread.currentThread(), queue.size()));
		while (queue.size() == capacity) {
			System.out.println(String.format(
					"Thread [%s] is waiting for enqueue.",
					Thread.currentThread()));
			// wait() releases the lock while waiting and reacquires it before returning.
			// The loop then rechecks the condition, because another thread may have
			// filled the queue again in the meantime.
			wait();
		}
		if (queue.isEmpty()) {
			notifyAll();
		}
		queue.add(item);
		System.out.println(String.format("Thread [%s] executed enqueue.",
				Thread.currentThread()));
	}

	public synchronized E dequeue() throws InterruptedException {
		System.out.println(String.format(
				"Thread: [%s] attempts to dequeue. Current queue size: [%d]",
				Thread.currentThread(), queue.size()));

		while (queue.size() == 0) {
			System.out.println(String.format(
					"Thread [%s] is waiting for dequeue.",
					Thread.currentThread()));
			wait();
		}

		if (queue.size() == capacity) {
			notifyAll();
		}

		E item = queue.remove(0);
		System.out.println(String.format("Thread [%s] executed dequeue.",
				Thread.currentThread()));
		return item;
	}
}
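As an alternative to `synchronized` with `wait()` and `notifyAll()`, the same bounded queue can be sketched with a `ReentrantLock` and two `Condition` objects, so that producers and consumers wait on separate conditions and a single `signal()` wakes only a thread of the right kind. This is a different technique from the demo above, not the author's code. The class name `ConditionBlockingQueue` and the `fifoDemo` helper are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockingQueue<E> {
    private final Deque<E> items = new ArrayDeque<>();
    private final int capacity;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // producers wait here
    private final Condition notEmpty = lock.newCondition(); // consumers wait here

    public ConditionBlockingQueue(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    public void enqueue(E item) throws InterruptedException {
        lock.lock();
        try {
            while (items.size() == capacity) {
                notFull.await(); // releases the lock while waiting
            }
            items.addLast(item);
            notEmpty.signal(); // wake one waiting consumer, if any
        } finally {
            lock.unlock();
        }
    }

    public E dequeue() throws InterruptedException {
        lock.lock();
        try {
            while (items.isEmpty()) {
                notEmpty.await();
            }
            E item = items.removeFirst();
            notFull.signal(); // wake one waiting producer, if any
            return item;
        } finally {
            lock.unlock();
        }
    }

    /** Single-threaded smoke check: returns the dequeue order of 1, 2, 3. */
    public static String fifoDemo() {
        ConditionBlockingQueue<Integer> q = new ConditionBlockingQueue<>(3);
        try {
            q.enqueue(1);
            q.enqueue(2);
            q.enqueue(3);
            return "" + q.dequeue() + q.dequeue() + q.dequeue();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Using `ArrayDeque` also makes removal from the head constant-time, whereas `ArrayList.remove(0)` shifts every remaining element.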


3) Producer & Consumer Using BlockingQueue

package edu.xmu.thread;

public class BlockingQueueTest {
	private static final int INITIAL_CAPACITY = 10;

	public static void main(String[] args) {
		MyArrayBlockingQueue<Food> foodQueue = new MyArrayBlockingQueue<>(
				INITIAL_CAPACITY);
		Thread producer1 = new Thread(new FoodProducer(foodQueue));
		Thread producer2 = new Thread(new FoodProducer(foodQueue));
		Thread producer3 = new Thread(new FoodProducer(foodQueue));

		Thread consumer1 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer2 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer3 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer4 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer5 = new Thread(new FoodConsumer(foodQueue));
		Thread consumer6 = new Thread(new FoodConsumer(foodQueue));

		producer1.start();
		producer2.start();
		producer3.start();

		consumer1.start();
		consumer2.start();
		consumer3.start();
		consumer4.start();
		consumer5.start();
		consumer6.start();
	}
}

class Food {
}

class FoodProducer implements Runnable {
	MyArrayBlockingQueue<Food> foodQueue;

	public FoodProducer(MyArrayBlockingQueue<Food> foodQueue) {
		super();
		this.foodQueue = foodQueue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep((long) (Math.random() * 1000));
				Food food = new Food();
				foodQueue.enqueue(food);
				System.out.println(String.format(
						"Thread: [%s] produces food: [%s]",
						Thread.currentThread(), food));
			} catch (InterruptedException e) {
				// Restore the interrupt flag and stop, instead of looping forever.
				Thread.currentThread().interrupt();
				return;
			}
		}
	}
}

class FoodConsumer implements Runnable {
	MyArrayBlockingQueue<Food> foodQueue;

	public FoodConsumer(MyArrayBlockingQueue<Food> foodQueue) {
		super();
		this.foodQueue = foodQueue;
	}

	@Override
	public void run() {
		while (true) {
			try {
				Thread.sleep(1000L);
				Food food = foodQueue.dequeue();
				Thread.sleep(1000L);
				System.out.println(String.format(
						"Thread: [%s] consumes food: [%s]",
						Thread.currentThread(), food));
			} catch (InterruptedException e) {
				// Restore the interrupt flag and stop, instead of looping forever.
				Thread.currentThread().interrupt();
				return;
			}
		}
	}
}
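The test above runs forever. For comparison, here is a sketch of a producer/consumer run that terminates, built on the JDK's `ArrayBlockingQueue` rather than `MyArrayBlockingQueue`. The class name `FiniteProducerConsumer`, the `STOP` sentinel (a "poison pill") and the use of `Integer` items in place of `Food` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class FiniteProducerConsumer {
    // Hypothetical sentinel: real items are non-negative, so -1 means "stop".
    private static final int STOP = -1;

    /** Runs the pipeline to completion and returns how many items were consumed. */
    public static int run(int producers, int itemsEach, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> producerThreads = new ArrayList<>();
        List<Thread> consumerThreads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            producerThreads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            consumerThreads.add(new Thread(() -> {
                try {
                    while (queue.take() != STOP) { // blocks while the queue is empty
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        try {
            for (Thread t : producerThreads) t.start();
            for (Thread t : consumerThreads) t.start();
            for (Thread t : producerThreads) t.join();
            // All real items are queued by now; add one pill per consumer.
            for (int c = 0; c < consumers; c++) queue.put(STOP);
            for (Thread t : consumerThreads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 20, 6)); // prints 60
    }
}
```

Because the queue is FIFO and the pills are added only after every producer has finished, each consumer sees its pill after all real items have been taken.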


4) BlockingQueue implementations provided by Java:

    1> ArrayBlockingQueue

    2> LinkedBlockingQueue

    3> PriorityBlockingQueue

    Operations: to be covered later, when digging into the implementation of these BlockingQueues.
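Until the implementations are examined in detail, the visible difference between the three classes can be sketched as follows. The class name `JdkQueuesDemo` and the `drain` helper are hypothetical. The example inserts 3, 1, 2 into each queue and reads them back.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

public class JdkQueuesDemo {

    /** Inserts 3, 1, 2 and returns the order in which the queue hands them back. */
    public static String drain(BlockingQueue<Integer> queue) {
        queue.offer(3);
        queue.offer(1);
        queue.offer(2);
        StringBuilder order = new StringBuilder();
        Integer next;
        while ((next = queue.poll()) != null) { // poll() returns null when empty
            order.append(next);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        // Array-backed, fixed capacity, FIFO order.
        System.out.println(drain(new ArrayBlockingQueue<>(3)));    // prints 312
        // Linked nodes, unbounded unless a capacity is given, FIFO order.
        System.out.println(drain(new LinkedBlockingQueue<>()));    // prints 312
        // Unbounded heap, hands out the smallest element first.
        System.out.println(drain(new PriorityBlockingQueue<>()));  // prints 123
    }
}
```

Since `PriorityBlockingQueue` is unbounded, its `put` never blocks; only `take` can block, when the queue is empty.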




Reference Links:

1) http://stackoverflow.com/questions/16760513/how-does-wait-get-the-lock-back-in-java

2) http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

3) Core Java Volume II
