Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

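https and TLS

My original understanding of https came from a few concepts.

The first is asymmetric encryption

The public key can be published openly; data is encrypted with the public key and decrypted with the privately held private key. From that I formed an initial, lopsided picture of https: the browser encrypts with the public key and the server decrypts with the private key. Thinking about it more carefully raises several questions, and working through them gradually filled in my understanding.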

The second is the symmetric encryption that follows the asymmetric encryption

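My understanding of encryption (which goes back to my university data structures teacher) is that the product of two very large primes is extremely hard to factor back into those primes. For a sense of scale, the largest known prime when I wrote this, 2^82,589,933 − 1, has 24,862,048 decimal digits, and working with numbers of that size is hard even for today's computers. So using asymmetric encryption for everything would be slow. After some searching I learned that the asymmetric encryption is only used first, and symmetric encryption takes over afterwards: once the asymmetric step has established a secure footing, the rest of the messages can be protected with symmetric encryption.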
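To make the "asymmetric first, symmetric afterwards" idea concrete, here is a minimal sketch using the JDK's built-in crypto classes. It is not how TLS itself does it; the class name HybridEncryptionSketch, the key sizes and the cipher choices are my own assumptions for illustration. The slow RSA step only protects a small AES key, and the message itself goes through AES.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical demo class: both "client" and "server" live in one method.
final class HybridEncryptionSketch {
    /** Sends one message through the hybrid scheme and returns what the server recovers. */
    static String roundTrip(String message) {
        try {
            // Server: RSA key pair; only the public half would be published.
            KeyPairGenerator rsaGen = KeyPairGenerator.getInstance("RSA");
            rsaGen.initialize(2048);
            KeyPair serverKeys = rsaGen.generateKeyPair();

            // Client: fresh symmetric key for the bulk data.
            KeyGenerator aesGen = KeyGenerator.getInstance("AES");
            aesGen.init(128);
            SecretKey sessionKey = aesGen.generateKey();

            // Client: asymmetric step, done once, on the small AES key only.
            Cipher rsa = Cipher.getInstance("RSA/ECB/OAEPWithSHA-256AndMGF1Padding");
            rsa.init(Cipher.ENCRYPT_MODE, serverKeys.getPublic());
            byte[] wrappedKey = rsa.doFinal(sessionKey.getEncoded());

            // Client: symmetric step for the actual message.
            byte[] iv = new byte[12];
            new SecureRandom().nextBytes(iv);
            Cipher aes = Cipher.getInstance("AES/GCM/NoPadding");
            aes.init(Cipher.ENCRYPT_MODE, sessionKey, new GCMParameterSpec(128, iv));
            byte[] cipherText = aes.doFinal(message.getBytes(StandardCharsets.UTF_8));

            // Server: recover the AES key with the private key, then decrypt.
            rsa.init(Cipher.DECRYPT_MODE, serverKeys.getPrivate());
            SecretKey recovered = new SecretKeySpec(rsa.doFinal(wrappedKey), "AES");
            aes.init(Cipher.DECRYPT_MODE, recovered, new GCMParameterSpec(128, iv));
            return new String(aes.doFinal(cipherText), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello over a hybrid channel"));
    }
}
```

Only someone holding the private key can unwrap the AES key, which is the property the later handshake steps rely on.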

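The third is something I only came across recently

It is also a question I had never really thought through. When we use https we buy a certificate or use a free one, so how does that certificate relate to the public and private keys above? That link had always been left dangling for me; I never considered how the two are connected or how the certificate actually does its job.

The following TLS handshake flow is taken from Cloudflare, and it explains fairly clearly how the certificate is used.

A TLS handshake is a series of datagrams, or messages, exchanged by the client and the server. It involves multiple steps, as the client and server exchange the information needed to complete the handshake and make further conversation possible.

The exact steps within a TLS handshake vary depending on the kind of key exchange algorithm used and the cipher suites supported by both sides. The RSA key exchange algorithm, while now considered insecure, was used in versions of TLS before 1.3. It goes roughly as follows: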

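  1. "Client hello" message: the client starts the handshake by sending a "hello" message to the server. The message contains the TLS versions the client supports, the cipher suites it supports, and a string of random bytes known as the "client random".
  2. "Server hello" message: in reply to the client hello, the server sends a message containing the server's SSL certificate, the cipher suite the server has chosen, and the "server random", another string of random bytes generated by the server.
  3. Authentication: the client verifies the server's SSL certificate with the certificate authority that issued it. This confirms that the server is who it claims to be and that the client is talking to the actual owner of the domain.
  4. Premaster secret: the client sends one more string of random bytes, the "premaster secret". It is encrypted with the public key and can only be decrypted with the server's private key. (The client gets the public key from the server's SSL certificate.)
  5. Private key used: the server decrypts the premaster secret.
  6. Session keys created: both client and server generate session keys from the client random, the server random and the premaster secret. They should arrive at the same result.
  7. Client is ready: the client sends a "finished" message encrypted with a session key.
  8. Server is ready: the server sends a "finished" message encrypted with a session key.
  9. Secure symmetric encryption achieved: the handshake is complete, and communication continues using the session keys.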
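Step 6 is the part I found least obvious, so here is a sketch of how both sides can arrive at the same secret from the two randoms and the premaster secret. It follows the shape of the TLS 1.2 PRF (an HMAC-SHA256 expansion with the label "master secret") as I understand it from RFC 5246; the class and method names are made up for this post, and this is a study aid, not something to use in place of a real TLS library.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Arrays;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper: derives a 48-byte master secret on either side.
final class MasterSecretSketch {
    /** HMAC-SHA256 expansion: A(0) = label+seed, A(i) = HMAC(secret, A(i-1)). */
    static byte[] prf(byte[] secret, String label, byte[] seed, int length) {
        try {
            Mac hmac = Mac.getInstance("HmacSHA256");
            hmac.init(new SecretKeySpec(secret, "HmacSHA256"));
            byte[] labelAndSeed = concat(label.getBytes(StandardCharsets.US_ASCII), seed);
            byte[] out = new byte[length];
            byte[] a = labelAndSeed;
            int written = 0;
            while (written < length) {
                a = hmac.doFinal(a); // next A(i); doFinal also resets the Mac
                byte[] block = hmac.doFinal(concat(a, labelAndSeed));
                int n = Math.min(block.length, length - written);
                System.arraycopy(block, 0, out, written, n);
                written += n;
            }
            return out;
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] masterSecret(byte[] premaster, byte[] clientRandom, byte[] serverRandom) {
        return prf(premaster, "master secret", concat(clientRandom, serverRandom), 48);
    }

    private static byte[] concat(byte[] x, byte[] y) {
        byte[] joined = Arrays.copyOf(x, x.length + y.length);
        System.arraycopy(y, 0, joined, x.length, y.length);
        return joined;
    }

    public static void main(String[] args) {
        SecureRandom rnd = new SecureRandom();
        byte[] clientRandom = new byte[32], serverRandom = new byte[32], premaster = new byte[48];
        rnd.nextBytes(clientRandom);
        rnd.nextBytes(serverRandom);
        rnd.nextBytes(premaster);
        // Client and server run the same computation on the same three inputs.
        byte[] clientSide = masterSecret(premaster, clientRandom, serverRandom);
        byte[] serverSide = masterSecret(premaster, clientRandom, serverRandom);
        System.out.println("same secret on both sides: " + Arrays.equals(clientSide, serverSide));
    }
}
```

The two randoms travel in the clear; only the premaster secret is protected by the server's key pair, which is why an eavesdropper cannot repeat the computation.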

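To study this process, we can try Chrome's built-in capture tool. It used to be opened by entering chrome://net-internals/#events in the address bar; it is now split into two parts. chrome://net-export/ starts the capture and records it to a log file, and https://netlog-viewer.appspot.com/#events is then used to view the log in detail.

Here we take opening Baidu as an example: open chrome://net-export/ and start logging, open baidu.com in a new tab, then close it.

Load the log file at https://netlog-viewer.appspot.com/#events

Here we can see

--> type 1 shows which step the TLS handshake has reached; each value stands for a different stage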

Code (type)   Meaning
0             HelloRequest
1             ClientHello
2             ServerHello
11            Certificate
12            ServerKeyExchange
13            CertificateRequest
14            ServerHelloDone
15            CertificateVerify
16            ClientKeyExchange
20            Finished
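When reading through a long NetLog it helps to turn the numeric type back into a name. A small lookup like the following does that; the codes and names are exactly the ones in the table above, while the class name HandshakeTypes and the "Unknown(n)" fallback are my own choices.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for decoding the "type" field seen in the log.
final class HandshakeTypes {
    private static final Map<Integer, String> NAMES = new LinkedHashMap<>();

    static {
        NAMES.put(0, "HelloRequest");
        NAMES.put(1, "ClientHello");
        NAMES.put(2, "ServerHello");
        NAMES.put(11, "Certificate");
        NAMES.put(12, "ServerKeyExchange");
        NAMES.put(13, "CertificateRequest");
        NAMES.put(14, "ServerHelloDone");
        NAMES.put(15, "CertificateVerify");
        NAMES.put(16, "ClientKeyExchange");
        NAMES.put(20, "Finished");
    }

    /** Returns the message name for a handshake type code, or Unknown(code). */
    static String nameOf(int code) {
        return NAMES.getOrDefault(code, "Unknown(" + code + ")");
    }

    public static void main(String[] args) {
        for (int code : new int[] {1, 2, 11, 16, 20}) {
            System.out.println(code + " -> " + nameOf(code));
        }
    }
}
```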

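The exact mapping is listed here

For example, after 11 we have received the certificate from the server

Then the certificate's trustworthiness has to be verified

Next comes the result of the verification

Finally the symmetric session key is exchanged and the handshake completes

This post mainly adds two things. The first is the logic for fetching a task.
It starts with a state check: if the pool is at SHUTDOWN or above (and, for SHUTDOWN itself, the queue is already empty), the worker count is decremented
and null is returned so that the worker thread exits. Next it checks the thread count and the idle timeout; if either limit is exceeded it likewise returns null.
Otherwise it fetches a task from the blocking queue, and this fetch blocks.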

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // Pool is at SHUTDOWN or above: help shut it down by letting this worker go
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // Over the maximum, or this cullable worker timed out while idle: help shrink the pool
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        // Otherwise fetch a task from the blocking queue
        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
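The effect of the timed branch in getTask can be observed from the outside. The sketch below is my own demo, not JDK code: the class name, the 100 ms keep-alive and the 200 ms task length are arbitrary assumptions. It creates a pool with one core thread and room for one extra, keeps both busy, and then waits for the extra one to be culled after it idles past the keep-alive time.

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of idle non-core workers leaving via getTask() returning null.
final class KeepAliveDemo {
    /** Returns {pool size while both tasks run, pool size after the idle timeout}. */
    static int[] poolSizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 100, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        try {
            Runnable shortTask = () -> sleep(200);
            pool.execute(shortTask); // becomes the first worker's first task
            pool.execute(shortTask); // no idle worker to hand off to, so a second worker starts
            int busy = pool.getPoolSize();

            // Wait (up to 5 s) for the surplus worker to time out in poll() and exit.
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (pool.getPoolSize() > 1 && System.nanoTime() < deadline) {
                sleep(20);
            }
            return new int[] {busy, pool.getPoolSize()};
        } finally {
            pool.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = poolSizes();
        System.out.println("busy=" + sizes[0] + ", after idle=" + sizes[1]);
    }
}
```

The remaining thread stays because, once the count is back at corePoolSize, timed is false and the worker blocks in take() instead of poll().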

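The blocking queue's poll uses the lock and the notEmpty condition to wait for at most the given time.
As soon as an element is available it is dequeued; if the time runs out first, poll returns null.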

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0) {
            if (nanos <= 0)
                return null;
            nanos = notEmpty.awaitNanos(nanos);
        }
        return dequeue();
    } finally {
        lock.unlock();
    }
}
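Seen from the caller's side, the timed poll behaves like this. It is a tiny sketch with a LinkedBlockingQueue; the class name and the 50 ms timeout are only illustrative.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: timed poll on an empty vs. a non-empty queue.
final class TimedPollDemo {
    /** Polls a fresh queue for 50 ms, optionally after putting one element in. */
    static String pollOnce(boolean preload) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        if (preload) {
            queue.offer("task");
        }
        try {
            // Returns at once if an element is present, otherwise null after the timeout.
            return queue.poll(50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("empty queue    -> " + pollOnce(false));
        System.out.println("one element in -> " + pollOnce(true));
    }
}
```

That null is exactly what getTask interprets as "the last poll timed out".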

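The second part is the more important one. Some readers ask why the pool does not grow to the maximum thread count right away, and instead starts queuing once the core thread count is reached.
The simplest reason for pooling is that we want to reuse these threads, because creating and destroying them is expensive. If the pool went straight to the maximum thread count,
there would be little point in using a thread pool at all: we could just start one thread per task and throw it away afterwards. Let me lay out the points I consider most important.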
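Reuse is easy to see in a small experiment: run many short tasks on a small pool and count how many distinct threads actually did the work. This is a sketch of my own; the class name and the task counts are arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: many tasks, few threads.
final class ThreadReuseDemo {
    /** Runs taskCount trivial tasks on a fixed pool and returns how many threads ran them. */
    static int distinctThreadsUsed(int poolSize, int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(taskCount);
        try {
            for (int i = 0; i < taskCount; i++) {
                pool.execute(() -> {
                    threadNames.add(Thread.currentThread().getName());
                    done.countDown();
                });
            }
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("20 tasks ran on " + distinctThreadsUsed(2, 20) + " thread(s)");
    }
}
```

Twenty tasks never need more than the two pooled threads, which is the saving that one-thread-per-task would give up.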

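In this third post we look at Worker, the "laborer" that actually does the work inside the thread pool.
Worker implements the Runnable interface, so when addWorker (covered earlier)
starts the thread, it is the run method below that gets called, and run in turn calls the enclosing pool's
runWorker method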


private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    // Entry point once the Worker's thread is started
    public void run() {
        runWorker(this);
    }

    // (the rest of the class is not shown in this excerpt)

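Now for the runWorker method.
It handles firstTask first; if firstTask is null it takes a task from the queue.
There are two hooks, one before execution and one after. Note that the loop has no ordinary
exit condition: it keeps taking tasks from the queue until getTask returns null (for example after an idle timeout or when the pool shuts down) or a task throws.
Either way it ends by calling processWorkerExit.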

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Loop while there is a firstTask or a task can be fetched from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook before execution
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook after execution
                    afterExecute(task, thrown);
                }
            } finally {
                // Always reached: clear the task and update the counter
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Once the loop ends, the Worker exits
        processWorkerExit(w, completedAbruptly);
    }
}
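The two hooks are meant to be overridden in a subclass. As a sketch of how that might look, the pool below measures how long tasks take; the class name TimingPool and its counters are my own invention, and only beforeExecute and afterExecute come from ThreadPoolExecutor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical subclass that times every task via the two hooks.
final class TimingPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startNanos = new ThreadLocal<Long>();
    final AtomicInteger finished = new AtomicInteger();
    final AtomicLong totalNanos = new AtomicLong();

    TimingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        startNanos.set(System.nanoTime()); // runs on the worker thread, just before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            totalNanos.addAndGet(System.nanoTime() - startNanos.get());
            finished.incrementAndGet();
        } finally {
            super.afterExecute(r, t);
        }
    }

    /** Runs n trivial tasks, waits for termination, and returns the hook's count. */
    static int runTasks(int n) {
        TimingPool pool = new TimingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.finished.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks seen by afterExecute: " + runTasks(5));
    }
}
```

Both hooks run on the worker thread itself, which is why a ThreadLocal is enough to carry the start time from one to the other.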

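processWorkerExit also has a somewhat special bit of logic.
You might expect that once a thread has finished the tasks in the queue,
it simply decides whether to exit: a core thread stays,
a non-core thread leaves. But there is also replacement logic here,
namely the addWorker call at the end.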

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    // Only while the pool is below STOP (RUNNING or SHUTDOWN)
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // Minimum workers to keep: 0 if core threads may time out as well
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // Enough workers remain, so no replacement is needed and this thread is simply reclaimed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Otherwise start a replacement worker
        addWorker(null, false);
    }
}
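The replacement path is the one taken when a task blows up. The following sketch (class name and setup are mine) submits a throwing task through execute to a single-thread pool and checks that the next task runs on a different Thread object; the custom ThreadFactory only exists to keep the stack trace off the console.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: a worker killed by an exception is replaced.
final class WorkerReplacementDemo {
    /** Returns true if the task after a failing task ran on a different thread. */
    static boolean replacedAfterFailure() {
        ThreadFactory quiet = r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler((thread, ex) -> { }); // swallow the expected failure
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), quiet);
        AtomicReference<Thread> first = new AtomicReference<Thread>();
        AtomicReference<Thread> second = new AtomicReference<Thread>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                first.set(Thread.currentThread());
                throw new IllegalStateException("boom"); // escapes runWorker, worker exits abruptly
            });
            pool.execute(() -> {
                second.set(Thread.currentThread());
                done.countDown();
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return first.get() != null && second.get() != null && first.get() != second.get();
    }

    public static void main(String[] args) {
        System.out.println("worker replaced: " + replacedAfterFailure());
    }
}
```

Note this applies to execute; with submit the exception is captured in the Future and the worker survives.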

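That covers how a Worker thread runs.

Having covered the purpose of the thread pool, an overview of its implementation and how it is initialized, we can now look at its most important part, the execution path.
As usual, let's translate the comments first. This really matters for understanding the logic, and afterwards we can follow the code along the comments.
Step 1: if fewer than corePoolSize threads are running, try to start a new thread with the submitted command as its first task. This calls addWorker, which atomically checks the run state and the worker count and returns false to signal that no thread should be added, for example when other submissions have already created threads and the core size has been reached.
Step 2: if step 1 does not apply, that is, the thread count is already at or above corePoolSize, or addWorker found that to be the case or found that the pool is no longer running, we move on to the next check. It first checks again that the pool is running and then puts command into the queue. If queuing succeeds a re-check is still needed: if the pool is no longer running, the task is removed from the queue and the rejection policy is applied; if the state is fine but all threads have been reclaimed, a new thread has to be created.
Step 3: if the task cannot be queued, try to start a new thread. This is the case where the core threads are all in use and the queue is full, so one more thread is needed; if starting it fails, the rejection policy is applied.
The comments in the code already explain it in detail.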

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Step 1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // Step 1 may have failed, so re-read the latest state
        c = ctl.get();
    }
    // Step 2
    if (isRunning(c) && workQueue.offer(command)) {
        // Always work from the latest state
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // No threads left at all; note that a non-core worker is added
            addWorker(null, false);
    }
    // Step 3: add a thread, again as a non-core worker
    else if (!addWorker(command, false))
        reject(command);
}
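The three steps can be watched from the outside with a deliberately tiny pool. In this sketch (my own demo, with made-up sizes: one core thread, two at most, one queue slot) every task blocks on a latch, so the pool cannot make progress and each submission has to take the next step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: core thread, then queue, then extra thread, then rejection.
final class ExecuteStepsDemo {
    static List<String> trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> lines = new ArrayList<String>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    lines.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    lines.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return lines;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

Task 1 gets the core thread (step 1), task 2 is queued (step 2), task 3 finds the queue full and gets a second thread (step 3), and task 4 is rejected by the default AbortPolicy.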

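The important piece here is the addWorker method, so let's look at it too.
It uses labeled loops: break retry; leaves the outer loop labeled retry and carries on with the code after it, while continue retry; starts that outer loop over.
It first reads the run state. If the pool is not RUNNING (rs >= SHUTDOWN)
and at least one of rs == SHUTDOWN, firstTask == null and ! workQueue.isEmpty() is false, it returns false.
All three can only hold in the SHUTDOWN state, and the exception exists because in SHUTDOWN a non-empty queue still has to be drained.
It then compares the worker count with corePoolSize or maximumPoolSize, depending on the core argument (and with CAPACITY). If the limit is reached it returns false, in which case the inner if of step 1 in execute
falls through to re-reading the pool state. Otherwise it increments the worker count with a CAS. On success, break retry leaves both loops. On failure it re-reads c,
and if the run state has changed it restarts from the outer loop; otherwise it retries the inner loop.
After that it creates a new Worker, takes the lock and checks the pool state again. If the pool is running, or is in SHUTDOWN with a null firstTask (which implies the queue is not empty), it adds the worker,
updates the historical largest pool size if needed, marks the worker as added, then starts the thread and marks it as started.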

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
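The "check the limit, then CAS, then retry on a lost race" pattern in the inner loop is worth isolating. The sketch below is a simplified stand-in of my own using a plain AtomicInteger; it leaves out the run state that the real ctl field also packs in, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical reduction of addWorker's inner loop to a bounded CAS counter.
final class CasCounterSketch {
    /** Increments count unless it has already reached limit; retries if another thread won the CAS. */
    static boolean tryIncrement(AtomicInteger count, int limit) {
        for (;;) {
            int current = count.get();
            if (current >= limit) {
                return false; // like "wc >= corePoolSize / maximumPoolSize"
            }
            if (count.compareAndSet(current, current + 1)) {
                return true;  // like "break retry"
            }
            // CAS failed because the count changed in between; loop and re-read
        }
    }

    /** Lets several threads compete and returns how many were admitted. */
    static int admitted(int contenders, int limit) {
        AtomicInteger count = new AtomicInteger();
        AtomicInteger successes = new AtomicInteger();
        Thread[] threads = new Thread[contenders];
        for (int i = 0; i < contenders; i++) {
            threads[i] = new Thread(() -> {
                if (tryIncrement(count, limit)) {
                    successes.incrementAndGet();
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return successes.get();
    }

    public static void main(String[] args) {
        System.out.println("admitted " + admitted(8, 3) + " of 8 with limit 3");
    }
}
```

However the threads interleave, exactly limit of them get through, which is how addWorker keeps the worker count from overshooting without taking a lock.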

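Next time we will look at how threads run the tasks in the queue.

In this post we continue with thread pools. Introductions usually start with the parameters; I will do it a little differently.
Let's first go through the class documentation, with a short note after each passage.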

 An {@link ExecutorService} that executes each submitted task using
one of possibly several pooled threads, normally configured
using {@link Executors} factory methods.
// In short: every submitted task runs on one of the pooled threads, and the pool is usually created through the Executors factory methods.

<p>Thread pools address two different problems: they usually
provide improved performance when executing large numbers of
asynchronous tasks, due to reduced per-task invocation overhead,
and they provide a means of bounding and managing the resources,
including threads, consumed when executing a collection of tasks.
Each {@code ThreadPoolExecutor} also maintains some basic
statistics, such as the number of completed tasks.
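// Two goals: less per-task overhead when running large numbers of asynchronous tasks,
// and a way to bound and manage the resources (threads included) those tasks consume.
// Each ThreadPoolExecutor also keeps basic statistics such as the number of completed tasks.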


<p>To be useful across a wide range of contexts, this class
provides many adjustable parameters and extensibility
hooks. However, programmers are urged to use the more convenient
{@link Executors} factory methods {@link
Executors#newCachedThreadPool} (unbounded thread pool, with
automatic thread reclamation), {@link Executors#newFixedThreadPool}
(fixed size thread pool) and {@link
Executors#newSingleThreadExecutor} (single background thread), that
preconfigure settings for the most common usage
scenarios. Otherwise, use the following guide when manually
configuring and tuning this class:
// Plenty of parameters and hooks are available, but the Executors factory methods
// newCachedThreadPool (unbounded, reclaims threads), newFixedThreadPool (fixed size) and
// newSingleThreadExecutor (one background thread) cover the most common cases. The guide below is for configuring the class by hand.

Core and maximum pool sizes
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize.
// Below corePoolSize a new thread is created for every submission, even if other workers are idle. Between corePoolSize and maximumPoolSize a new thread is created only when the queue is full. Equal values give a fixed-size pool, Integer.MAX_VALUE as the maximum gives an effectively unbounded one, and both can be changed at runtime with setCorePoolSize and setMaximumPoolSize.

On-demand construction
By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread or prestartAllCoreThreads. You probably want to prestart threads if you construct the pool with a non-empty queue.
// Core threads are created lazily, only when tasks arrive. prestartCoreThread or prestartAllCoreThreads starts them up front, which is worth doing if the pool is built with a non-empty queue.
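A quick way to see the lazy start and the prestart call side by side; this is a small sketch of my own, where the class name and the pool size of 3 are arbitrary.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: pool size before and after prestartAllCoreThreads().
final class PrestartDemo {
    /** Returns {size after construction, threads started by prestart, size afterwards}. */
    static int[] sizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();             // nothing submitted yet
            int started = pool.prestartAllCoreThreads(); // returns how many threads it started
            int after = pool.getPoolSize();
            return new int[] {before, started, after};
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("before=" + s[0] + " started=" + s[1] + " after=" + s[2]);
    }
}
```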
Creating new threads
New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed.
// Threads come from a ThreadFactory; the default one puts them all in the same ThreadGroup with NORM_PRIORITY and non-daemon status. A custom factory can change the name, group, priority and daemon flag. If newThread returns null the executor carries on but may not be able to run any tasks.
Keep-alive times
If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
// Threads beyond corePoolSize are terminated once they have been idle for longer than keepAliveTime, and new ones are built again if the pool gets busy later. Long.MAX_VALUE nanoseconds effectively disables this. allowCoreThreadTimeOut(boolean) extends the same policy to core threads, provided keepAliveTime is non-zero.
Queuing
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
// Any BlockingQueue will do, and its behaviour interacts with pool sizing:
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
// Below corePoolSize: prefer adding a thread over queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
// At or above corePoolSize: prefer queuing over adding a thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
// If the queue refuses the task: add a thread, unless that would exceed maximumPoolSize, in which case the task is rejected.
There are three general strategies for queuing:
Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
// Direct handoff (SynchronousQueue): the queue holds nothing, so a task with no idle thread to take it gets a new thread. This avoids lockups between requests that depend on each other, but it usually needs an unbounded maximumPoolSize, so the thread count can grow without bound.
Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed.
// Unbounded queue (LinkedBlockingQueue without a capacity): never more than corePoolSize threads are created, so maximumPoolSize has no effect. It suits independent tasks and smooths out bursts, but the queue itself can grow without bound.
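The remark that maximumPoolSize has no effect with an unbounded queue is easy to check. In this sketch of mine (class name and sizes are arbitrary) the pool is allowed up to 4 threads, yet with every task blocked it still never goes beyond the single core thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: core 1, max 4, unbounded queue.
final class UnboundedQueueDemo {
    /** Submits blocking tasks and returns {pool size, queued tasks}. */
    static int[] threadsAndQueued(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // hold the worker so later tasks must wait
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return new int[] {pool.getPoolSize(), pool.getQueue().size()};
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = threadsAndQueued(5);
        System.out.println("threads=" + r[0] + " queued=" + r[1]);
    }
}
```

The queue never reports "full", so step 3 of execute is never reached and the extra three threads are never created.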
Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
// Bounded queue (ArrayBlockingQueue): together with a finite maximumPoolSize it guards against resource exhaustion, but it is harder to tune. A large queue with a small pool keeps CPU usage and context switching low at the cost of throughput; a small queue with a large pool keeps the CPUs busier but may run into scheduling overhead, which also hurts throughput.
Rejected tasks
New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:
// Tasks are rejected after shutdown, or when both the thread limit and the queue capacity are finite and have been reached. Four handler policies ship with the class:
In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
// AbortPolicy (the default): throw a RejectedExecutionException.
In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
// CallerRunsPolicy: the thread that called execute runs the task itself, which slows down the rate of submission.
In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
// DiscardPolicy: the task is silently dropped.
In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
// DiscardOldestPolicy: if the executor is not shut down, drop the task at the head of the queue and retry the submission, possibly more than once.
It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
// Custom RejectedExecutionHandler classes are possible too, with some care when a policy only makes sense under particular capacity or queuing settings.
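Of the four policies, CallerRunsPolicy is the one whose effect is least obvious from the description, so here is a small sketch. The class name and the pool shape (one thread, one queue slot) are my own choices; the third submission has nowhere to go and ends up running on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: an overflowing task runs on the caller's thread.
final class CallerRunsDemo {
    /** Returns true if the rejected task was executed by the submitting thread. */
    static boolean overflowRanOnCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<Thread>();
        try {
            pool.execute(() -> {           // occupies the only worker
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });       // fills the single queue slot
            pool.execute(() -> ranOn.set(Thread.currentThread())); // saturated: caller runs it
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on caller: " + overflowRanOnCaller());
    }
}
```

While the caller is busy running the task it cannot submit more, which is the feedback effect the documentation mentions.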
Hook methods
This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
// beforeExecute and afterExecute run around every task and can be used to reinitialize ThreadLocals, gather statistics or add log entries; terminated can be overridden for work that has to happen once the executor has fully terminated. (Some dynamic thread pool libraries rely on these two hooks.)
If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
// An exception thrown from a hook or callback can make the internal worker thread fail and terminate abruptly.
Queue maintenance
Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
// getQueue() is for monitoring and debugging only. remove(Runnable) and purge help reclaim storage when many queued tasks get cancelled. (Modifying the returned queue directly is not recommended.)
Finalization
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean).
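// A pool that is no longer referenced and has no remaining threads shuts down automatically. To make sure that happens even if shutdown is never called, let unused threads die eventually: set a suitable keep-alive time, use zero core threads and/or call allowCoreThreadTimeOut(boolean).

With this quick read-through we get a rough overall picture of how a thread pool is configured. Let's move on to one of the constructors.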

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

This constructor takes corePoolSize (the core thread count), maximumPoolSize (the maximum thread count), keepAliveTime (how long idle threads are kept alive), unit (the time unit) and workQueue (the work queue). As the body shows, it passes them on together with the default thread factory and the default rejection handler.

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Every constructor eventually ends up here, where the arguments are validated and then assigned to the fields.
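The validation rules can be poked at directly. The helper below is a sketch of my own (the class and method names are made up): it tries a combination of arguments and reports which exception, if any, the constructor threw.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the constructor's argument checks.
final class ConstructorChecksDemo {
    /** Returns "ok" or the simple name of the exception thrown by the constructor. */
    static String tryCreate(int core, int max, BlockingQueue<Runnable> queue) {
        try {
            new ThreadPoolExecutor(core, max, 1, TimeUnit.SECONDS, queue).shutdown();
            return "ok";
        } catch (IllegalArgumentException | NullPointerException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCreate(1, 2, new LinkedBlockingQueue<Runnable>())); // valid
        System.out.println(tryCreate(2, 1, new LinkedBlockingQueue<Runnable>())); // max < core
        System.out.println(tryCreate(0, 0, new LinkedBlockingQueue<Runnable>())); // max must be > 0
        System.out.println(tryCreate(1, 1, null));                                // null queue
    }
}
```

Note that corePoolSize may be 0 but maximumPoolSize may not, which matches the asymmetric checks in the constructor.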
