class="java"> //先看构造方法 public SynchronousQueue() { this(false); } //公平模式或者非公平模式 public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); } //所有的生产和消费都走的这个方法 Object transfer(Object e, boolean timed, long nanos) { SNode s = null; // constructed/reused as needed //e==null是消费者,否则为生产者。 int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; //如果头为空或者有相同的模式说明没有其他线程 if (h == null || h.mode == mode) { //说明不能等待 if (timed && nanos <= 0) { //再次判断有没有head if (h != null && h.isCancelled()) casHead(h, h.next); // pop cancelled node else return null; //如果可以等待,CAS设置当前node为head。 } else if (casHead(h, s = snode(s, e, h, mode))) { //一直自旋等到匹配 SNode m = awaitFulfill(s, timed, nanos); if (m == s) { // wait was cancelled clean(s); return null; } if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (mode == REQUEST) ? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // try to fulfill if (h.isCancelled()) // already cancelled casHead(h, h.next); // pop and retry else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // loop until matched or waiters disappear SNode m = s.next; // m is s's match if (m == null) { // all waiters are gone casHead(s, null); // pop fulfill node s = null; // use new node next time break; // restart main loop } SNode mn = m.next; //尝试匹配 if (m.tryMatch(s)) { casHead(s, mn); // pop both s and m return (mode == REQUEST) ? m.item : s.item; } else // lost match s.casNext(m, mn); // help unlink } } } else { // help a fulfiller SNode m = h.next; // m is h's match if (m == null) // waiter is gone casHead(h, null); // pop fulfilling node else { SNode mn = m.next; if (m.tryMatch(h)) // help match casHead(h, mn); // pop both h and m else // lost match h.casNext(m, mn); // help unlink } } } } boolean casHead(SNode h, SNode nh) { return h == head && UNSAFE.compareAndSwapObject(this, headOffset, h, nh); } SNode awaitFulfill(SNode s, boolean timed, long nanos) { //阻塞版本timed为false所以这里是0 long lastTime = timed ? System.nanoTime() : 0; Thread w = Thread.currentThread(); SNode h = head; //是否需要旋转如果需要则自旋需要则根据timed判断自旋多少次。 //这里有一个疑问为什么要自旋而不直接挂起。我觉得是因为效率问题。 //自旋等待可以减少线程调度。 int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (;;) { if (w.isInterrupted()) s.tryCancel(); SNode m = s.match; if (m != null) return m; if (timed) { long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; if (nanos <= 0) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins-1) : 0; else if (s.waiter == null) s.waiter = w; // establish waiter so can park next iter //自旋完了就挂起 else if (!timed) LockSupport.park(this); else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } } boolean shouldSpin(SNode s) { SNode h = head; //如果是头节点或者头节点为null return (h == s || h == null || isFulfilling(h.mode)); } boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // waiters need at most one unpark waiter = null; LockSupport.unpark(w); } return true; } return match == s; } /** 总结:这个类相当于一个生产者对应一个消费者。一个人生产了就必须要有一个来消费。 相当于一对一。一个线程进来然后设置为头结点,然后自旋等待。另外一个线程进来,发现有头结点了, 说明有线程了。然后将自己设置为头结点,并将next指向刚才的头结点。然后尝试匹配。匹配成功。 判断当前线程的模式,是生产者则取头结点的值,消费者则取第二个节点的值。 */