privatefinalclassWorker extendsAbstractQueuedSynchronizer implementsRunnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ privatestaticfinallongserialVersionUID=6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatilelong completedTasks;
/** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
/** Delegates main run loop to outer runWorker */ // 启动线程 Worker publicvoidrun() { runWorker(this); }
privatevoidprocessWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();
publicvoidexecute(Runnable command) { if (command == null) thrownewNullPointerException(); /* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */ intc= ctl.get(); // 这就是第一步 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; // 注意这里是第一步可能失败了,获取一下最新的状态 c = ctl.get(); } // 这里是第二步 if (isRunning(c) && workQueue.offer(command)) { // 每次都获取最新的状态 intrecheck= ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); elseif (workerCountOf(recheck) == 0) // 这里是判断如果一个线程都没了的话,注意添加的是非核心线程 addWorker(null, false); } // 这里第三步是添加线程,并且是非核心线程 elseif (!addWorker(command, false)) reject(command); }
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) returnfalse;
for (;;) { intwc= workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) returnfalse; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } }
booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try { w = newWorker(firstTask); finalThreadt= w.thread; if (t != null) { finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. intrs= runStateOf(ctl.get());
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable thrownewIllegalThreadStateException(); workers.add(w); ints= workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
An {@link ExecutorService} that executes each submitted task using one of possibly several pooled threads, normally configured using {@link Executors} factory methods. // executorService 使用一个线程池中的线程来执行每个提交的任务,经常使用 Executors 工厂方法
<p>Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks. Each {@code ThreadPoolExecutor} also maintains some basic statistics, such as the number of completed tasks. // 线程池解决两个不同的问题,通常是为了在提交大量异步任务时,通过减少每个任务的调度开销来提高性能 // 并且提供了一种管理和约束资源,包括线程消耗的方法,每个 ThreadPoolExecutor还会管理一些基础的统计信息 // 比如完成的任务数量
<p>To be useful across a wide range of contexts, thisclass provides many adjustable parameters and extensibility hooks. However, programmers are urged to use the more convenient {@link Executors} factory methods {@link Executors#newCachedThreadPool} (unbounded thread pool, with automatic thread reclamation), {@link Executors#newFixedThreadPool} (fixed size thread pool) and {@link Executors#newSingleThreadExecutor} (single background thread), that preconfigure settings for the most common usage scenarios. Otherwise, use the following guide when manually configuring and tuning this class: // 为了在各种情况下都能派上用场,该类提供了许多可调整的参数和扩展钩子。不过,我们建议程序员使用更方便的 Executors 工厂方法 Executors. // newCachedThreadPool(无限制线程池,可自动回收线程)、Executors.newFixedThreadPool(固定大小线程池)和 Executors. // newSingleThreadExecutor(单后台线程),这些方法可为最常见的使用场景预先配置设置。否则,在手动配置和调整该类时,请使用以下指南:
Core and maximum pool sizes // 核心和最大线程池大小 A ThreadPoolExecutor will automatically adjust the pool size(see getPoolSize) according to the bounds set by corePoolSize(see getCorePoolSize) and maximumPoolSize(see getMaximumPoolSize). When a newtask is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a newthread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a newthread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize and setMaximumPoolSize. // ThreadPoolExecutor 会根据 corePoolSize(参见 getCorePoolSize)和 maximumPoolSize(参见 getMaximumPoolSize)设置的界限自动调整池大小(参见 getPoolSize)。在方法 execute(Runnable) 中提交新任务时,如果运行的线程少于 corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来处理请求。如果运行的线程多于 corePoolSize 但少于 maximumPoolSize,则只有在队列已满时才会创建新线程。如果将 corePoolSize 和 maximumPoolSize 设置为相同,就可以创建一个固定大小的线程池。如果将 maximumPoolSize 设置为 Integer.MAX_VALUE 等基本无限制的值,就可以让池容纳任意数量的并发任务。通常,核心和最大线程池大小只在构建时设置,但也可以使用 setCorePoolSize 和 setMaximumPoolSize 动态更改。
On-demand construction // 按需构建 By default, even core threads are initially created and started only when newtasks arrive, but this can be overridden dynamically using method prestartCoreThread or prestartAllCoreThreads. You probably want to prestart threads if you construct the pool with a non-empty queue. // 默认情况下,即使是核心线程也是只有当新任务到来时才会初始创建和启动,但这可以使用方法 prestartCoreThread 或 prestartAllCoreThreads 进行动态重载。如果使用非空队列构建线程池,则可能需要预启动线程。 Creating newthreads // 创建新线程 /** New threads are created using a ThreadFactory. If not otherwise specified, a Executors.defaultThreadFactory is used, that creates threads to all be in the same ThreadGroup and with the same NORM_PRIORITY priority and non-daemon status. By supplying a different ThreadFactory, you can alter the thread's name, thread group, priority, daemon status, etc. If a ThreadFactory fails to create a thread when asked by returning null from newThread, the executor will continue, but might not be able to execute any tasks. Threads should possess the "modifyThread" RuntimePermission. If worker threads or other threads using the pool do not possess this permission, service may be degraded: configuration changes may not take effect in a timely manner, and a shutdown pool may remain in a state in which termination is possible but not completed. */ // 新线程是使用线程工厂(ThreadFactory)创建的。如果未另行指定,将使用 Executors.defaultThreadFactory 创建线程,创建的线程将全部位于同一线程组,并具有相同的 NORM_PRIORITY 优先级和非守护进程状态。通过提供不同的 ThreadFactory,可以更改线程的名称、线程组、优先级、守护进程状态等。如果线程工厂无法按要求创建线程,从 newThread 返回空值,执行器将继续运行,但可能无法执行任何任务。线程应拥有 "modifyThread "运行时权限。如果工作线程或使用池的其他线程不具备此权限,服务可能会降级:配置更改可能无法及时生效,关闭的池可能会一直处于可以终止但无法完成的状态。 Keep-alive times // 保活时间 If the pool currently has more than corePoolSize threads, excess threads will be terminated if they have been idle for more than the keepAliveTime(see getKeepAliveTime(TimeUnit)). This provides a means of reducing resource consumption when the pool is not being actively used. If the pool becomes more active later, newthreads will be constructed. This parameter can also be changed dynamically using method setKeepAliveTime(long, TimeUnit). Using a value of Long.MAX_VALUE TimeUnit.NANOSECONDS effectively disables idle threads from ever terminating prior to shut down. By default, the keep-alive policy applies only when there are more than corePoolSize threads. But method allowCoreThreadTimeOut(boolean) can be used to apply this time-out policy to core threads as well, so long as the keepAliveTime value is non-zero. // 如果当前池中的线程数超过 corePoolSize,多余的线程在闲置时间超过 keepAliveTime(请参阅 getKeepAliveTime(TimeUnit))时将被终止。这提供了一种在线程池未被频繁使用时减少资源消耗的方法。如果以后池变得更加活跃,就会构建新线程。也可以使用方法 setKeepAliveTime(long, TimeUnit) 动态更改该参数。使用 Long.MAX_VALUE TimeUnit.NANOSECONDS 值可有效禁止闲置线程在关闭前终止。默认情况下,只有当线程数超过 corePoolSize 时,才会采用保持连接策略。但只要 keepAliveTime 值不为零,就可以使用方法 allowCoreThreadTimeOut(boolean) 将超时策略也应用到核心线程。 Queuing // 队列 Any BlockingQueue may be used to transfer and hold submitted tasks. The use of this queue interacts with pool sizing: // 任何阻塞队列(BlockingQueue)都可用于传输和保留已提交的任务。该队列的使用与线程池大小有关: If fewer than corePoolSize threads are running, the Executor always prefers adding a newthread rather than queuing. // 如果运行的线程少于 corePoolSize,执行器总是倾向于添加新线程而不是队列。 If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a newthread. // 如果运行的线程数大于或等于 corePoolSize,执行器总是倾向于将请求排队,而不是添加新线程。 If a request cannot be queued, a newthread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected. // 如果请求无法进队列,执行器会创建一个新线程,除非这样做会超过最大线程池大小(maximumPoolSize),在这种情况下,任务会被拒绝。 There are three general strategies for queuing: // 入队一般有三种策略: Direct handoffs. A good default choice for a work queue is a SynchronousQueue that hands off tasks to threads without otherwise holding them. Here, an attempt to queue a task will fail if no threads are immediately available to run it, so a newthread will be constructed. This policy avoids lockups when handling sets of requests that might have internal dependencies. Direct handoffs generally require unbounded maximumPoolSizes to avoid rejection of newsubmitted tasks. This in turn admits the possibility of unbounded thread growth when commands continue to arrive on average faster than they can be processed. // 直接交接。工作队列的一个很好的默认选择是同步队列(SynchronousQueue),它可以将任务移交给线程,而不会以其他方式保留任务。在这里,如果没有线程可立即运行任务,则队列任务的尝试将失败,因此会构建一个新的线程。在处理可能存在内部依赖关系的请求集时,这种策略可以避免死锁。直接交接通常需要无限制的最大线程池大小,以避免新提交的任务被拒绝。反过来,当命令的平均到达速度超过其处理速度时,就有可能导致线程无限制增长。 /** Unbounded queues. Using an unbounded queue (for example a LinkedBlockingQueue without a predefined capacity) will cause new tasks to wait in the queue when all corePoolSize threads are busy. Thus, no more than corePoolSize threads will ever be created. (And the value of the maximumPoolSize therefore doesn't have any effect.) This may be appropriate when each task is completely independent of others, so tasks cannot affect each others execution; for example, in a web page server. While this style of queuing can be useful in smoothing out transient bursts of requests, it admits the possibility of unbounded work queue growth when commands continue to arrive on average faster than they can be processed. */ // 无界队列。使用无界队列(例如没有预定义容量的 LinkedBlockingQueue)会导致新任务在所有 corePoolSize 线程都繁忙时在队列中等待。因此,创建的线程不会超过 corePoolSize。(当每个任务都完全独立于其他任务,因此任务之间不会相互影响执行时,例如在网页服务器中,这种方式可能比较合适。虽然这种队列方式在平滑瞬时突发请求方面很有用,但当命令的平均到达速度持续超过其处理速度时,就会导致工作队列无限制地增长。 Bounded queues. A bounded queue(for example, an ArrayBlockingQueue) helps prevent resource exhaustion when used with finite maximumPoolSizes, but can be more difficult to tune and control. Queue sizes and maximum pool sizes may be traded off for each other: Using large queues and small pools minimizes CPU usage, OS resources, and context-switching overhead, but can lead to artificially low throughput. If tasks frequently block(for example if they are I/O bound), a system may be able to schedule time for more threads than you otherwise allow. Use of small queues generally requires larger pool sizes, which keeps CPUs busier but may encounter unacceptable scheduling overhead, which also decreases throughput. // 有界队列 有界队列(例如 ArrayBlockingQueue)与有限的最大线程池大小一起使用时,有助于防止资源耗尽,但可能更难调整和控制。队列大小和最大池大小可以相互权衡: 使用大队列和小池可以最大限度地减少 CPU 占用率、操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常出现阻塞(例如,如果任务受 I/O 约束),系统可能会为更多线程安排时间,而不考虑其他因素。使用小队列通常需要更大的池规模,这将使 CPU 更繁忙,但可能会遇到无法接受的调度开销,这也会降低吞吐量。 Rejected tasks // 被拒绝的任务 New tasks submitted in method execute(Runnable) will be rejected when the Executor has been shut down, and also when the Executor uses finite bounds for both maximum threads and work queue capacity, and is saturated. In either case, the execute method invokes the RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) method of its RejectedExecutionHandler. Four predefined handler policies are provided: // 如果执行器已关闭,或者执行器使用了最大线程数和工作队列容量的有限界限,并已达到饱和,那么在 execute(Runnable) 方法中提交的新任务将被拒绝。在这两种情况下,execute 方法都会调用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(Runnable, ThreadPoolExecutor) 方法。提供了四种预定义的处理程序策略: In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection. // 在默认的 ThreadPoolExecutor.AbortPolicy 中,处理程序会在拒绝时抛出运行时 RejectedExecutionException。 In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that newtasks are submitted. // 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用执行本身的线程会运行任务。这提供了一个简单的反馈控制机制,可减慢提交新任务的速度。 In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped. // 在 ThreadPoolExecutor.DiscardPolicy 中,无法执行的任务会被直接丢弃。 In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried(which can fail again, causing this to be repeated.) // 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行器没有关闭,工作队列头部的任务就会被丢弃,然后重新执行(可能会再次失败,导致重复执行)。 It is possible to define and use other kinds of RejectedExecutionHandler classes. Doing so requires some care especially when policies are designed to work only under particular capacity or queuing policies. // 我们还可以定义和使用其他类型的 RejectedExecutionHandler 类。这样做需要小心谨慎,尤其是当策略设计为仅在特定容量或队列策略下运行时。 Hook methods // 钩子方法 This classprovidesprotected overridable beforeExecute(Thread, Runnable) and afterExecute(Runnable, Throwable) methods that are called before and after execution of each task. These can be used to manipulate the execution environment; for example, reinitializing ThreadLocals, gathering statistics, or adding log entries. Additionally, method terminated can be overridden to perform any special processing that needs to be done once the Executor has fully terminated. // 该类提供 protected 的可重载 beforeExecute(Thread, Runnable) 和 afterExecute(Runnable, Throwable) 方法,可在每个任务执行前后调用。这些方法可用于操纵执行环境,例如,重新初始化线程位置、收集统计数据或添加日志条目。此外,还可以重写终止方法,以便在执行器完全终止后执行任何需要进行的特殊处理。(一些动态线程池技术就用到了这两个方法) If hook or callback methods throw exceptions, internal worker threads may in turn fail and abruptly terminate. // 如果钩子或回调方法抛出异常,内部工作线程可能会失败并突然终止。 Queue maintenance // 队列维护 Method getQueue() allows access to the work queue for purposes of monitoring and debugging. Use of this method for any other purpose is strongly discouraged. Two supplied methods, remove(Runnable) and purge are available to assist in storage reclamation when large numbers of queued tasks become cancelled. // 方法 getQueue() 允许访问工作队列,以便进行监控和调试。强烈建议不要将此方法用于任何其他目的。当大量队列任务被取消时,可使用 remove(Runnable) 和 purge 这两个提供的方法来帮助回收存储空间。(不建议直接将获取到的队列进行修改操作) Finalization // 终止 A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically. If you would like to ensure that unreferenced pools are reclaimed even if users forget to call shutdown, then you must arrange that unused threads eventually die, by setting appropriate keep-alive times, using a lower bound of zero core threads and/or setting allowCoreThreadTimeOut(boolean). // 程序中不再引用且没有剩余线程的池将自动关闭。如果要确保即使用户忘记调用 shutdown 也能回收未引用的池,则必须通过设置适当的保持连接时间、使用零核心线程下限和/或设置 allowCoreThreadTimeOut(boolean) 来使得未使用的线程最终被停止。
通过前面的简单翻译我们就能大致略窥得线程池配置全貌,继续看一个构造方法
1 2 3 4 5 6 7 8 9 10
publicThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } privatestaticfinalRejectedExecutionHandlerdefaultHandler= newAbortPolicy();