原理:
生产者-消费者(producer-consumer)是共享一个公共的固定大小的 缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入 一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费 者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
?
??? 公共资源
class="java" name="code">package test.thread.producer; import java.util.LinkedList; public class ShareResource { private LinkedList share = new LinkedList<>(); private int capacity = 20; public synchronized void addResource(Object resource) { if (isFull()) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } share.add(resource); notify(); } public synchronized Object get() { if (isEmpty()) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); return share.poll(); } public boolean isFull() { return share.size() == capacity; } public boolean isEmpty() { return share.isEmpty(); } }
?
?? //生产者
?
package test.thread.producer; import org.apache.log4j.Logger; public class Producer implements Runnable { private Logger log = Logger.getLogger(getClass()); private ShareResource sr; public Producer(ShareResource sr) { this.sr = sr; } public void product() { sr.addResource("添加任务"); log.debug("添加任务"); } @Override public void run() { while(true) { product(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
?
? //消费者
package test.thread.producer; import org.apache.log4j.Logger; public class Customer implements Runnable { private ShareResource sr; private Logger log = Logger.getLogger(getClass()); public Customer(ShareResource sr) { this.sr = sr; } public void cust() { sr.get(); log.debug("消费任务"); } @Override public void run() { while(true) { try { cust(); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
??
package test.thread.producer; import org.junit.Test; public class TestCustomer { @Test public void test() { ShareResource sr = new ShareResource(); new Thread(new Producer(sr),"生产者线程1").start(); new Thread(new Producer(sr),"生产者线程2").start(); new Thread(new Customer(sr),"消费者线程1").start(); new Thread(new Customer(sr),"消费者线程2").start(); new Thread(new Customer(sr),"消费者线程3").start(); try { Thread.sleep(1000000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
?
?
?