class="java"> //一个基于而为堆的优先级队列,它是无界的。 //先看构造函数: public PriorityBlockingQueue() { this(DEFAULT_INITIAL_CAPACITY, null); } public PriorityBlockingQueue(int initialCapacity, Comparator<? super E> comparator) { if (initialCapacity < 1) throw new IllegalArgumentException(); this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); this.comparator = comparator; this.queue = new Object[initialCapacity]; } public PriorityBlockingQueue(int initialCapacity) { this(initialCapacity, null); } public PriorityBlockingQueue(Collection<? extends E> c) { this.lock = new ReentrantLock(); this.notEmpty = lock.newCondition(); //是否需要堆化 boolean heapify = true; // true if not known to be in heap order //是否需要检测null boolean screen = true; // true if must screen for nulls if (c instanceof SortedSet<?>) { SortedSet<? extends E> ss = (SortedSet<? extends E>) c; this.comparator = (Comparator<? super E>) ss.comparator(); heapify = false; } else if (c instanceof PriorityBlockingQueue<?>) { PriorityBlockingQueue<? extends E> pq = (PriorityBlockingQueue<? extends E>) c; this.comparator = (Comparator<? super E>) pq.comparator(); screen = false; if (pq.getClass() == PriorityBlockingQueue.class) // exact match heapify = false; } Object[] a = c.toArray(); int n = a.length; // If c.toArray incorrectly doesn't return Object[], copy it. if (a.getClass() != Object[].class) a = Arrays.copyOf(a, n, Object[].class); if (screen && (n == 1 || this.comparator != null)) { for (int i = 0; i < n; ++i) if (a[i] == null) throw new NullPointerException(); } this.queue = a; this.size = n; if (heapify) //生成堆 heapify(); } //添加元素 public boolean add(E e) { return offer(e); } //添加元素不阻塞 public boolean offer(E e) { if (e == null) throw new NullPointerException(); final ReentrantLock lock = this.lock; lock.lock(); int n, cap; Object[] array; //扩容 while ((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap); try { Comparator<? super E> cmp = comparator; if (cmp == null) siftUpComparable(n, e, array); else siftUpUsingComparator(n, e, array, cmp); //长度+1 size = n + 1; //当前队列已经不为空了唤醒在notEmpty上等待的线程。 notEmpty.signal(); } finally { lock.unlock(); } return true; } //扩容。这里处理的很精巧利用CAS进行扩容。 private void tryGrow(Object[] array, int oldCap) { //这里释放了锁 lock.unlock(); // must release and then re-acquire main lock Object[] newArray = null; /**利用CAS设置allocationSpinLock状态成功则扩容。注意看这段代码虽然采用CAS机制但是还是有并发问题. 关键在于下面的一句判断&& queue == array关键就在这里,这里判断了相等才会复制 */ if (allocationSpinLock == 0 && UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)) { try { int newCap = oldCap + ((oldCap < 64) ? (oldCap + 2) : // grow faster if small (oldCap >> 1)); if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow int minCap = oldCap + 1; if (minCap < 0 || minCap > MAX_ARRAY_SIZE) throw new OutOfMemoryError(); newCap = MAX_ARRAY_SIZE; } if (newCap > oldCap && queue == array) newArray = new Object[newCap]; } finally { //扩容完后置为0 allocationSpinLock = 0; } } //其他线程扩容了交给其他线程执行 if (newArray == null) // back off if another thread is allocating Thread.yield(); lock.lock(); // queue == array这个判断很重要这个判断判断了只有一个线程扩容成功,其他扩容的线程都失败。 if (newArray != null && queue == array) { queue = newArray; System.arraycopy(array, 0, newArray, 0, oldCap); } } //添加元素不阻塞 public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e); // never need to block } //只获取头元素不阻塞 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (size == 0) ? null : (E) queue[0]; } finally { lock.unlock(); } } //获取头元素并移除 public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return dequeue(); } finally { lock.unlock(); } } private E dequeue() { int n = size - 1; if (n < 0) return null; else { Object[] array = queue; E result = (E) array[0]; E x = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(0, x, array, n); else siftDownUsingComparator(0, x, array, n, cmp); size = n; return result; } } //在一定时间内获取并移除某个元素 public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { //当队列中有元素并且没有超时的情况下,在notEmpty条件上等待队列变为非空。 while ( (result = dequeue()) == null && nanos > 0) nanos = notEmpty.awaitNanos(nanos); } finally { lock.unlock(); } return result; } //可以阻塞的获取元素直接队列非空。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); E result; try { while ( (result = dequeue()) == null) notEmpty.await(); } finally { lock.unlock(); } return result; } //获取队列大小 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } //无界队列所以返回最大值。 public int remainingCapacity() { return Integer.MAX_VALUE; } //返回元素的比较器 public Comparator<? super E> comparator() { return comparator; } //返回是否包含某元素 public boolean contains(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { return indexOf(o) != -1; } finally { lock.unlock(); } } private int indexOf(Object o) { if (o != null) { Object[] array = queue; int n = size; for (int i = 0; i < n; i++) if (o.equals(array[i])) return i; } return -1; } //删除某个元素 public boolean remove(Object o) { final ReentrantLock lock = this.lock; lock.lock(); try { int i = indexOf(o); if (i == -1) return false; removeAt(i); return true; } finally { lock.unlock(); } } private void removeAt(int i) { Object[] array = queue; int n = size - 1; if (n == i) // removed last element array[i] = null; else { E moved = (E) array[n]; array[n] = null; Comparator<? super E> cmp = comparator; if (cmp == null) siftDownComparable(i, moved, array, n); else siftDownUsingComparator(i, moved, array, n, cmp); if (array[i] == moved) { if (cmp == null) siftUpComparable(i, moved, array); else siftUpUsingComparator(i, moved, array, cmp); } } size = n; } //转化为数组 public Object[] toArray() { final ReentrantLock lock = this.lock; lock.lock(); try { return Arrays.copyOf(queue, size); } finally { lock.unlock(); } } //转化为带类型的数组 public <T> T[] toArray(T[] a) { final ReentrantLock lock = this.lock; lock.lock(); try { int n = size; //如果数组长度小于队列长度重新生成一个队列长度的数组 if (a.length < n) // Make a new array of a's runtime type, but my contents: return (T[]) Arrays.copyOf(queue, size, a.getClass()); System.arraycopy(queue, 0, a, 0, n); if (a.length > n) a[n] = null; return a; } finally { lock.unlock(); } } //清空队列 public void clear() { final ReentrantLock lock = this.lock; lock.lock(); try { Object[] array = queue; int n = size; size = 0; for (int i = 0; i < n; i++) array[i] = null; } finally { lock.unlock(); } } //将队列中的所有元素移除封装到集合中 public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); } public int drainTo(Collection<? super E> c, int maxElements) { if (c == null) throw new NullPointerException(); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(size, maxElements); for (int i = 0; i < n; i++) { c.add((E) queue[0]); // In this order, in case add() throws. dequeue(); } return n; } finally { lock.unlock(); } }