Java 线程池系列-第二篇

介绍了线程池的目的和实现概述,以及如何初始化的,我们就来开始看看线程池最重要的执行过程
老规矩,还是先把注释翻译下,这个对理解逻辑其实非常重要,后面可以循着注释的逻辑来看代码
第一步,如果是少于核心线程数的线程正在运行,那么尝试去开启一个新线程,并把提交的命令
command 最为 first task,这里调用了 addWorker,会自动检查运行状态和线程数,并通过
返回 false 来提示不应该增加线程,比如已经有其他任务先创建了线程导致已经超过了核心线程数
第二步,在前面不符合的情况下,也就是线程数已经大于等于核心线程数了或者在调用 addWorker
的时候校验发现线程数已经大于等于核心线程数或者线程池运行状态不是正在运行了就会进入下一个判断
首先还是判断线程池是否正在运行,然后将 command 放进队列,如果成功进队了,还需要再次进行校验
如果线程池状态不是正在运行了,则需要再出队,然后执行拒绝策略,如果状态正常,但是线程被回收完了
那需要创建线程
第三步,如果不能正常进队列,会尝试再启动一个新线程,这里表示核心线程数满了,并且队列也满了,
则需要再开启一个新线程,如果开启失败则执行拒绝策略
代码注释基本已经把代码讲得很详细了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 这就是第一步
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 注意这里是第一步可能失败了,获取一下最新的状态
c = ctl.get();
}
// 这里是第二步
if (isRunning(c) && workQueue.offer(command)) {
// 每次都获取最新的状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 这里是判断如果一个线程都没了的话,注意添加的是非核心线程
addWorker(null, false);
}
// 这里第三步是添加线程,并且是非核心线程
else if (!addWorker(command, false))
reject(command);
}

这里面比较重要的就是 addWorker 方法,我们也来看一下
这里用到了 break retry; 就是在循环中跳出到这个 retry 的位置继续执行
然后还是获取运行状态,如果是非运行状态,
并且rs == SHUTDOWNfirstTask == null! workQueue.isEmpty(),至少一个为false
因为只有当 SHUTDOWN 状态才会出现后面两者为 false 的情况,如果是 SHUTDOWN ,队列不为空还是需要执行完队列里的任务
然后是判断线程数量与传入参数 core 还有核心线程数跟最大线程数的对比,如果是超过了就返回 false 也就是外层的第一步的内部
if 就会继续执行后面的获取线程池状态,然后是 cas 增加线程数量,如果失败则 break 跳到方法开头,如果成功则继续重新获取 c
判断线程池运行状态,如果不一致则从外层继续进入
后面的就是新建一个 worker,然后获取锁,继续判断线程池状态,如果在运行中或者队列不为空(隐含条件)则继续执行添加 worker
并判断是否需要更新历史最大线程数,更改线程添加状态,然后启动线程,标记线程启动状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

后面会继续讲下线程如何执行队列里的任务