Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

线程池在实际使用过程中,有时候在理解比较偏理论的时候会出现一些判断错误,这里我们就来看一个实际的案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(1, 1,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 100; i++) {
Thread.sleep(100);
for (int j = 0; j < 3; j++) {
threadPoolExecutor.execute(() -> {
int a = 0;
});
}
System.out.println("===============> 详情任务 - 任务处理完成");
}
System.out.println("都执行完成了");
}

MyArrayBlockingQueue 是我复制的 ArrayBlockingQueue 加了点日志,可以认为就是一样的,这种情况下
执行过程是怎么样的呢, 队列长度是 2,核心线程数和最大线程数都是 1,提交任务是采用了两层循环,内层是循环三次,往线程池里提交任务,然后内层循环完了以后会重新睡眠 100 毫秒
在进入下一次外层循环,如果能一眼看出来问题的说明对线程池了解得很深入了,如果没有的话我们就一起来看下
先说下结论,这个代码会出现拒绝异常

考虑下是什么原因呢,是不是我线程数太少了,放大一些,感觉符合直觉一点
修改成

1
2
3
4
private static final ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(100, 100,
0, TimeUnit.MINUTES,
new MyArrayBlockingQueue<>(2));

然而还是一样

只不过晚了点出现,那么问题出在哪呢
为什么我要去重写这个 MyArrayBlockingQueue,就是为了找到原因,其实很多讲解线程池的都是讲了线程池的参数,什么队列是链表的,数组的
但是没有讲到我是怎么往队列塞任务,怎么从队列取任务的呢

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

这里是往队列里塞任务,注意这里需要获得锁,
而对于获取任务呢

1
2
3
4
5
6
7
8
9
10
11
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

注意这里也需要获得锁,当我一个线程池的线程数进入稳定状态,也就是保持一定数量的线程不变的情况下
上面是一种比较可能的情况,即核心线程数等于最大线程数,那么我在提交任务的时候是非常快的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}

再来看下这段代码,第一步只需要判断是否为空,第二步就是判断核心线程数量,明显我说的情况,前面两步就直接过去了
然后就是判断线程池运行状态和往队列里塞任务了,但是线程运行完一个任务主动从队列里获取则需要更多的逻辑
这样就造成了我往队列里塞任务会比获取任务快很多,队列一满,就会抛出拒绝异常
即使我把线程数量放大到 100 还是一样,只不过会出现的慢一点,那么口说无凭,我们来验证下,提交任务过快,那么我在提交
方法里做个延迟

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length) {
return false;
}
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

这样就没啥问题了

除了最后这个加延时,其他的直接用 ArrayBlockingQueue 就可以实验,实操一下会对这个逻辑有更深的理解

https 和 tls

https 我原本的理解来自于几个概念

第一个是非对称加密

也就是公钥可以公开在网上,然后用公钥加密,由私有的私钥进行解密,然后就对 https 有了初步的偏颇的概念,就是浏览器用公钥进行加密,服务器端用私钥进行解密,这里再仔细想想会延伸出来几个问题,也是逐步完善我的后续理解

第二个就是非对称加密以后的对称加密

因为对加密的理解是超大的质数(这个还来源于大学的数据结构老师)相乘后会很难做质因数分解,比如最大的质数2^82,589,933 − 1,用十进制表示有24,862,048位,用现在计算机来算也是非常难的,那如果一直采用非对称加密,这个效率也会比较低,所以在网上查了一番之后知道了是先采用非对称加密,然后再用对称加密,因为非对称加密先有了安全保证,后续的消息就可以用对称加密的方式来进行传输安全了

第三个是最近一次看到的

也是一直缺乏思考的问题,因为 https 我们在使用的时候都会购买证书或者使用免费的证书,那这个证书和前面说的公私钥是什么关系,好像一直是悬空的,没想过这中间的相关性,证书究竟是怎么起作用的

下面是摘自 cloudflare 的 tls 握手流程,把证书的使用方式说的比较明白了

TLS 握手是由客户端和服务器交换的一系列数据报或消息。TLS 握手涉及多个步骤,因为客户端和服务器要交换完成握手和进行进一步对话所需的信息。

