Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

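In this third post we look at Worker, the "laborer" that actually does the work inside the thread pool.
Worker implements the Runnable interface, so when addWorker (covered in the previous post) starts the worker's thread,
what runs is the run method below, and run in turn calls the outer runWorker method.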

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in. Null if factory fails. */
    final Thread thread;
    /** Initial task to run. Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker */
    // Entry point once the Worker's thread has been started
    public void run() {
        runWorker(this);
    }

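Now for the runWorker method.
It handles firstTask first; if firstTask is null it fetches a task from the queue through getTask.
There are two hooks, one called before each task runs and one after. Note that the loop has no ordinary exit condition of its own:
the worker keeps pulling from the queue until getTask returns null (for example because the poll timed out, or because the pool is shutting down) or until a task throws.
Either way, control ends up in processWorkerExit.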

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // Run firstTask if there is one; otherwise keep fetching tasks from the queue
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted. This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // Hook called before the task runs
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // Run the task itself
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // Hook called after the task runs
                    afterExecute(task, thrown);
                }
            } finally {
                // Always reached: clear the task and update the counter
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // Once the loop ends, the Worker exits
        processWorkerExit(w, completedAbruptly);
    }
}
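
The two hooks in runWorker can be observed from the outside by overriding them in a subclass. The following is a minimal sketch, not anything taken from the JDK: the class name CountingPool, its two counters, and the helper runTasks are made up for illustration. It only relies on the protected beforeExecute and afterExecute methods shown above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass that counts how often each hook is called.
class CountingPool extends ThreadPoolExecutor {
    final AtomicInteger before = new AtomicInteger();
    final AtomicInteger after = new AtomicInteger();

    CountingPool(int threads) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS,
              new LinkedBlockingQueue<Runnable>());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        before.incrementAndGet(); // called by runWorker before task.run()
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        after.incrementAndGet();  // called by runWorker after task.run()
    }

    /** Runs n empty tasks and returns {beforeCount, afterCount}. */
    static int[] runTasks(int n) {
        CountingPool pool = new CountingPool(2);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            // Wait until every worker has left runWorker
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { pool.before.get(), pool.after.get() };
    }

    public static void main(String[] args) {
        int[] counts = runTasks(3);
        System.out.println("before=" + counts[0] + " after=" + counts[1]);
    }
}
```

Each task passes through both hooks exactly once, so the two counters should end up equal to the number of tasks submitted.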

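processWorkerExit also contains a somewhat special piece of logic.
The natural expectation is that once a thread has drained the queue
it decides whether to exit: a core thread stays
and a non-core thread goes. The pool does not tag individual threads as core or non-core, though; it only compares the worker count with a minimum.
On top of that there is a replacement step: the addWorker call at the end starts a new worker when this one died abruptly or when the count has dropped below that minimum.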

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    // Only consider a replacement while the pool has not reached STOP
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // If core threads may time out as well, the minimum is 0
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // Enough workers remain, so no replacement is needed and this thread is simply reclaimed
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // Otherwise start a replacement worker
        addWorker(null, false);
    }
}
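
The replacement branch can be made visible with a counting ThreadFactory. This is a rough sketch under a few assumptions of my own: the class name ReplacementDemo and the method threadsCreated are hypothetical, corePoolSize and maximumPoolSize are both set to 1 so the number of threads is easy to reason about, and the uncaught exception handler is silenced only to keep the output clean. A task submitted through execute throws, its worker dies abruptly, and a second thread is needed to run the next task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ReplacementDemo {
    /** Returns how many threads the pool asked its factory for. */
    static int threadsCreated() {
        AtomicInteger created = new AtomicInteger();
        ThreadFactory factory = r -> {
            created.incrementAndGet();
            Thread t = new Thread(r);
            // Swallow the exception so no stack trace is printed
            t.setUncaughtExceptionHandler((thread, ex) -> { });
            return t;
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), factory);
        try {
            // This task makes runWorker finish with completedAbruptly == true
            pool.execute(() -> { throw new IllegalStateException("boom"); });

            // This task can only run on a second thread
            CountDownLatch done = new CountDownLatch(1);
            pool.execute(done::countDown);
            done.await(5, TimeUnit.SECONDS);

            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return created.get();
    }

    public static void main(String[] args) {
        System.out.println("threads created: " + threadsCreated());
    }
}
```

With a single core thread and a maximum of one, exactly two threads should be created: the one that died and the one that took over.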

This post mainly covered how a Worker thread runs.

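Having covered the purpose of the thread pool, an overview of its implementation, and how it is initialized, we can now look at the most important part: how tasks get executed.
As usual, let's translate the comment first. It matters a lot for understanding the logic, and we can then follow the code along the structure of the comment.
Step 1: if fewer than corePoolSize threads are running, try to start a new thread with the submitted
command as its first task. This calls addWorker, which atomically checks the run state and the worker count and
returns false when a thread should not be added, for example because other submissions have already created threads and the count has reached corePoolSize.
Step 2: if step 1 does not apply, that is, the thread count is already at least corePoolSize, or addWorker
found that to be the case or found that the pool is no longer running, we move on to the next check.
It first checks that the pool is still running and then puts command on the queue. If enqueuing succeeds, the state is checked once more:
if the pool is no longer running, the task is removed from the queue and the rejection policy is applied; if the state is fine but every thread has been reclaimed,
a new thread has to be created.
Step 3: if the task cannot be queued, try to start a new thread. This is the case where the core threads are all in use and the queue is full as well,
so one more thread is needed; if starting it fails, the rejection policy is applied.
The comment already explains the code in quite some detail.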

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task. The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread. If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    // Step 1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        // Step 1 may have failed, so re-read the latest state
        c = ctl.get();
    }
    // Step 2
    if (isRunning(c) && workQueue.offer(command)) {
        // Always work from the latest state
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            // No threads are left at all; note the worker is added with core == false
            addWorker(null, false);
    }
    // Step 3: add a thread, again with core == false
    else if (!addWorker(command, false))
        reject(command);
}
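
The three steps can be watched from the outside with a deliberately tiny pool. The sketch below is only an illustration under assumed settings (corePoolSize 1, maximumPoolSize 2, an ArrayBlockingQueue of capacity 1, and the default AbortPolicy); the class name ExecuteStepsDemo and the method submitFour are hypothetical. Every task blocks on a latch, so no worker can free itself up while the tasks are being submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecuteStepsDemo {
    /** Submits four blocking tasks and records what the pool did with each. */
    static List<String> submitFour() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // corePoolSize = 1, maximumPoolSize = 2, queue capacity = 1
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 1, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1));
        List<String> log = new ArrayList<>();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    log.add("task" + i + ": threads=" + pool.getPoolSize()
                            + " queued=" + pool.getQueue().size());
                } catch (RejectedExecutionException e) {
                    log.add("task" + i + ": rejected");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        submitFour().forEach(System.out::println);
    }
}
```

The first task starts the core thread (step 1), the second is queued (step 2), the third finds the queue full and starts a second thread (step 3), and the fourth is rejected because both the queue and the pool are full.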

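The key piece here is the addWorker method, so let's look at it as well.
It uses the labeled statements break retry; and continue retry;. break retry leaves the outer loop labeled retry entirely, while continue retry starts that outer loop's next iteration.
The method first reads the run state. If the pool is not running (rs >= SHUTDOWN) it returns false,
unless rs == SHUTDOWN, firstTask == null, and ! workQueue.isEmpty() all hold; as soon as one of the three is false, the method returns false.
The exception exists because a pool in the SHUTDOWN state accepts no new tasks but must still finish the tasks already in the queue, so a worker without a first task may still be added.
Next the worker count is compared with CAPACITY and, depending on the core argument, with corePoolSize or maximumPoolSize. If the limit has been reached the method returns false, in which case the inner if of step 1 in execute
falls through to re-reading the pool state. Then a CAS increments the worker count. On success, break retry leaves both loops. On failure c is re-read
and the run state is compared: if it has changed, continue retry restarts the outer loop; otherwise only the inner loop is retried.
After that a new Worker is created and the lock is acquired. The pool state is checked again: if the pool is running, or it is SHUTDOWN and firstTask is null, the worker is added to workers,
largestPoolSize is updated if necessary, and workerAdded is set. Finally the thread is started and workerStarted is set.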

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get(); // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
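
Labeled break and continue are rare enough in everyday Java that a standalone look may help. The sketch below has nothing to do with the thread pool itself; the class LabeledLoopDemo and its trace method are made up purely to show where control goes after continue retry and after break retry.

```java
class LabeledLoopDemo {
    /** Records the order in which the loops are entered and left. */
    static String trace() {
        StringBuilder sb = new StringBuilder();
        int attempt = 0;
        retry:
        for (;;) {
            attempt++;
            sb.append("outer").append(attempt).append(' ');
            for (int i = 0; ; i++) {
                if (attempt == 1)
                    continue retry; // abandon the inner loop, start the next outer iteration
                sb.append("inner").append(i).append(' ');
                if (i == 1)
                    break retry;    // leave both loops at once
            }
        }
        sb.append("after");         // first statement after the labeled loop
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints "outer1 outer2 inner0 inner1 after"
    }
}
```

In addWorker the same pattern means: a successful CAS jumps straight past both loops to the worker creation code, while a changed run state restarts the checks from the top of the outer loop.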

The next post continues with how the threads execute the tasks in the queue.

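In this post we continue with the thread pool. Most introductions start from the constructor parameters; I'll do it a little differently.
Let's first go through the class-level Javadoc, with a note after each paragraph.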

 An {@link ExecutorService} that executes each submitted task using
one of possibly several pooled threads, normally configured
using {@link Executors} factory methods.
// An ExecutorService runs each submitted task on one of its pooled threads and is usually created through the Executors factory methods.

<p>Thread pools address two different problems: they usually
provide improved performance when executing large numbers of
asynchronous tasks, due to reduced per-task invocation overhead,
and they provide a means of bounding and managing the resources,
including threads, consumed when executing a collection of tasks.
Each {@code ThreadPoolExecutor} also maintains some basic
statistics, such as the number of completed tasks.
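// A thread pool addresses two different problems: it usually improves performance when many asynchronous tasks are submitted, by reducing per-task invocation overhead,
// and it provides a way to bound and manage the resources, threads included, that those tasks consume. Each ThreadPoolExecutor also keeps some basic statistics,
// such as the number of completed tasks.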


<p>To be useful across a wide range of contexts, this class
provides many adjustable parameters and extensibility
hooks. However, programmers are urged to use the more convenient
{@link Executors} factory methods {@link
Executors#newCachedThreadPool} (unbounded thread pool, with
automatic thread reclamation), {@link Executors#newFixedThreadPool}
(fixed size thread pool) and {@link
Executors#newSingleThreadExecutor} (single background thread), that
preconfigure settings for the most common usage
scenarios. Otherwise, use the following guide when manually
configuring and tuning this class:
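// To be useful in many contexts the class exposes a lot of tunable parameters and extension hooks. Still, programmers are urged to use the more convenient Executors factory methods
// Executors.newCachedThreadPool (unbounded pool with automatic thread reclamation), Executors.newFixedThreadPool (fixed-size pool) and
// Executors.newSingleThreadExecutor (single background thread), which preconfigure the most common scenarios. Otherwise, the guide below applies when configuring and tuning the class by hand: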

Core and maximum pool sizes
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize) according to the bounds set by corePoolSize (see getCorePoolSize) and maximumPoolSize (see getMaximumPoolSize). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize.
// The pool resizes itself within the bounds set by corePoolSize and maximumPoolSize. Below corePoolSize, execute(Runnable) starts a new thread even if other workers are idle; between the two bounds, a new thread is started only when the queue is full. Equal values give a fixed-size pool, and a maximumPoolSize such as Integer.MAX_VALUE lets the pool accommodate an arbitrary number of concurrent tasks. Both values are normally set at construction time but can be changed later with setCorePoolSize and setMaximumPoolSize.

On-demand construction
By default, even core threads are initially created and started only when new tasks arrive, but this can be overridden dynamically using method prestartCoreThread or prestartAllCoreThreads. You probably want to prestart threads if you construct the pool with a non-empty queue.
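// By default even core threads are created and started only when new tasks arrive. prestartCoreThread or prestartAllCoreThreads overrides this, which is probably what you want when the pool is constructed with a non-empty queue.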
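
A quick way to see the lazy start and the prestart in action is to compare getPoolSize before and after the call. This is a small sketch with made-up names (PrestartDemo, sizes) and arbitrary sizes (corePoolSize 2, maximumPoolSize 4); it only uses getPoolSize and prestartAllCoreThreads from ThreadPoolExecutor.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    /** Returns {sizeBefore, threadsStarted, sizeAfter}. */
    static int[] sizes() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();            // no task yet, so no thread yet
            int started = pool.prestartAllCoreThreads(); // starts the idle core threads
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("before=" + s[0] + " started=" + s[1] + " after=" + s[2]);
    }
}
```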
Creating new threads
/** New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed. */
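// New threads are created through a ThreadFactory. Unless specified otherwise, Executors.defaultThreadFactory is used, which puts all threads in the same ThreadGroup with NORM_PRIORITY and non-daemon status. A different ThreadFactory can change the thread's name, thread group, priority, daemon status and so on. If the factory fails by returning null from newThread, the executor continues but may be unable to execute any tasks. Threads should hold the "modifyThread" RuntimePermission. If worker threads or other threads using the pool lack it, service may degrade: configuration changes may not take effect promptly, and a shut-down pool may stay in a state where termination is possible but never completes.
Keep-alive times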
If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime (see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, new threads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero.
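// When the pool has more than corePoolSize threads, the excess threads are terminated once they have been idle for longer than keepAliveTime (see getKeepAliveTime(TimeUnit)). This reduces resource consumption while the pool is not busy; if it becomes busier later, new threads are created. The value can be changed dynamically with setKeepAliveTime(long, TimeUnit), and Long.MAX_VALUE TimeUnit.NANOSECONDS effectively keeps idle threads alive until shutdown. By default the keep-alive policy applies only to threads beyond corePoolSize, but allowCoreThreadTimeOut(boolean) extends it to core threads as well, as long as keepAliveTime is non-zero.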
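
The effect of allowCoreThreadTimeOut is easy to check: after one short task and a brief idle period, the pool should be empty again. The following is a sketch under assumed settings (one core thread, a 50 ms keep-alive chosen arbitrarily, polling for at most five seconds); CoreTimeoutDemo and poolSizeAfterIdle are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    /** Runs one task, then waits for the idle core thread to time out. */
    static int poolSizeAfterIdle() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 50, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        // Requires a non-zero keepAliveTime, otherwise it throws IllegalArgumentException
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.execute(() -> { });
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Poll instead of sleeping a fixed time, to stay robust on slow machines
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return pool.getPoolSize();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("pool size after idling: " + poolSizeAfterIdle());
    }
}
```

Without the allowCoreThreadTimeOut(true) call the single core thread would stay alive and the loop would run until its deadline.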
Queuing
Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing:
// Any BlockingQueue can hold and hand over submitted tasks, and how it is used depends on the pool size:
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
// Below corePoolSize: add a thread rather than queue.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
// At or above corePoolSize: queue the request rather than add a thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
// If the request cannot be queued: create a thread, unless that would exceed maximumPoolSize, in which case the task is rejected.
There are three general strategies for queuing:
// The three general queuing strategies are:
Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a new thread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of new submitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed.
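// Direct handoffs. A good default choice is a SynchronousQueue, which hands tasks to threads without otherwise holding them. If no thread is immediately available, the attempt to queue fails and a new thread is created. This avoids lockups when handling sets of requests with internal dependencies. Direct handoffs generally need an unbounded maximumPoolSize to avoid rejecting new tasks, which in turn allows unbounded thread growth when tasks keep arriving faster than they can be processed.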
/** Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed. */
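// Unbounded queues. With an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity), new tasks wait in the queue whenever all corePoolSize threads are busy, so no more than corePoolSize threads are ever created (and maximumPoolSize therefore has no effect). This can suit tasks that are completely independent of each other, as in a web page server. It helps smooth out transient bursts of requests, but the queue can grow without bound when tasks keep arriving faster than they can be processed.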
Bounded queues. A bounded queue (for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block (for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput.
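// Bounded queues. A bounded queue (for example an ArrayBlockingQueue) combined with a finite maximumPoolSize helps prevent resource exhaustion but is harder to tune and control. Queue size and maximum pool size can be traded off against each other: large queues with small pools minimize CPU usage, OS resources and context-switching overhead but can lead to artificially low throughput. If tasks block frequently (for example because they are I/O bound), the system might have been able to schedule more threads than the pool allows. Small queues generally need larger pools, which keeps CPUs busier but may run into unacceptable scheduling overhead, which also lowers throughput.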
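
The remark that maximumPoolSize has no effect with an unbounded queue can be checked directly. In this sketch the names UnboundedQueueDemo and sizes are made up, and the sizes (corePoolSize 1, maximumPoolSize 4, five blocking tasks) are arbitrary assumptions chosen to make the point visible.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class UnboundedQueueDemo {
    /** Returns {poolSize, queueSize} after submitting five blocking tasks. */
    static int[] sizes() {
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // hold the only worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // maximumPoolSize is 4, but the queue never refuses a task
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 4, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            for (int i = 0; i < 5; i++) {
                pool.execute(blocker);
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size() };
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] s = sizes();
        System.out.println("threads=" + s[0] + " queued=" + s[1]);
    }
}
```

Only the first task gets a thread; the other four sit in the queue, so the pool never grows beyond corePoolSize.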
Rejected tasks
New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided:
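// A task submitted through execute(Runnable) is rejected when the executor has been shut down, and also when the executor uses finite bounds for both the maximum thread count and the queue capacity and is saturated. In either case execute calls rejectedExecution(Runnable, ThreadPoolExecutor) on its RejectedExecutionHandler. Four predefined handler policies are provided: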
In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection.
// AbortPolicy (the default): the handler throws a runtime RejectedExecutionException.
In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted.
// CallerRunsPolicy: the thread that called execute runs the task itself, a simple feedback mechanism that slows down the rate of new submissions.
In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped.
// DiscardPolicy: a task that cannot be executed is silently dropped.
In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.)
// DiscardOldestPolicy: if the executor is not shut down, the task at the head of the queue is dropped and execution is retried (which can fail again, so this may repeat).
It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies.
// Other RejectedExecutionHandler classes can be defined and used as well. This needs care, especially when a policy is designed to work only under particular capacity or queuing policies.
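
Of the four policies, CallerRunsPolicy is probably the least obvious, so here is a small sketch of it. The setup is an assumption chosen to force a rejection: one thread at most, a SynchronousQueue, and a first task that blocks on a latch. The names CallerRunsDemo and secondTaskRanInCaller are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    /** Returns true if the rejected task ran on the submitting thread. */
    static boolean secondTaskRanInCaller() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            // Occupies the only worker, so nobody is polling the SynchronousQueue
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            AtomicReference<Thread> ranOn = new AtomicReference<>();
            // Cannot be queued and no thread can be added, so the policy kicks in
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller: " + secondTaskRanInCaller());
    }
}
```

Because the caller is busy running the task itself, it cannot submit anything else in the meantime, which is the feedback effect the Javadoc describes.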
Hook methods
This class provides protected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated can be overridden to perform any special processing that needs to be done once the Executor has fully terminated.
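// The class provides protected, overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after each task. They can be used to manipulate the execution environment, for example to reinitialize ThreadLocals, gather statistics, or add log entries. The terminated method can also be overridden for any special processing needed once the executor has fully terminated. (Some dynamic thread pool libraries rely on these two hooks.)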
If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate.
// If a hook or callback method throws an exception, the internal worker thread may in turn fail and terminate abruptly.
Queue maintenance
Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge are available to assist in storage reclamation when large numbers of queued tasks become cancelled.
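// getQueue() gives access to the work queue for monitoring and debugging; using it for any other purpose is strongly discouraged. The two supplied methods remove(Runnable) and purge help reclaim storage when large numbers of queued tasks are cancelled. (In other words, do not modify the returned queue directly.)
Finalization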
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean).
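// A pool that is no longer referenced in the program and has no remaining threads is shut down automatically. To make sure unreferenced pools are reclaimed even if users forget to call shutdown, unused threads must eventually die: set an appropriate keep-alive time and use a lower bound of zero core threads and/or allowCoreThreadTimeOut(boolean).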

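With this quick pass through the Javadoc we get a rough overall picture of how a thread pool is configured. Next, let's look at one of the constructors.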

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), defaultHandler);
}
private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();

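This constructor sets corePoolSize (the number of core threads), maximumPoolSize (the maximum number of threads), keepAliveTime (how long idle threads are kept alive), unit (the time unit), and workQueue (the work queue). As the body shows, it then delegates to the full constructor with the default thread factory and the default rejection handler.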

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

Every constructor ends up here, where the arguments are validated and the configuration fields are assigned.
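
The validation rules can be tried out from the calling side. This is just a sketch: ConstructorCheckDemo and its rejects helper are hypothetical names, and the argument combinations in main are examples I picked to hit each condition in the if statement above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ConstructorCheckDemo {
    /** Returns true if the constructor refuses the given sizes. */
    static boolean rejects(int core, int max, long keepAliveSeconds) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    core, max, keepAliveSeconds, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
            pool.shutdown(); // no thread was started; just release the pool
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(2, 1, 60));  // maximumPoolSize < corePoolSize
        System.out.println(rejects(0, 0, 60));  // maximumPoolSize <= 0
        System.out.println(rejects(1, 2, -1));  // keepAliveTime < 0
        System.out.println(rejects(1, 2, 60));  // valid configuration
    }
}
```

A corePoolSize of 0 is accepted, which matches the Javadoc's mention of "a lower bound of zero core threads".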

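The thread pool is an important part of Java's concurrency toolkit, but discussions of it usually boil down to a handful of parameters, what they mean, and how they interact. For readers who are just starting out, or who find that habitual summary a little puzzling, we start from scratch with the very basics: the part that tracks the pool's state and its thread count.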

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

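Let's start with COUNT_BITS, which is Integer.SIZE - 3. Integer.SIZE is 32, meaning Java stores an int in four bytes, so COUNT_BITS is 29.
What is this 29 for? Read on. COUNT_BITS comes first because the value of ctl on the first line is computed by ctlOf(RUNNING, 0), and RUNNING in turn is -1 shifted left by COUNT_BITS bits. After shifting by 29 bits, three bits remain, and those three bits only need to represent the five states listed there, of which only RUNNING is negative. This neatly uses just the top three bits of ctl for the pool state and leaves the low 29 bits for the thread count.
How are the two read back? Look at CAPACITY: it is (1 << COUNT_BITS) - 1, that is, 1 shifted left by 29 bits minus 1, which is 536870911. As a reminder, shifting left by one bit multiplies by 2, so shifting left by 29 bits multiplies by 2 to the 29th power, and subtracting 1 gives the largest value 29 bits can hold, which is the maximum number of threads the pool can track. In binary it is a 32-bit int whose top three bits are 0 and whose low 29 bits are 1. With that, runStateOf and workerCountOf are easy to understand.
runStateOf ANDs c with the bitwise complement of CAPACITY. Complementing CAPACITY gives top three bits of 1 and low 29 bits of 0, so the AND discards whatever is in the low 29 bits and keeps only the top three.
workerCountOf does the opposite: it ANDs c directly with CAPACITY, keeping the low 29 bits.
ctlOf simply ORs rs and wc together.
As mentioned above, only RUNNING is negative, and the other states increase in declaration order, so the pool state can be checked with plain numeric comparisons.
This may be trivial for many readers, but for some it is a real stumbling block, and I hope this helps them.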
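
To make the bit layout concrete, here is a minimal sketch that copies the constants and the three helper methods into a standalone class so the values can be printed. The class name CtlDemo and the bits helper are made up for illustration and are not part of the JDK.

```java
class CtlDemo {
    static final int COUNT_BITS = Integer.SIZE - 3;    // 29
    static final int CAPACITY = (1 << COUNT_BITS) - 1; // low 29 bits set

    // Run states live in the top three bits
    static final int RUNNING = -1 << COUNT_BITS;
    static final int SHUTDOWN = 0 << COUNT_BITS;
    static final int STOP = 1 << COUNT_BITS;
    static final int TIDYING = 2 << COUNT_BITS;
    static final int TERMINATED = 3 << COUNT_BITS;

    static int runStateOf(int c) { return c & ~CAPACITY; }
    static int workerCountOf(int c) { return c & CAPACITY; }
    static int ctlOf(int rs, int wc) { return rs | wc; }

    /** Hypothetical helper: the 32-bit binary form of v, zero padded. */
    static String bits(int v) {
        return String.format("%32s", Integer.toBinaryString(v)).replace(' ', '0');
    }

    public static void main(String[] args) {
        int c = ctlOf(RUNNING, 5); // a running pool with five workers
        System.out.println("RUNNING  = " + bits(RUNNING));
        System.out.println("CAPACITY = " + bits(CAPACITY));
        System.out.println("ctl      = " + bits(c));
        System.out.println("workers  = " + workerCountOf(c));
        System.out.println("running? " + (runStateOf(c) < SHUTDOWN));
    }
}
```

Packing both values into a single int lets the pool read and update state and count together with one atomic operation on ctl.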

Problem description

Given the roots of two binary trees root and subRoot, return true if there is a subtree of root with the same structure and node values of subRoot and false otherwise.

A subtree of a binary tree tree is a tree that consists of a node in tree and all of this node’s descendants. The tree tree could also be considered as a subtree of itself.

Example 1

Input: root = [3,4,5,1,2], subRoot = [4,1,2]
Output: true

Example 2

Input: root = [3,4,5,1,2,null,null,null,null,0], subRoot = [4,1,2]
Output: false

Analysis

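We need to decide whether the tree rooted at subRoot is a subtree of the tree rooted at root. The problem is not hard to understand, but note that it asks for a subtree, not merely a matching substructure. That is why the answer for Example 2 is false: the left subtree of root contains an extra node 0, so subRoot matches only part of it and is not a subtree. The approach is not hard either: recursion, at two levels. First, search the root tree for a node whose value equals that of subRoot's root, which is itself a recursive traversal. Once such a node is found, recursively compare all of its descendants, which must match exactly.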

Code

public boolean isSubtree(TreeNode root, TreeNode subRoot) {
    if (subRoot == null || root == null) {
        return false;
    }
    if (root.val == subRoot.val) {
        // root and subRoot have the same value, so recursively compare all of their children
        if (isSameTree(root.left, subRoot.left) && isSameTree(root.right, subRoot.right)) {
            return true;
        }
    }
    // No match at this node: keep searching downward for a node that matches subRoot.
    // This is an OR because a match in either the left or the right subtree is enough
    return isSubtree(root.left, subRoot) || isSubtree(root.right, subRoot);
}
// The trees must be identical: no extra child as in Example 2, nor any other difference
public boolean isSameTree(TreeNode root, TreeNode subTree) {
    // Both nodes being null at this point also counts as identical
    if (root == null && subTree == null) {
        return true;
    }
    // Given the check above, subTree != null is implied here
    if (root == null) {
        return false;
    }
    // Likewise, root != null is implied here
    if (subTree == null) {
        return false;
    }
    // Both are non-null but the values differ: false
    if (root.val != subTree.val) {
        return false;
    }
    // The nodes match, so recursively compare the left and right subtrees; both must match
    return isSameTree(root.left, subTree.left) && isSameTree(root.right, subTree.right);
}
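
To run the two examples locally, a TreeNode type and a bit of tree construction are needed. The sketch below is a compact variant of the same two-level recursion, not a copy of the solution above; the minimal TreeNode class, the class name SubtreeDemo, and the example and sub builders are my own assumptions for illustration.

```java
class SubtreeDemo {
    // Minimal stand-in for the TreeNode type that LeetCode normally provides
    static class TreeNode {
        int val;
        TreeNode left, right;
        TreeNode(int val) { this.val = val; }
        TreeNode(int val, TreeNode left, TreeNode right) {
            this.val = val;
            this.left = left;
            this.right = right;
        }
    }

    static boolean isSubtree(TreeNode root, TreeNode subRoot) {
        if (root == null || subRoot == null) {
            return false;
        }
        // Either the tree rooted here is identical, or the match is further down
        return isSameTree(root, subRoot)
                || isSubtree(root.left, subRoot)
                || isSubtree(root.right, subRoot);
    }

    static boolean isSameTree(TreeNode a, TreeNode b) {
        if (a == null || b == null) {
            return a == b; // identical only if both are null
        }
        return a.val == b.val
                && isSameTree(a.left, b.left)
                && isSameTree(a.right, b.right);
    }

    /** Builds [3,4,5,1,2], optionally with the extra 0 under node 2 (Example 2). */
    static TreeNode example(boolean withExtraLeaf) {
        TreeNode two = new TreeNode(2, withExtraLeaf ? new TreeNode(0) : null, null);
        return new TreeNode(3, new TreeNode(4, new TreeNode(1), two), new TreeNode(5));
    }

    /** Builds subRoot = [4,1,2]. */
    static TreeNode sub() {
        return new TreeNode(4, new TreeNode(1), new TreeNode(2));
    }

    public static void main(String[] args) {
        System.out.println(isSubtree(example(false), sub())); // Example 1
        System.out.println(isSubtree(example(true), sub()));  // Example 2
    }
}
```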