这个真的长见识了, 可以看到,原来是 A,B,C 对象引用了 N,这里会在第一次遍历的时候把这种引用反过来,让 N 的对象头部保存下 A 的地址,表示这类引用,然后在遍历到 B 的时候在链起来,到最后就会把所有引用了 N 对象的所有对象通过引线链起来,在第二次遍历的时候就把更新A,B,C 对象引用的 N 地址,并且移动 N 对象
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException();
// 判断下是否在阻塞队列中,因为有可能被其他节点从等待队列移动到阻塞队列 while (!isOnSyncQueue(node)) { // park等待,等待被唤醒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
// 被唤醒后进入阻塞队列,等待获取锁,这里继续用了fullyRelease返回的 state if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters() { Nodet= firstWaiter; Nodetrail=null; // 循环遍历单向链表的节点,如果状态不是 CONDITION 就清出去 while (t != null) { Nodenext= t.nextWaiter; // 循环链表操作,清掉取消的节点 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node) { // 如果waitStatus 是 CONDITION 或者没有 prev 前置节点肯定就不在 if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; // 这里就是我前面提到的 next 的作用 if (node.next != null) // If has successor, it must be on queue returntrue; // 从 tail 开始找,是否在阻塞队列中 /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ privatebooleanfindNodeFromTail(Node node) { Nodet= tail; // 从 tail 开始,从后往前找 for (;;) { if (t == node) returntrue; if (t == null) returnfalse; t = t.prev; } }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); // firstWaiter 肯定是最早开始等待的 Nodefirst= firstWaiter; // 如果不为空就唤醒 if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { // 因为要去唤醒 first 节点了,firstWaiter 需要再从后面找一个 // 并且判断是否为空,如果是空的话就直接可以把 lastWaiter 设置成空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // first 不需要继续保存后面的 waiter 了,因为 firstWaiter 已经是 first 的后置节点了 first.nextWaiter = null; // 如果 first 节点转移不成功,并且 firstWaiter 节点不为空,则继续进入循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 如果状态已经不是 CONDITION 就不会设置成功,返回 false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 调用跟aqs 第一篇中一样的 enq 方法进入阻塞队列,返回入队后的前一节点 Nodep= enq(node); intws= p.waitStatus; // 将前置节点状态设置成SIGNAL,表示后面有节点在等了 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 返回 true,上一个方法的循环就退出了 returntrue; }
/* File: BoundedBuffer.java Originally written by Doug Lea and released into the public domain. This may be used for any purposes whatsoever without acknowledgment. Thanks for the assistance and support of Sun Microsystems Labs, and everyone contributing, testing, and using this code. History: Date Who What 11Jun1998 dl Create public version 17Jul1998 dl Simplified by eliminating wait counts 25aug1998 dl added peek 5May1999 dl replace % with conditional (slightly faster) */
package EDU.oswego.cs.dl.util.concurrent;
/** * Efficient array-based bounded buffer class. * Adapted from CPJ, chapter 8, which describes design. * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> **/
/** * Helper monitor to handle puts. **/ protectedfinalObjectputMonitor_=newObject();
/** * Create a BoundedBuffer with the given capacity. * @exception IllegalArgumentException if capacity less or equal to zero **/ publicBoundedBuffer(int capacity)throws IllegalArgumentException { if (capacity <= 0) thrownewIllegalArgumentException(); array_ = newObject[capacity]; emptySlots_ = capacity; }
/** * Create a buffer with the current default capacity **/
/** * Return the number of elements in the buffer. * This is only a snapshot value, that may change * immediately after returning. **/ publicsynchronizedintsize() { return usedSlots_; }
publicbooleanoffer(Object x, long msecs)throws InterruptedException { if (x == null) thrownewIllegalArgumentException(); if (Thread.interrupted()) thrownewInterruptedException();
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg) { // 前面第一种情况是tryAcquire直接成功了,这个if判断第一个条件就是false,就不往下执行了 // 如果是第二个线程,第一个条件获取锁不成功,条件判断!tryAcquire(arg) == true,就会走 // acquireQueued(addWaiter(Node.EXCLUSIVE), arg) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 这里是包装成一个node Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 最快的方式就是把当前线程的节点放在阻塞队列的最后 Nodepred= tail; // 只有当tail,也就是pred不为空的时候可以直接接上 if (pred != null) { node.prev = pred; // 如果这里cas成功了,就直接接上返回了 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 不然就会继续走到这里 enq(node); return node; }
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ publicvoidunlock() { // 释放锁 sync.release(1); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg) { // 尝试去释放 if (tryRelease(arg)) { Nodeh= head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; } protectedfinalbooleantryRelease(int releases) { intc= getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) thrownewIllegalMonitorStateException(); booleanfree=false; // 判断是否完全释放锁,因为可重入 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 这段代码和上面的一致,只是为了顺序性,又拷下来看下
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒后继节点 privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; // 如果后继节点是空或者当前节点取消等待了 if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,找到非取消的节点,注意这里不是找到就退出,而是一直找到头 // 所以不必担心中间有取消的 for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 将其唤醒 LockSupport.unpark(s.thread); }