可以看到,原来是 A,B,C 对象引用了 N,这里会在第一次遍历的时候把这种引用反过来,让 N 的对象头部保存下 A 的地址,表示这类引用,然后在遍历到 B 的时候在链起来,到最后就会把所有引用了 N 对象的所有对象通过引线链起来,在第二次遍历的时候就把更新A,B,C 对象引用的 N 地址,并且移动 N 对象
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException();
// 判断下是否在阻塞队列中,因为有可能被其他节点从等待队列移动到阻塞队列 while (!isOnSyncQueue(node)) { // park等待,等待被唤醒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
// 被唤醒后进入阻塞队列,等待获取锁,这里继续用了fullyRelease返回的 state if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters() { Nodet= firstWaiter; Nodetrail=null; // 循环遍历单向链表的节点,如果状态不是 CONDITION 就清出去 while (t != null) { Nodenext= t.nextWaiter; // 循环链表操作,清掉取消的节点 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node) { // 如果waitStatus 是 CONDITION 或者没有 prev 前置节点肯定就不在 if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; // 这里就是我前面提到的 next 的作用 if (node.next != null) // If has successor, it must be on queue returntrue; // 从 tail 开始找,是否在阻塞队列中 /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ privatebooleanfindNodeFromTail(Node node) { Nodet= tail; // 从 tail 开始,从后往前找 for (;;) { if (t == node) returntrue; if (t == null) returnfalse; t = t.prev; } }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); // firstWaiter 肯定是最早开始等待的 Nodefirst= firstWaiter; // 如果不为空就唤醒 if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { // 因为要去唤醒 first 节点了,firstWaiter 需要再从后面找一个 // 并且判断是否为空,如果是空的话就直接可以把 lastWaiter 设置成空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // first 不需要继续保存后面的 waiter 了,因为 firstWaiter 已经是 first 的后置节点了 first.nextWaiter = null; // 如果 first 节点转移不成功,并且 firstWaiter 节点不为空,则继续进入循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 如果状态已经不是 CONDITION 就不会设置成功,返回 false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 调用跟aqs 第一篇中一样的 enq 方法进入阻塞队列,返回入队后的前一节点 Nodep= enq(node); intws= p.waitStatus; // 将前置节点状态设置成SIGNAL,表示后面有节点在等了 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 返回 true,上一个方法的循环就退出了 returntrue; }
/* File: BoundedBuffer.java Originally written by Doug Lea and released into the public domain. This may be used for any purposes whatsoever without acknowledgment. Thanks for the assistance and support of Sun Microsystems Labs, and everyone contributing, testing, and using this code. History: Date Who What 11Jun1998 dl Create public version 17Jul1998 dl Simplified by eliminating wait counts 25aug1998 dl added peek 5May1999 dl replace % with conditional (slightly faster) */
package EDU.oswego.cs.dl.util.concurrent;
/** * Efficient array-based bounded buffer class. * Adapted from CPJ, chapter 8, which describes design. * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> **/
/** * Helper monitor to handle puts. **/ protectedfinalObjectputMonitor_=newObject();
/** * Create a BoundedBuffer with the given capacity. * @exception IllegalArgumentException if capacity less or equal to zero **/ publicBoundedBuffer(int capacity)throws IllegalArgumentException { if (capacity <= 0) thrownewIllegalArgumentException(); array_ = newObject[capacity]; emptySlots_ = capacity; }
/** * Create a buffer with the current default capacity **/
/** * Return the number of elements in the buffer. * This is only a snapshot value, that may change * immediately after returning. **/ publicsynchronizedintsize() { return usedSlots_; }
publicbooleanoffer(Object x, long msecs)throws InterruptedException { if (x == null) thrownewIllegalArgumentException(); if (Thread.interrupted()) thrownewInterruptedException();