原理:
生产者-消费者(producer-consumer)是共享一个公共的固定大小的 缓冲区。其中一个是生产者,用于将消息放入缓冲区;另外一个是消费者,用于从缓冲区中取出消息。问题出现在当缓冲区已经满了,而此时生产者还想向其中放入 一个新的数据项的情形,其解决方法是让生产者此时进行休眠,等待消费者从缓冲区中取走了一个或者多个数据后再去唤醒它。同样地,当缓冲区已经空了,而消费 者还想去取消息,此时也可以让消费者进行休眠,等待生产者放入一个或者多个数据时再唤醒它。
?
??? 公共资源
class="java" name="code">package test.thread.producer; import java.util.LinkedList; public class ShareResource { private LinkedList share = new LinkedList<>(); private int capacity = 20; public synchronized void addResource(Object resource) { if (isFull()) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } share.add(resource); notify(); } public synchronized Object get() { if (isEmpty()) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } notify(); return share.poll(); } public boolean isFull() { return share.size() == capacity; } public boolean isEmpty() { return share.isEmpty(); } }
?
?? //生产者
?
package test.thread.producer; import org.apache.log4j.Logger; public class Producer implements Runnable { private Logger log = Logger.getLogger(getClass()); private ShareResource sr; public Producer(ShareResource sr) { this.sr = sr; } public void product() { sr.addResource("添加任务"); log.debug("添加任务"); } @Override public void run() { while(true) { product(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
?
? //消费者
package test.thread.producer;
import org.apache.log4j.Logger;
public class Customer implements Runnable {
private ShareResource sr;
private Logger log = Logger.getLogger(getClass());
public Customer(ShareResource sr) {
this.sr = sr;
}
public void cust() {
sr.get();
log.debug("消费任务");
}
@Override
public void run() {
while(true) {
try {
cust();
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
??
package test.thread.producer;
import org.junit.Test;
public class TestCustomer {
@Test
public void test() {
ShareResource sr = new ShareResource();
new Thread(new Producer(sr),"生产者线程1").start();
new Thread(new Producer(sr),"生产者线程2").start();
new Thread(new Customer(sr),"消费者线程1").start();
new Thread(new Customer(sr),"消费者线程2").start();
new Thread(new Customer(sr),"消费者线程3").start();
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
?
?
?