TLS 握手中的确切步骤将根据所使用的密钥交换算法的种类和双方支持的密码套件而有所不同。RSA 密钥交换算法虽然现在被认为不安全,但曾在 1.3 之前的 TLS 版本中使用。大致如下:

  1. “客户端问候(client hello)” 消息: 客户端通过向服务器发送“问候”消息来开始握手。该消息将包含客户端支持的 TLS 版本,支持的密码套件,以及称为一串称为“客户端随机数(client random)”的随机字节。
  2. “服务器问候(server hello)”消息: 作为对 client hello 消息的回复,服务器发送一条消息,内含服务器的 SSL 证书、服务器选择的密码套件,以及“服务器随机数(server random)”,即由服务器生成的另一串随机字节。
  3. 身份验证: 客户端使用颁发该证书的证书颁发机构验证服务器的 SSL 证书。此举确认服务器是其声称的身份,且客户端正在与该域的实际所有者进行交互。
  4. 预主密钥: 客户端再发送一串随机字节,即“预主密钥(premaster secret)”。预主密钥是使用公钥加密的,只能使用服务器的私钥解密。(客户端从服务器的 SSL 证书中获得公钥。)
  5. 私钥被使用:服务器对预主密钥进行解密。
  6. 生成会话密钥:客户端和服务器均使用客户端随机数、服务器随机数和预主密钥生成会话密钥。双方应得到相同的结果。
  7. 客户端就绪:客户端发送一条“已完成”消息,该消息用会话密钥加密。
  8. 服务器就绪:服务器发送一条“已完成”消息,该消息用会话密钥加密。
  9. 实现安全对称加密:已完成握手,并使用会话密钥继续进行通信。

为了学习这个过程,我们尝试用 Chrome 的自带抓包工具,以往这是可以通过在 Chrome 地址栏中输入 chrome://net-internals/#events 来打开,现在变成了两部分,chrome://net-export/ 这里可以开始抓包,然后会记录下一个抓包日志文件里,

然后再打开

https://netlog-viewer.appspot.com/#events 来查看具体的日志

这里我们以打开百度为例,即在打开 chrome://net-export/ 并启动抓包后,再在一个新 tab 打开 baidu.com,然后关闭

将日志文件在 https://netlog-viewer.appspot.com/#events 打开

这里可以看到

--> type 1 表示现在 tls 握手进行到哪一步了, 对应的值表示不同的阶段

代码(type)释义
0HelloRequest
1ClientHello
2ServerHello
11Certificate
12ServerKeyExchange
13CertificateRequest
14ServerHelloDone
15CertificateVerify
16ClientKeyExchange
20Finished

具体的对应关系是在这边

比如 11 之后表示我们从服务端收到了证书

然后就要去验证证书的可靠性

后面是验证后的结果

最后就是交换对称会话秘钥然后完成握手

这一篇主要补充两个内容,第一部分就是获取任务的逻辑
首先是状态判断,如果是停止了,SHUTDOWN或更大的了,就需要减小工作线程数量
并返回 null,使得工作线程 worker 退出,然后再判断线程数量和超时,同样如果超过了就会返回 null
然后就是去阻塞队列里获取任务,这里是阻塞着获取的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
// 状态异常,如果是大于等于 SHUTDOWN 的,则协助关闭线程池
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果线程数量超过核心线程数,则帮助减少线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

// 如果前面的不符合,则从阻塞队列获取任务
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

阻塞队列的 poll 主要是通过锁,和notEmpty这个 condition 来等待制定的时间
指定时间后开始 dequeue 出队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

第二部分比较重要,有的同学的问题是为什么不一开始开到最大线程,而是达到核心线程数后就进队列了,
其实池化技术最简单的原因就是希望能够复用这些线程,因为创建销毁他们的成本太大了,如果直接最大线程数的话
其实都不用用到线程池技术了,直接有多少任务就开多少线程,用完就丢了,阐述下我认为比较重要的概念点

第三篇我们要来讲下 Worker 这个“打工人”,线程池里实际在干活的同学
Worker 实现了 Runnable 接口,所以在前面介绍的 addWorker
线程启动其实就调用了下面的 run 方法,而 run 方法则调用了内部的
runWorker 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
// 启动线程 Worker
public void run() {
runWorker(this);
}

