这个真的长见识了。可以看到,原来是 A、B、C 对象引用了 N,这里会在第一次遍历的时候把这种引用反过来,让 N 的对象头部保存下 A 的地址,表示这类引用;然后在遍历到 B 的时候再链起来,到最后就会把引用了 N 对象的所有对象通过引线链起来。在第二次遍历的时候就更新 A、B、C 对象引用的 N 地址,并且移动 N 对象。
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ publicfinalvoidawait()throws InterruptedException { if (Thread.interrupted()) thrownewInterruptedException();
// 判断下是否在阻塞队列中,因为有可能被其他节点从等待队列移动到阻塞队列 while (!isOnSyncQueue(node)) { // park等待,等待被唤醒 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
// 被唤醒后进入阻塞队列,等待获取锁,这里继续用了fullyRelease返回的 state if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
/** * Unlinks cancelled waiter nodes from condition queue. * Called only while holding lock. This is called when * cancellation occurred during condition wait, and upon * insertion of a new waiter when lastWaiter is seen to have * been cancelled. This method is needed to avoid garbage * retention in the absence of signals. So even though it may * require a full traversal, it comes into play only when * timeouts or cancellations occur in the absence of * signals. It traverses all nodes rather than stopping at a * particular target to unlink all pointers to garbage nodes * without requiring many re-traversals during cancellation * storms. */ privatevoidunlinkCancelledWaiters() { Nodet= firstWaiter; Nodetrail=null; // 循环遍历单向链表的节点,如果状态不是 CONDITION 就清出去 while (t != null) { Nodenext= t.nextWaiter; // 循环链表操作,清掉取消的节点 if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. * @param node the node * @return true if is reacquiring */ finalbooleanisOnSyncQueue(Node node) { // 如果waitStatus 是 CONDITION 或者没有 prev 前置节点肯定就不在 if (node.waitStatus == Node.CONDITION || node.prev == null) returnfalse; // 这里就是我前面提到的 next 的作用 if (node.next != null) // If has successor, it must be on queue returntrue; // 从 tail 开始找,是否在阻塞队列中 /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ return findNodeFromTail(node); } /** * Returns true if node is on sync queue by searching backwards from tail. * Called only when needed by isOnSyncQueue. * @return true if present */ privatebooleanfindNodeFromTail(Node node) { Nodet= tail; // 从 tail 开始,从后往前找 for (;;) { if (t == node) returntrue; if (t == null) returnfalse; t = t.prev; } }
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ publicfinalvoidsignal() { if (!isHeldExclusively()) thrownewIllegalMonitorStateException(); // firstWaiter 肯定是最早开始等待的 Nodefirst= firstWaiter; // 如果不为空就唤醒 if (first != null) doSignal(first); } /** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ privatevoiddoSignal(Node first) { do { // 因为要去唤醒 first 节点了,firstWaiter 需要再从后面找一个 // 并且判断是否为空,如果是空的话就直接可以把 lastWaiter 设置成空了 if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // first 不需要继续保存后面的 waiter 了,因为 firstWaiter 已经是 first 的后置节点了 first.nextWaiter = null; // 如果 first 节点转移不成功,并且 firstWaiter 节点不为空,则继续进入循环 } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ finalbooleantransferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ // 如果状态已经不是 CONDITION 就不会设置成功,返回 false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) returnfalse; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ // 调用跟aqs 第一篇中一样的 enq 方法进入阻塞队列,返回入队后的前一节点 Nodep= enq(node); intws= p.waitStatus; // 将前置节点状态设置成SIGNAL,表示后面有节点在等了 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); // 返回 true,上一个方法的循环就退出了 returntrue; }
package EDU.oswego.cs.dl.util.concurrent;
/** * Efficient array-based bounded buffer class. * Adapted from CPJ, chapter 8, which describes design. * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p> **/
/** * Helper monitor to handle puts. **/ protectedfinalObjectputMonitor_=newObject();
/**
 * Create a BoundedBuffer with the given capacity.
 * @param capacity the number of slots in the backing array; must be positive
 * @exception IllegalArgumentException if capacity less or equal to zero
 **/
public BoundedBuffer(int capacity) throws IllegalArgumentException {
    if (capacity <= 0) throw new IllegalArgumentException();
    array_ = new Object[capacity];
    // Every slot starts out empty.
    emptySlots_ = capacity;
}
/** * Create a buffer with the current default capacity **/
/** * Return the number of elements in the buffer. * This is only a snapshot value, that may change * immediately after returning. **/ publicsynchronizedintsize() { return usedSlots_; }
publicbooleanoffer(Object x, long msecs)throws InterruptedException { if (x == null) thrownewIllegalArgumentException(); if (Thread.interrupted()) thrownewInterruptedException();
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ publicfinalvoidacquire(int arg) { // 前面第一种情况是tryAcquire直接成功了,这个if判断第一个条件就是false,就不往下执行了 // 如果是第二个线程,第一个条件获取锁不成功,条件判断!tryAcquire(arg) == true,就会走 // acquireQueued(addWaiter(Node.EXCLUSIVE), arg) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 这里是包装成一个node Nodenode=newNode(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 最快的方式就是把当前线程的节点放在阻塞队列的最后 Nodepred= tail; // 只有当tail,也就是pred不为空的时候可以直接接上 if (pred != null) { node.prev = pred; // 如果这里cas成功了,就直接接上返回了 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 不然就会继续走到这里 enq(node); return node; }
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ publicvoidunlock() { // 释放锁 sync.release(1); } /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ publicfinalbooleanrelease(int arg) { // 尝试去释放 if (tryRelease(arg)) { Nodeh= head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); returntrue; } returnfalse; } protectedfinalbooleantryRelease(int releases) { intc= getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) thrownewIllegalMonitorStateException(); booleanfree=false; // 判断是否完全释放锁,因为可重入 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } // 这段代码和上面的一致,只是为了顺序性,又拷下来看下
/** * Wakes up node's successor, if one exists. * * @param node the node */ // 唤醒后继节点 privatevoidunparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ intws= node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
/* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Nodes= node.next; // 如果后继节点是空或者当前节点取消等待了 if (s == null || s.waitStatus > 0) { s = null; // 从后往前找,找到非取消的节点,注意这里不是找到就退出,而是一直找到头 // 所以不必担心中间有取消的 for (Nodet= tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 将其唤醒 LockSupport.unpark(s.thread); }