Java 线程池系列-第四篇

这一篇主要补充两个内容,第一部分就是获取任务的逻辑
首先是状态判断,如果是停止了,SHUTDOWN或更大的了,就需要减小工作线程数量
并返回 null,使得工作线程 worker 退出,然后再判断线程数量和超时,同样如果超过了就会返回 null
然后就是去阻塞队列里获取任务,这里是阻塞着获取的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 状态异常,如果是大于等于 SHUTDOWN 的,则协助关闭线程池
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果线程数量超过核心线程数,则帮助减少线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

// 如果前面的不符合,则从阻塞队列获取任务
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

阻塞队列的 poll 主要是通过锁,和notEmpty这个 condition 来等待制定的时间
指定时间后开始 dequeue 出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

第二部分比较重要,有的同学的问题是为什么不一开始开到最大线程,而是达到核心线程数后就进队列了,
其实池化技术最简单的原因就是希望能够复用这些线程,因为创建销毁他们的成本太大了,如果直接最大线程数的话
其实都不用用到线程池技术了,直接有多少任务就开多少线程,用完就丢了,阐述下我认为比较重要的概念点