Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

0%

AQS篇二 之 Condition 浅析笔记

Condition也是 AQS 中很重要的一块内容,可以先看段示例代码,这段代码应该来自于Doug Lea大大,可以在 javadoc 中的 condition 部分找到,其实大大原来写过基于 synchronized 实现的,后面我也贴下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Fixed-capacity FIFO buffer backed by a circular array and guarded by a
 * single {@link ReentrantLock} with two conditions.
 *
 * <p>{@link #put(Object)} blocks while the buffer is full and
 * {@link #take()} blocks while it is empty.
 */
class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // Conditions are created from (and bound to) the lock.
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    // Element storage; put and take operate on this array.
    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    /**
     * Appends an element, waiting while the buffer is full.
     *
     * @param x the element to store
     * @throws InterruptedException if interrupted while waiting for space
     */
    public void put(Object x) throws InterruptedException {
        // The lock must be held before awaiting or signalling its conditions.
        lock.lock();
        try {
            // Buffer full: wait until a consumer signals notFull.
            while (count == items.length) {
                notFull.await();
            }
            items[putptr] = x;
            if (++putptr == items.length) {
                putptr = 0; // wrap around
            }
            ++count;
            // An element is now available; wake one waiting consumer.
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes and returns the oldest element, waiting while the buffer is empty.
     *
     * @return the element that was stored earliest
     * @throws InterruptedException if interrupted while waiting for an element
     */
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            // Buffer empty: wait until a producer signals notEmpty.
            while (count == 0) {
                notEmpty.await();
            }
            Object x = items[takeptr];
            // Drop the array's reference so the consumed element is not kept
            // reachable by the buffer until the slot is overwritten.
            items[takeptr] = null;
            if (++takeptr == items.length) {
                takeptr = 0; // wrap around
            }
            --count;
            // A slot was freed; wake one waiting producer.
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }
}

介绍下 Condition 的结构

1
2
3
4
5
6
// Excerpt: only the class header and the two queue fields are shown here.
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

主要的就这么点,而且也复用了 AQS 阻塞队列(或者大大叫它 lock queue)中同样的 Node 节点,只不过它没有使用其中的双向指针,也就是 prev 和 next,而是使用 Node 中的 nextWaiter,所以只是个单向的队列。没使用 next 其实还有个用处,后面会提到。看下结构的示意图

然后主要是看两个方法:await 和 signal,
先来看下 await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();

// Append a condition-waiter node for the current thread to this condition's queue
Node node = addConditionWaiter();

// Fully release the lock (the caller must hold it); keep the state for reacquisition
int savedState = fullyRelease(node);
int interruptMode = 0;

// Keep waiting until the node is on the sync queue; a signal may have
// moved it there from the condition queue
while (!isOnSyncQueue(node)) {
// Park until woken up
LockSupport.park(this);
// A non-zero interrupt mode ends the wait loop
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}

// After waking, reacquire through the sync queue using the state that
// fullyRelease returned
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

添加条件队列节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Appends a node for the current thread to the tail of the condition queue.
 * If the current tail is no longer in CONDITION state, cancelled waiters
 * are unlinked first.
 *
 * @return the newly appended wait node
 */
private Node addConditionWaiter() {
    Node tailWaiter = lastWaiter;
    // A tail that is not in CONDITION state has been cancelled: purge the
    // queue and re-read the tail.
    if (tailWaiter != null && tailWaiter.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        tailWaiter = lastWaiter;
    }
    // Wrap the current thread in a node whose waitStatus is CONDITION.
    Node added = new Node(Thread.currentThread(), Node.CONDITION);
    if (tailWaiter != null) {
        // Link behind the existing tail.
        tailWaiter.nextWaiter = added;
    } else {
        // Empty queue: the new node is also the head.
        firstWaiter = added;
    }
    // Either way the new node becomes the tail.
    lastWaiter = added;
    return added;
}

清理取消的节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * Removes every node whose waitStatus is not CONDITION from the
 * condition queue. Called only while holding lock, either after a
 * cancellation during a condition wait or when a new waiter is being
 * inserted and lastWaiter is seen to be cancelled. The whole list is
 * walked, rather than stopping at one target, so that all pointers to
 * garbage nodes are dropped in a single pass.
 */
private void unlinkCancelledWaiters() {
    Node lastLive = null; // most recent node kept in the queue
    Node cur = firstWaiter;
    while (cur != null) {
        Node following = cur.nextWaiter;
        if (cur.waitStatus == Node.CONDITION) {
            lastLive = cur;
        } else {
            // Detach the cancelled node and splice its neighbours together.
            cur.nextWaiter = null;
            if (lastLive != null) {
                lastLive.nextWaiter = following;
            } else {
                firstWaiter = following;
            }
            if (following == null) {
                lastWaiter = lastLive;
            }
        }
        cur = following;
    }
}

完全释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
 * Calls release with the current state value and hands that value back.
 * If release reports failure or anything throws, the node is marked
 * CANCELLED before the exception propagates.
 *
 * @param node the condition node for this wait
 * @return the sync state that was held before releasing
 * @throws IllegalMonitorStateException if release returns false
 */
final int fullyRelease(Node node) {
    boolean released = false;
    try {
        // Capture the whole state value so the caller can restore it later.
        int savedState = getState();
        if (!release(savedState)) {
            throw new IllegalMonitorStateException();
        }
        released = true;
        return savedState;
    } finally {
        if (!released) {
            node.waitStatus = Node.CANCELLED;
        }
    }
}

判断是否在阻塞队列中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
 * Reports whether a node that started out on a condition queue is now
 * waiting on the sync queue to reacquire.
 *
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    // Still in CONDITION state: not on the sync queue.
    if (node.waitStatus == Node.CONDITION) {
        return false;
    }
    // No predecessor: not on the sync queue.
    if (node.prev == null) {
        return false;
    }
    // A successor exists, so the node must be on the queue.
    if (node.next != null) {
        return true;
    }
    /*
     * prev can be non-null while the node is not yet on the queue,
     * because the CAS that places it there can fail. Search backwards
     * from tail to be sure. The node is expected to sit near the tail
     * when this is called, so the walk is normally short.
     */
    return findNodeFromTail(node);
}
/**
 * Walks the sync queue backwards from tail looking for the given node.
 * Called only when needed by isOnSyncQueue.
 *
 * @param node the node to look for
 * @return true if present
 */
private boolean findNodeFromTail(Node node) {
    Node cursor = tail;
    // Follow prev links until the node is found or the chain ends.
    while (cursor != node) {
        if (cursor == null) {
            return false;
        }
        cursor = cursor.prev;
    }
    return true;
}

await 的逻辑差不多就是这样子,主要的就是把自己包成一个 Node 节点,waitStatus 的状态是 CONDITION,挂在等待队列的最后,然后完全释放锁,park 等待

signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
 * Moves the longest-waiting thread, if there is one, from this
 * condition's wait queue to the wait queue of the owning lock.
 *
 * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
 * returns {@code false}
 */
public final void signal() {
    if (!isHeldExclusively()) {
        throw new IllegalMonitorStateException();
    }
    // The head of the condition queue has been waiting the longest.
    Node oldest = firstWaiter;
    if (oldest == null) {
        return; // nobody is waiting
    }
    doSignal(oldest);
}
/**
 * Pops nodes off the head of the condition queue and tries to transfer
 * each one, stopping at the first successful transfer or when the queue
 * is empty.
 *
 * @param first (non-null) the first node on condition queue
 */
private void doSignal(Node first) {
    for (;;) {
        // Advance the head past the node being signalled; if nothing
        // follows, the queue is empty and the tail is cleared too.
        Node successor = first.nextWaiter;
        firstWaiter = successor;
        if (successor == null) {
            lastWaiter = null;
        }
        // The popped node no longer belongs to the condition queue.
        first.nextWaiter = null;
        if (transferForSignal(first)) {
            return;
        }
        // Transfer failed: try the new head, if there is one.
        first = firstWaiter;
        if (first == null) {
            return;
        }
    }
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// The CAS only succeeds while waitStatus is still CONDITION; otherwise report false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// Enqueue on the sync queue via enq; p is used as the node's predecessor
// (NOTE(review): enq is not shown here - confirm it returns the predecessor)
Node p = enq(node);
int ws = p.waitStatus;
// Try to set the predecessor to SIGNAL; if it is cancelled (ws > 0) or the
// CAS fails, unpark the node's thread directly
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
// Returning true ends the retry loop in doSignal
return true;
}

这里其实就是把 condition 等待队列的第一个未取消的节点入队到阻塞队列去争锁

附录

synchronized 版的 BoundedBuffer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/*
File: BoundedBuffer.java

Originally written by Doug Lea and released into the public domain.
This may be used for any purposes whatsoever without acknowledgment.
Thanks for the assistance and support of Sun Microsystems Labs,
and everyone contributing, testing, and using this code.

History:
Date Who What
11Jun1998 dl Create public version
17Jul1998 dl Simplified by eliminating wait counts
25aug1998 dl added peek
5May1999 dl replace % with conditional (slightly faster)
*/

package EDU.oswego.cs.dl.util.concurrent;

/**
* Efficient array-based bounded buffer class.
* Adapted from CPJ, chapter 8, which describes design.
* <p>
* Two monitors are used. Producers (put, offer) synchronize on
* putMonitor_, which guards emptySlots_ and putPtr_. Consumers
* (take, poll, peek, size) synchronize on this, which guards
* usedSlots_ and takePtr_.
* <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>] <p>
**/

public class BoundedBuffer implements BoundedChannel {

protected final Object[] array_; // the elements

protected int takePtr_ = 0; // circular indices
protected int putPtr_ = 0;

protected int usedSlots_ = 0; // length
protected int emptySlots_; // capacity - length

/**
* Helper monitor to handle puts.
**/
protected final Object putMonitor_ = new Object();

/**
* Create a BoundedBuffer with the given capacity.
* @exception IllegalArgumentException if capacity less or equal to zero
**/
public BoundedBuffer(int capacity) throws IllegalArgumentException {
if (capacity <= 0) throw new IllegalArgumentException();
array_ = new Object[capacity];
// Every slot starts out empty
emptySlots_ = capacity;
}

/**
* Create a buffer with the current default capacity
**/

public BoundedBuffer() {
this(DefaultChannelCapacity.get());
}

/**
* Return the number of elements in the buffer.
* This is only a snapshot value, that may change
* immediately after returning.
**/
public synchronized int size() { return usedSlots_; }

/**
* Return the length of the backing array.
**/
public int capacity() { return array_.length; }

/**
* Record one more empty slot and notify one thread waiting
* on putMonitor_.
**/
protected void incEmptySlots() {
synchronized(putMonitor_) {
++emptySlots_;
putMonitor_.notify();
}
}

/**
* Record one more used slot and notify one thread waiting
* on this buffer's own monitor.
**/
protected synchronized void incUsedSlots() {
++usedSlots_;
notify();
}

// Called by put/offer from inside synchronized(putMonitor_)
protected final void insert(Object x) { // mechanics of put
--emptySlots_;
array_[putPtr_] = x;
if (++putPtr_ >= array_.length) putPtr_ = 0;
}

// Called by take/poll from inside synchronized(this)
protected final Object extract() { // mechanics of take
--usedSlots_;
Object old = array_[takePtr_];
// Clear the slot so the array no longer references the removed element
array_[takePtr_] = null;
if (++takePtr_ >= array_.length) takePtr_ = 0;
return old;
}

/**
* Return the element at the take index without removing it,
* or null if no slots are in use.
**/
public Object peek() {
synchronized(this) {
if (usedSlots_ > 0)
return array_[takePtr_];
else
return null;
}
}


/**
* Insert x, waiting on putMonitor_ while there are no empty slots.
* @exception IllegalArgumentException if x is null
* @exception InterruptedException if the thread is already interrupted
* on entry or is interrupted while waiting
**/
public void put(Object x) throws InterruptedException {
if (x == null) throw new IllegalArgumentException();
if (Thread.interrupted()) throw new InterruptedException();

synchronized(putMonitor_) {
while (emptySlots_ <= 0) {
try { putMonitor_.wait(); }
catch (InterruptedException ex) {
// Notify another waiter before rethrowing - presumably so a
// notification consumed by this interrupted thread is not lost
putMonitor_.notify();
throw ex;
}
}
insert(x);
}
// Done outside putMonitor_: incUsedSlots locks this buffer's own monitor
incUsedSlots();
}

/**
* Insert x, waiting on putMonitor_ while there are no empty slots,
* but giving up once the remaining wait time is not positive.
* @param x the element to insert
* @param msecs the maximum time to wait, in milliseconds
* @return true if inserted, false if the wait time ran out first
* @exception IllegalArgumentException if x is null
* @exception InterruptedException if the thread is already interrupted
* on entry or is interrupted while waiting
**/
public boolean offer(Object x, long msecs) throws InterruptedException {
if (x == null) throw new IllegalArgumentException();
if (Thread.interrupted()) throw new InterruptedException();

synchronized(putMonitor_) {
// The start time is only read when a positive wait was requested
long start = (msecs <= 0)? 0 : System.currentTimeMillis();
long waitTime = msecs;
while (emptySlots_ <= 0) {
if (waitTime <= 0) return false;
try { putMonitor_.wait(waitTime); }
catch (InterruptedException ex) {
// Notify another waiter before rethrowing (see put)
putMonitor_.notify();
throw ex;
}
// Remaining time = requested time minus time elapsed since start
waitTime = msecs - (System.currentTimeMillis() - start);
}
insert(x);
}
incUsedSlots();
return true;
}



/**
* Remove and return the oldest element, waiting on this buffer's
* monitor while no slots are in use.
* @exception InterruptedException if the thread is already interrupted
* on entry or is interrupted while waiting
**/
public Object take() throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Object old = null;
synchronized(this) {
while (usedSlots_ <= 0) {
try { wait(); }
catch (InterruptedException ex) {
// Notify another waiter before rethrowing - presumably so a
// notification consumed by this interrupted thread is not lost
notify();
throw ex;
}
}
old = extract();
}
// Done outside this monitor: incEmptySlots locks putMonitor_
incEmptySlots();
return old;
}

/**
* Remove and return the oldest element, waiting on this buffer's
* monitor while no slots are in use, but giving up once the
* remaining wait time is not positive.
* @param msecs the maximum time to wait, in milliseconds
* @return the removed element, or null if the wait time ran out first
* @exception InterruptedException if the thread is already interrupted
* on entry or is interrupted while waiting
**/
public Object poll(long msecs) throws InterruptedException {
if (Thread.interrupted()) throw new InterruptedException();
Object old = null;
synchronized(this) {
// The start time is only read when a positive wait was requested
long start = (msecs <= 0)? 0 : System.currentTimeMillis();
long waitTime = msecs;

while (usedSlots_ <= 0) {
if (waitTime <= 0) return null;
try { wait(waitTime); }
catch (InterruptedException ex) {
// Notify another waiter before rethrowing (see take)
notify();
throw ex;
}
// Remaining time = requested time minus time elapsed since start
waitTime = msecs - (System.currentTimeMillis() - start);

}
old = extract();
}
incEmptySlots();
return old;
}

}
请我喝杯咖啡