class="java" name="code"> //一个信号量,只有在池中还拥有许可时才允许线程继续执行。 //先看构造函数 //默认是非公平模式 public Semaphore(int permits) { sync = new NonfairSync(permits); } NonfairSync(int permits) { super(permits); } //设置状态 Sync(int permits) { setState(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //获取一个许可 public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) //没有获得锁阻塞调用AQS里面的方法阻塞 doAcquireSharedInterruptibly(arg); } final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; //如果remaining<0说明没有许可了。或者有许可直接CAS尝试设置state的值 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } //获取指定数量的许可 public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //释放许可 public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //CAS设置当前许可数量 if (compareAndSetState(current, next)) return true; } } //释放指定数量的许可 public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); } //获取许可 public void acquireUninterruptibly() { sync.acquireShared(1); } //不阻塞的直接获取许可,获取失败则直接返回false public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } //在一定时间内去尝试获取锁超时返回false public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } //返回次信号量的可用许可数 public int availablePermits() { return sync.getPermits(); } //获取可用的所有许可并清空许可 public int drainPermits() { return sync.drainPermits(); } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } //缩减当前的可用许可量 protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } //是否是公平模式 public boolean isFair() { return sync instanceof FairSync; } //查看是否有线程等待获取 public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final boolean hasQueuedThreads() { return head != tail; } //获取正在等待许可的线程数量 public final int getQueueLength() { return sync.getQueueLength(); } public final int getQueueLength() { int n = 0; for (Node p = tail; p != null; p = p.prev) { if (p.thread != null) ++n; } return n; } //将可能正在等待的线程封装到集合中 protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); } public final Collection<Thread> getQueuedThreads() { ArrayList<Thread> list = new ArrayList<Thread>(); for (Node p = tail; p != null; p = p.prev) { Thread t = p.thread; if (t != null) list.add(t); } return list; }