class="java" name="code"> //先看构造函数 //初始化一个给定容量的ArrayBlockingQueue public ArrayBlockingQueue(int capacity) { this(capacity, false); } //通过给定的容量初始化内部的数组和锁以及条件。 public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); //初始化一个给定容量的数组 this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); } //通过给定的集合初始化数组超出大小会抛出异常这里为什么要上锁?源码注释写的是这里不会有互斥,只是为了保证可见性。防止指令重排? public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } //将数量设置为i count = i; //设置putIndex的位置如果数组已满的话设置为0 putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } //add方法 调用offer方法新增元素如果成功返回true失败抛出异常 public boolean add(E e) { return super.add(e); } public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { //不能加入null元素 checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { //如果数组已经满了 if (count == items.length) return false; else { insert(e); return true; } } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x; //设置putIndex位置 putIndex = inc(putIndex); //总数+1 ++count; //因为数组已经非空所以唤醒notEmpty条件上的等待队列 notEmpty.signal(); } final int inc(int i) { //队列已满put设置为0否则设置为i+1 return (++i == items.length) ? 0 : i; } //poll方法没有元素获取null public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //如果当前没有元素了返回null。 return (count == 0) ? null : extract(); } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; //获取E的实际类型 E x = this.<E>cast(items[takeIndex]); //将当前位置的值设置为null防止内存溢出。 items[takeIndex] = null; //将takeIndex加1如果数组已满设置为0 takeIndex = inc(takeIndex); //数组元素数量减少一个 --count; //当前数组已经是非满状态所以唤醒在notFull条件上等待的队列。 notFull.signal(); return x; } @SuppressWarnings("unchecked") static <E> E cast(Object item) { return (E) item; } //poll(long timeout, TimeUnit unit)在该时间内一直尝试去获取元素超时返回null。 /**这个方法实现的很精妙,直接在notEmpty条件上挂起当前线程,如果该条件被唤醒, 他就去获取一次值。 */ public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) { if (nanos <= 0) return null; //在notEmpty条件上等待nanos时间 nanos = notEmpty.awaitNanos(nanos); } return extract(); } finally { lock.unlock(); } } //offer(E e, long timeout, TimeUnit unit)在指定时间内插入元素超时返回false public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; //如果当前队列是满的那么在notFull条件上等待唤醒 nanos = notFull.awaitNanos(nanos); } insert(e); return true; } finally { lock.unlock(); } } //remove方法成功返回true失败返回false public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { //从takeIndex开始遍历查找o for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) { removeAt(i); return true; } } return false; } finally { lock.unlock(); } } //如果删除的是头元素直接删除takeIndex+1,否则滑动整个数组,重新构建整个数组。 void removeAt(int i) { final Object[] items = this.items; //如果移除正好位于takeindex上。 if (i == takeIndex) { items[takeIndex] = null; takeIndex = inc(takeIndex); } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i); //移动整个元素列表 if (nexti != putIndex) { items[i] = items[nexti]; i = nexti; } else { items[i] = null; //将putindex往前移 putIndex = i; break; } } } --count; //当前队列已经非满唤醒在非满条件上等待的线程 notFull.signal(); } //peek:获取头元素如果没有获取到则返回null。仅仅是获取不移除。 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex); } finally { lock.unlock(); } } final E itemAt(int i) { return this.<E>cast(items[i]); } //put()如果队列已满。则阻塞。直到notFull条件被唤醒。 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } } //take()如果队列为空则阻塞.直到notEmpty条件被唤醒。 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return extract(); } finally { lock.unlock(); } } //remainingCapacity()返回当前还能够容下的元素个数 public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); } } //contains返回是否包含当前元素 public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) if (o.equals(items[i])) return true; return false; } finally { lock.unlock(); } } //返回一个包含当前元素的新数组Object类型 public Object[] toArray() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { final int count = this.count; Object[] a = new Object[count]; for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) a[k] = items[i]; return a; } finally { lock.unlock(); } } //返回指定类型的新数组 public <T> T[] toArray(T[] a) { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { final int count = this.count; final int len = a.length; //如果传入的数组长度<当前的元素个数就重新创建一个count长度的数组 if (len < count) a = (T[])java.lang.reflect.Array.newInstance( a.getClass().getComponentType(), count); for (int i = takeIndex, k = 0; k < count; i = inc(i), k++) a[k] = (T) items[i]; //如果长度>count if (len > count) a[count] = null; return a; } finally { lock.unlock(); } } //移除此队列中的所有元素 public void clear() { final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) items[i] = null; //将这几个变量设置为初始值。 count = 0; putIndex = 0; takeIndex = 0; notFull.signalAll(); } finally { lock.unlock(); } } //返回迭代器一个内部类实现这里就不写了 public Iterator<E> iterator() { return new Itr(); } //移除队列中的所有元素并封装到collection中 public int drainTo(Collection<? super E> c) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; int max = count; while (n < max) { c.add(this.<E>cast(items[i])); //帮助垃圾回收器回收,防止内存泄漏 items[i] = null; i = inc(i); ++n; } if (n > 0) { //将当前集合置为初始化状态 count = 0; putIndex = 0; takeIndex = 0; //唤醒非满条件上的等待。 notFull.signalAll(); } return n; } finally { lock.unlock(); } } //从此队列中最多移出maxElements个元素。 public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int i = takeIndex; int n = 0; //如果maxElements<count就移出maxElements个否则全部移出。 int max = (maxElements < count) ? maxElements : count; while (n < max) { c.add(this.<E>cast(items[i])); items[i] = null; i = inc(i); ++n; } if (n > 0) { count -= n; takeIndex = i; notFull.signalAll(); } return n; } finally { lock.unlock(); } } /** 总结:ArrayBlockingQueue低层是用数组+锁实现。是一种可以用于生产者消费者模式的可阻塞的队列。但是插入和删除这些操作都是用的一把锁。这可能会导致效率不高? */