class="java"> //一个基于二叉堆优先级的延迟取出队列。 //先看构造函数。 public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } //将collection插入到内部的PriorityQueue中。 public boolean addAll(Collection<? extends E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); boolean modified = false; for (E e : c) if (add(e)) modified = true; return modified; } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } //插入元素是插入到内部的PriorityQueue中。 public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); //如果插入的是头元素(即当前头元素变了) if (q.peek() == e) { //将leader线程置为null leader = null; //唤醒等待的线程 available.signal(); } return true; } finally { lock.unlock(); } } public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); } public void put(E e) { offer(e); } //获取元素但是并不移出 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //用底层的priorityQueue实现。 return q.peek(); } finally { lock.unlock(); } } //获取头元素如果头元素为空或者还没有到出列时间返回null。 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //获取头元素 E first = q.peek(); //如果头元素是空或者获取到的头元素还没有到出列时间返回null if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } } public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //如果没空 if (first == null) { //超时了返回null if (nanos <= 0) return null; else //挂起当前线程nanos时间 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); //到时间了可以取出 if (delay <= 0) return q.poll(); //超时了返回null if (nanos <= 0) return null; //如果等待时间小于出列时间或者leader线程!=null则挂起当前线程 if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { //没有其他leader线程则将自己置为leader线程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); nanos -= delay - timeLeft; } finally { //如果leader是自己 if (leader == thisThread) leader = null; } } } } } finally { //释放锁 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } //可阻塞的获取队列头结点 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); //没有获取到则等待 if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); //到达队列出列时间返回 if (delay <= 0) return q.poll(); //如果leader不等于null阻塞当前线程让leader线程取走当前元素 else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); //当前线程设置为leader leader = thisThread; try { //等待delay时间被唤醒 available.awaitNanos(delay); } finally { //醒来之后如果我是leader让其他线程变为leader。 if (leader == thisThread) leader = null; } } } } } finally { //如果leader为空并且有头唤醒其他线程。 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } } /**leader线程是当前正在获取头节点的线程。在offer的时候如果插入的元素变为了头结点, 此时将leader置为null,重新竞争leader。 DelayQueen内部使用的是一个PriorityQueen类实现,所以可以保证插入顺序的优先级。 然后根据实现Delayed接口的getDelay方法来获取元素是否已经到出列时间。 */ //获取队列长度 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.size(); } finally { lock.unlock(); } } //获取容量(没有容量限制所以返回integer最大值) public int remainingCapacity() { return Integer.MAX_VALUE; } //移除元素 public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.remove(o); } finally { lock.unlock(); } } public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(); } finally { lock.unlock(); } } public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { return q.toArray(a); } finally { lock.unlock(); } } //移出队列中的可用元素并将他们封装到collection中 public int drainTo(Collection<? super E> c) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; for (;;) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } } //最多移除maxElements个可用元素到collection中。 public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = 0; while (n < maxElements) { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) break; c.add(q.poll()); ++n; } return n; } finally { lock.unlock(); } }