介绍下这个 runWorker 方法
这里就会先处理 firstTask,如果 firstTask 是 null 则会去队列里取
里面会有两个钩子,一个是执行前,一个是执行后,注意这个循环是没有常规
退出逻辑的,说明是一直在往队列里获取,除非获取队列里的任务超时失败了
这样就到最后会去执行 processWorkerExit

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// firstTask 不为空或者从队列里获取到任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 前置执行钩子
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 任务执行
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行完后的钩子
afterExecute(task, thrown);
}
} finally {
// 最后会执行到这,清空任务
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 循环的最后就会去退出Worker
processWorkerExit(w, completedAbruptly);
}
}

而这个 processWorkerExit 也有个比较特殊的逻辑
照理我们会想到这里可能就是线程在处理完队列里的任务
以后,就会判断下是否要退出,如果是核心线程的话就不用
如果是非核心线程就会退出了,但是这里其实有个替换逻辑
就是最后有个 addWorker 调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}

tryTerminate();

int c = ctl.get();
// 判断状态
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 核心线程是否也需要退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 这里代表非核心线程就可以不用替换,也就是回收线程了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 否则是替换线程
addWorker(null, false);
}
}

这里主要讲了线程 Worker 运行的逻辑

介绍了线程池的目的和实现概述,以及如何初始化的,我们就来开始看看线程池最重要的执行过程
老规矩,还是先把注释翻译下,这个对理解逻辑其实非常重要,后面可以循着注释的逻辑来看代码
第一步,如果是少于核心线程数的线程正在运行,那么尝试去开启一个新线程,并把提交的命令
command 最为 first task,这里调用了 addWorker,会自动检查运行状态和线程数,并通过
返回 false 来提示不应该增加线程,比如已经有其他任务先创建了线程导致已经超过了核心线程数
第二步,在前面不符合的情况下,也就是线程数已经大于等于核心线程数了或者在调用 addWorker
的时候校验发现线程数已经大于等于核心线程数或者线程池运行状态不是正在运行了就会进入下一个判断
首先还是判断线程池是否正在运行,然后将 command 放进队列,如果成功进队了,还需要再次进行校验
如果线程池状态不是正在运行了,则需要再出队,然后执行拒绝策略,如果状态正常,但是线程被回收完了
那需要创建线程
第三步,如果不能正常进队列,会尝试再启动一个新线程,这里表示核心线程数满了,并且队列也满了,
则需要再开启一个新线程,如果开启失败则执行拒绝策略
代码注释基本已经把代码讲得很详细了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 这就是第一步
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
// 注意这里是第一步可能失败了,获取一下最新的状态
c = ctl.get();
}
// 这里是第二步
if (isRunning(c) && workQueue.offer(command)) {
// 每次都获取最新的状态
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
// 这里是判断如果一个线程都没了的话,注意添加的是非核心线程
addWorker(null, false);
}
// 这里第三步是添加线程,并且是非核心线程
else if (!addWorker(command, false))
reject(command);
}

这里面比较重要的就是 addWorker 方法,我们也来看一下
这里用到了 break retry; 就是在循环中跳出到这个 retry 的位置继续执行
然后还是获取运行状态,如果是非运行状态,
并且rs == SHUTDOWNfirstTask == null! workQueue.isEmpty(),至少一个为false
因为只有当 SHUTDOWN 状态才会出现后面两者为 false 的情况,如果是 SHUTDOWN ,队列不为空还是需要执行完队列里的任务
然后是判断线程数量与传入参数 core 还有核心线程数跟最大线程数的对比,如果是超过了就返回 false 也就是外层的第一步的内部
if 就会继续执行后面的获取线程池状态,然后是 cas 增加线程数量,如果失败则 break 跳到方法开头,如果成功则继续重新获取 c
判断线程池运行状态,如果不一致则从外层继续进入
后面的就是新建一个 worker,然后获取锁,继续判断线程池状态,如果在运行中或者队列不为空(隐含条件)则继续执行添加 worker
并判断是否需要更新历史最大线程数,更改线程添加状态,然后启动线程,标记线程启动状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

后面会继续讲下线程如何执行队列里的任务

0%