JUC 线程池的使用与源码解析
线程池创建与使用
线程池的创建
Executors 框架提供了各种类型的线程池,主要有以下工厂方法∶
- public static ExecutorService newFixedThreadPool(int nThreads)
- public static ExecutorService newCachedThreadPool()
- public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
- public static ExecutorService newSingleThreadExecutor()
- public static ScheduledExecutorService newSingleThreadScheduledExecutor()
corePoolSize > 1 的方法名称以 Pool 结尾,等于 1 的以 Executor 结尾。
Executors 的真正实现类主要包括两个 ThreadPoolExecutor 和 ScheduledThreadPoolExecutor。
newFixedThreadPool
线程数达到核心线程数之后不会再新建线程(一方面是队列是无边界的,另一方面是 corePoolSize = maximumPoolSize)。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, // corePoolSize = maximumPoolSize
0L, TimeUnit.MILLISECONDS,
// 无界队列,所以即使设置了最大线程数,线程池中的线程数量也不会达到最大线程数
new LinkedBlockingQueue());
}
newSingleThreadExecutor
类似于 newFixedThreadPool,只是将 corePoolSize 和 maximumPoolSize 都设置为 1。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // corePoolSize = maximumPoolSize = 1
0L, TimeUnit.MILLISECONDS,
// 无界队列
new LinkedBlockingQueue()));
}
newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue());
}
newScheduledThreadPool
可以指定核心线程数,用于创建一个用于执行周期性或者定时任务的线程池。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
// 延迟队列
new DelayedWorkQueue());
}
ScheduledExecutorService 的核心方法如下:
// 延迟 delay 时间后,执行 Runnable 任务
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
// 延迟 delay 时间后,执行 Callable 任务
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit);
// 定时任务
// 以上一个任务开始时间开始计时,period 时间后,如果上一个任务已完成,则立即执行,否则等待上一个任务完成后再执行
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
// 延迟任务
// 以上一个任务的结束时间开始计时,delay 时间后,立即执行
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
delay 或者 period 时间后,表示任务可以立即执行,但是也要先获取到线程才会立马执行,否则会先阻塞等待获取线程,和一般的线程池逻辑类似。
线程池的使用
等待所有任务线程执行完成
引用:
ExecutorService等待线程完成后优雅结束
How to wait for all threads to finish, using ExecutorService?
方法一:shutdown() / shutdownNow() + awaitTermination()
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
while(...) {
taskExecutor.execute(new MyTask());
}
// 线程池暂停接收新的任务
taskExecutor.shutdown();
try {
// 等待所有任务完成
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// ...
}
方法二:invokeAll + shutdown() / shutdownNow() + awaitTermination()
我们可以用来运行线程的第一种方法是 invokeAll() 方法,在所有任务完成或超时到期后,该方法返回 Future 对象列表。
此外,我们必须注意返回的 Future 对象的顺序与提供的 Callable 对象的列表相同:
ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
// your tasks
List> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
// invokeAll() returns when all tasks are complete
List> futures = taskExecutor.invokeAll(callables);
taskExecutor.shutdown();
try {
// 等待所有任务完成
taskExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
// ...
}
方法三:使用 CountDownLatch
ExecutorService taskExecutor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(2);
for(int i = 0; i < 2; i++){
taskExecutor.submit(() -> {
try {
// 业务逻辑...
latch.countDown();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// wait for the latch to be decremented by the two remaining threads
latch.await();
方法四:使用 ExecutorCompletionService
运行多个线程的另一种方法是使用 ExecutorCompletionService,它使用提供的 ExecutorService 来执行任务。
与 invokeAll 的一个区别是返回表示执行任务的 Futures 的顺序。ExecutorCompletionService 使用队列按结束顺序存储结果,而 invokeAll 返回一个列表,该列表具有与给定任务列表的迭代器生成的顺序相同的顺序:
CompletionService service = new ExecutorCompletionService<>(WORKER_THREAD_POOL);
List> callables = Arrays.asList(new DelayedCallable("fast thread", 100), new DelayedCallable("slow thread", 3000));
for (Callable callable : callables) {
service.submit(callable);
}
方法五:使用 Java8 的 CompletableFuture
ExecutorService taskExecutor = Executors.newFixedThreadPool(4);
List tasks = getTasks();
CompletableFuture<?>[] futures = tasks.stream()
.map(task -> CompletableFuture.runAsync(task, taskExecutor))
.toArray(CompletableFuture[]::new);
// 等待所有任务执行完成
CompletableFuture.allOf(futures).join();
taskExecutor.shutdown();
线程池的配置及优化
配置主要是 ThreadPoolExecutor 构造方法的参数配置。
coreThreadPoolSize
每个线程都需要一定的栈内存空间。在最近的64位JVM中, 默认的栈大小 是1024KB。如果服务器收到大量请求,或者handleRequest方法执行很慢,服务器可能因为创建了大量线程而崩溃。例如有1000个并行的请求,创建出来的1000个线程需要使用1GB的JVM内存作为线程栈空间。另外,每个线程代码执行过程中创建的对象,还可能会在堆上创建对象。这样的情况恶化下去,将会超出JVM堆内存,并产生大量的垃圾回收操作,最终引发 内存溢出(OutOfMemoryErrors) 。
这些线程不仅仅会消耗内存,它们还会使用其他有限的资源,例如文件句柄、数据库连接等。不可控的创建线程,还可能引发其他类型的错误和崩溃。因此,避免资源耗尽的一个重要方式,就是避免不可控的数据结构。
顺便说下,由于线程栈大小引发的内存问题,可以通过-Xss开关来调整栈大小。缩小线程栈大小之后,可以减少每个线程的开销,但是可能会引发 栈溢出(StackOverflowErrors) 。对于一般应用程序而言,默认的1024KB过于富裕,调小为256KB或者512KB可能更为合适。Java允许的最小值是160KB。
CPU密集型任务应配置尽可能小的线程,如配置 cpu 核数 + 1
个线程的线程池,减少线程的切换。
IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 cpu 核数 * 2
。
对于 IO 型的任务的最佳线程数,有个公式可以计算
$$Nthreads = NCPU * UCPU * (1 + W/C)$$
其中:
? * NCPU 是处理器的核的数目
? * UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
? * W / C 是等待时间与计算时间的比率
由于线程数的选定依赖于应用程序的类型,可能需要经过大量性能测试之后,才能得出最优的结果。
通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用:
- getTaskCount:线程池已经执行的和未执行的任务总数(所有线程的 completedTaskCount 数量加上阻塞队列中的元素个数);
- getCompletedTaskCount:线程池已完成的任务数量(所有线程的 completedTaskCount 数量),该值小于等于 taskCount;
- getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了 maximumPoolSize;
- getPoolSize:线程池当前的线程数量(workers 里元素的个数);
- getActiveCount:当前线程池中正在执行任务(独占锁被占用)的线程数量。
通过这些方法,可以对线程池进行监控,在 ThreadPoolExecutor 类中提供了几个空方法,如 beforeExecute 方法,afterExecute 方法和 terminated 方法,可以扩展这些方法在执行前或执行后增加一些新的操作,例如统计线程池的执行任务的时间等,可以继承自 ThreadPoolExecutor 来进行扩展。
workQueue
maximumPoolSize
threadFactory
默认使用 DefaultThreadFactory,可以自定义。
RejectedExecutionHandler
除了默认的 4 种拒绝策略外,还可以自定义拒绝策略。
AbstractExecutorService 源码
AbstractExecutorService 抽象类实现类 ExecutorService 接口。
FutureTask 源码参考:
public interface Executor {
/**
* Executes the given command at some time in the future. The command
* may execute in a new thread, in a pooled thread, or in the calling
* thread, at the discretion of the {@code Executor} implementation.
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
public abstract class AbstractExecutorService implements ExecutorService {
// 使用线程对象 runnable 和 保存 runnable 执行结果的变量 value 来构造一个 FutureTask 对象
// newTaskFor 方法使用了适配器模式,可以将 Runnable + value 或者 callable 对象构造成一个 FutureTask 对象
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);
}
// 使用 callable 对象构造一个 FutureTask 对象
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
// 提交任务到线程池,返回一个 FutureTask 对象
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, null);
// 将任务添加到线程池中执行
execute(ftask);
return ftask;
}
// 同上
public Future submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 同上
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// 批量执行多个任务,但是只要一个任务完成就返回,同时中断其它任务
private T doInvokeAny(Collection<? extends Callable> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
// 当前待执行的任务数量
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList> futures = new ArrayList>(ntasks);
// 创建一个 ExecutorCompletionService 对象,当前线程池对象作为 ExecutorCompletionService 的 executor
// ExecutorCompletionService 可以维护一批 Future 任务,然后按照任务完成的先后顺序,添加到一个先进先出阻塞队列中
// 然后通过 take 或者 poll 方法获取到的一个已经执行完成的 Future 任务,可以直接调用 future 任务的 get 方法获取执行结果
ExecutorCompletionService ecs =
new ExecutorCompletionService(this);
// For efficiency, especially in executors with limited
// parallelism, check to see if previously submitted tasks are
// done before submitting more of them. This interleaving
// plus the exception mechanics account for messiness of main
// loop.
try {
// Record exceptions so that if we fail to obtain any
// result, we can throw the last exception we got.
ExecutionException ee = null;
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
// 当前正在执行的任务数量
int active = 1;
for (;;) {
Future f = ecs.poll();
// 从 ecs 的队列里没有获取到任务
if (f == null) {
// 未执行的任务数量大于 0
if (ntasks > 0) {
--ntasks;
// 继续获取下一个未执行的任务,将其添加到线程池中去执行
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
// 任务都已经执行完成了
break;
else if (timed) {
// 从 ecs 的阻塞队列中获取一个已完成的 Future 对象,可超时
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
else
// 阻塞低从 ecs 的阻塞队列中获取一个已完成的 Future 对象
f = ecs.take();
}
// 从阻塞队列中获取到了一个完成的 Future 任务(直接返回执行结果,并退出方法)
if (f != null) {
--active;
try {
// 返回获取到的一个任务的执行结果
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
// 从阻塞队列中获取到了一个完成的 Future 任务并返回任务的执行结果,然后将所有的任务都中断掉
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public T invokeAny(Collection<? extends Callable> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public T invokeAny(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
// 批量执行多个任务
public List> invokeAll(Collection<? extends Callable> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
// 将多个任务依次添加到线程池中去执行
for (Callable t : tasks) {
RunnableFuture f = newTaskFor(t);
// 将任务结果添加到 futures 中
futures.add(f);
execute(f);
}
// 遍历 futures 集合
for (int i = 0, size = futures.size(); i < size; i++) {
Future f = futures.get(i);
// 如果任务未完成
if (!f.isDone()) {
try {
// 阻塞获取执行结果
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
// 执行到这里,说明所有任务都已完成了
done = true;
// 返回执行
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
public List> invokeAll(Collection<? extends Callable> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
ArrayList> futures = new ArrayList>(tasks.size());
boolean done = false;
try {
for (Callable t : tasks)
futures.add(newTaskFor(t));
final long deadline = System.nanoTime() + nanos;
final int size = futures.size();
// Interleave time checks and calls to execute in case
// executor doesn't have any/much parallelism.
for (int i = 0; i < size; i++) {
execute((Runnable)futures.get(i));
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
return futures;
}
for (int i = 0; i < size; i++) {
Future f = futures.get(i);
if (!f.isDone()) {
if (nanos <= 0L)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
nanos = deadline - System.nanoTime();
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
}
ThreadPoolExecutor 源码
对于核心的几个线程池,无论是 newFixedThreadPool() 方法、newCachedThreadPol()、还是 newSingleThreadExecutor() 方法,虽然看起来创建的线程有着完全不同的功能特点,但其内部实现均使用了 ThreadPoolExecutor 实现。
ThreadPoolExecutor 属性
public class ThreadPoolExecutor extends AbstractExecutorService {
// ctl 字段存储线程池的当前状态和线程数
// 高 3 位存放线程池的运行状态 (runState)
// 低 29 位存放线程池内有效线程(活跃)的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 在 Java 中,一个 int 占据 32 位,所以 COUNT_BITS 的结果是 32 - 3 = 29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY 就代表了 workerCount 的上限,它是 ThreadPoolExecutor 中理论上的最大活跃线程数
// 运算过程为 1 左移 29 位,也就是 00000000 00000000 00000000 00000001 --> 001 0000 00000000 00000000 00000000,再减去1的话,就是 000 11111 11111111 11111111 11111111,前三位代表线程池运行状态 runState,所以这里 workerCount 的理论最大值就应该是 29 个 1,即 536870911
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// RUNNING:接受新任务,并处理队列任务
// -1 在 Java 底层是由 32 个 1 表示的,左移 29 位的话,即 111 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 1 的话,表示 RUNNING 状态,即 -536870912
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN:不接受新任务,但会处理队列任务
// 在线程池处于 RUNNING 状态时,调用 shutdown() 方法会使线程池进入到该状态(finalize() 方法在执行过程中也会调用 shutdown() 方法进入该状态)
// 0 在 Java 底层是由 32 个 0 表示的,无论左移多少位,还是 32 个 0,即 000 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位全部为 0 的话,表示 SHUTDOWN 状态,即 0;
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP:不接受新任务,不会处理队列任务,而且会中断正在处理过程中的任务
// 在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态
// 1 在 Java 底层是由前面的 31 个 0 和 1 个 1 组成的,左移 29 位的话,即 001 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 001 的话,表示 STOP 状态,即 536870912;
private static final int STOP = 1 << COUNT_BITS;
// TIDYING:所有的任务已结束,workerCount 为 0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态
// 在 Java 底层是由前面的 30 个 0 和 1 个 11 组成的,左移 29 位的话,即 011 00000 00000000 00000000 00000000,也就是低 29 位全部为 0,高 3 位为 011 的话,表示 TERMINATED 状态,即 1610612736;
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED:在 terminated() 方法执行完后进入该状态,默认 terminated() 方法中什么也没有做
private static final int TERMINATED = 3 << COUNT_BITS;
// Packing and unpacking ctl
// 传入的 c 代表的是 ctl 的值,即高 3 位为线程池运行状态 runState,低 29 位为线程池中当前活动的线程数量 workerCount
// runState:线程池运行状态,占据 ctl 的高 3 位,有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED 五种状态
// ~ 是按位取反的意思,CAPACITY 表示的是高位的 3 个 0,和低位的 29 个 1,而 ~CAPACITY 则表示高位的 3 个 1,低位的 29 个 0,然后再与入参 c 执行按位与操作,即高 3 位保持原样,低 29 位全部设置为 0,也就获取了线程池的运行状态 runState。
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 取得当前线程池内有效线程的数量
// workerCount:线程池中当前活动的线程数量,占据 ctl 的低 29 位
// 将 c 与 CAPACITY 进行与操作 &,也就是与 000 11111 11111111 11111111 11111111 进行与操作,c 的前三位通过与 000 进行与操作,无论 c 前三位为何值,最终都会变成 000,也就是舍弃前三位的值,而 c 的低 29 位与 29 个 1 进行与操作,c 的低 29 位还是会保持原值,这样就从 AtomicInteger ctl 中解析出了 workerCount 的值
private static int workerCountOf(int c) { return c & CAPACITY; }
// 原子变量 ctl 的初始化方法
// 传入的 rs 表示线程池运行状态 runState,其是高 3 位有值,低 29 位全部为 0 的 int,而 wc 则代表线程池中有效线程的数量 workerCount,其为高 3 位全部为 0,而低 29 位有值得 int,将 runState 和 workerCount 做或操作 | 处理,即用 runState 的高 3 位,workerCount 的低 29 位填充的数字,而默认传入的 runState、workerCount 分别为 RUNNING 和 0。
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
* Bit field accessors that don't require unpacking ctl.
* These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* Attempts to CAS-increment the workerCount field of ctl.
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* Decrements the workerCount field of ctl. This is called only on
* abrupt termination of a thread (see processWorkerExit). Other
* decrements are performed within getTask.
*/
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
/**
* workQueue 是用于持有任务并将其转换成工作线程 worker 的队列
*/
private final BlockingQueue workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
/**
* workers 是包含线程池中所有工作线程 worker 的集合
* 仅仅当获得 mainLock 锁时才能访问它
*/
private final HashSet workers = new HashSet();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
// 记录 workers 集合最大的元素个数
private int largestPoolSize;
/**
* 线程池已完成的任务的数量
*/
private long completedTaskCount;
/**
* 创建新线程的工厂类
*/
private volatile ThreadFactory threadFactory;
/**
* 执行拒绝策略的处理器
*/
private volatile RejectedExecutionHandler handler;
/**
* 空闲线程等待工作的超时时间(纳秒),即空闲线程存活时间
*/
private volatile long keepAliveTime;
/**
* 是否允许核心线程超时
* 默认值为 false,如果为 false,core 线程在空闲时依然存活;如果为 true,则 core 线程等待工作,直到时间超时至 keepAliveTime
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程池大小,保持存活的工作线程的最小数目,当小于 corePoolSize 时,会直接启动新的一个线程来处理任务,而不管线程池中是否有空闲线程
*/
private volatile int corePoolSize;
/**
* 线程池中线程的最大数量
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/* The context to be used when executing the finalizer, or null. */
private final AccessControlContext acc;
// Public constructors and methods
// 以下是构造方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory(ThreadFactory)
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* Returns the core number of threads.
*
* @return the core number of threads
* @see #setCorePoolSize
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
/**
* Returns true if this pool allows core threads to time out and
* terminate if no tasks arrive within the keepAlive time, being
* replaced if needed when new tasks arrive. When true, the same
* keep-alive policy applying to non-core threads applies also to
* core threads. When false (the default), core threads are never
* terminated due to lack of incoming tasks.
*
* @return {@code true} if core threads are allowed to time out,
* else {@code false}
*
* @since 1.6
*/
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
/**
* Sets the policy governing whether core threads may time out and
* terminate if no tasks arrive within the keep-alive time, being
* replaced if needed when new tasks arrive. When false, core
* threads are never terminated due to lack of incoming
* tasks. When true, the same keep-alive policy applying to
* non-core threads applies also to core threads. To avoid
* continual thread replacement, the keep-alive time must be
* greater than zero when setting {@code true}. This method
* should in general be called before the pool is actively used.
*
* @param value {@code true} if should time out, else {@code false}
* @throws IllegalArgumentException if value is {@code true}
* and the current keep-alive time is not greater than zero
*
* @since 1.6
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
/**
* Sets the maximum allowed number of threads. This overrides any
* value set in the constructor. If the new value is smaller than
* the current value, excess existing threads will be
* terminated when they next become idle.
*
* @param maximumPoolSize the new maximum
* @throws IllegalArgumentException if the new maximum is
* less than or equal to zero, or
* less than the {@linkplain #getCorePoolSize core pool size}
* @see #getMaximumPoolSize
*/
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
/**
* Returns the maximum allowed number of threads.
*
* @return the maximum allowed number of threads
* @see #setMaximumPoolSize
*/
public int getMaximumPoolSize() {
return maximumPoolSize;
}
/**
* Sets the time limit for which threads may remain idle before
* being terminated. If there are more than the core number of
* threads currently in the pool, after waiting this amount of
* time without processing a task, excess threads will be
* terminated. This overrides any value set in the constructor.
*
* @param time the time to wait. A time value of zero will cause
* excess threads to terminate immediately after executing tasks.
* @param unit the time unit of the {@code time} argument
* @throws IllegalArgumentException if {@code time} less than zero or
* if {@code time} is zero and {@code allowsCoreThreadTimeOut}
* @see #getKeepAliveTime(TimeUnit)
*/
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
/**
* Returns the thread keep-alive time, which is the amount of time
* that threads in excess of the core pool size may remain
* idle before being terminated.
*
* @param unit the desired time unit of the result
* @return the time limit
* @see #setKeepAliveTime(long, TimeUnit)
*/
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* User-level queue utilities */
/**
* Returns the task queue used by this executor. Access to the
* task queue is intended primarily for debugging and monitoring.
* This queue may be in active use. Retrieving the task queue
* does not prevent queued tasks from executing.
*
* @return the task queue
*/
public BlockingQueue getQueue() {
return workQueue;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
*
* This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return {@code true} if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
final BlockingQueue q = workQueue;
try {
Iterator it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
/* Statistics */
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
}
ThreadFactory
public interface ThreadFactory {
/**
* Constructs a new {@code Thread}. Implementations may also initialize
* priority, name, daemon status, {@code ThreadGroup}, etc.
*
* @param r a runnable to be executed by new thread instance
* @return constructed thread, or {@code null} if the request to
* create a thread is rejected
*/
Thread newThread(Runnable r);
}
public class Executors {
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**
* 默认的线程工厂
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
final ThreadGroup group;//线程组
final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
final String namePrefix;
/*
* 创建默认的线程工厂
*/
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
/*
* 创建一个新的线程
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
// 新线程的名字
namePrefix + threadNumber.getAndIncrement(),
0);
// 将后台守护线程设置为应用线程
if (t.isDaemon())
t.setDaemon(false);
// 将线程的优先级全部设置为 NORM_PRIORITY
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
Worker
// 通过继承 AQS 来实现独占锁这个功能
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 执行任务的线程
final Thread thread;
/** Initial task to run. Possibly null. */
// 要执行的任务
Runnable firstTask;
/** Per-thread task counter */
// thread 线程已完成的任务数量
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
// 设置 AQS 是 state 为 -1,主要目的是为了在 runWoker 之前不让中断。
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// this 表示 new 的 Worker 对象,Worker 实现了 Runnable 接口
// 所以这里是用 Worker 对象来创建一个 thread 对象
// Worker 中的 thread 的 start 方法会执行 Worker 的 run 方法
// Worker 的 run 方法会调用线程池的 runWorker(this) 方法
// runWorker(this) 则是调用 worker 的 firstTask 的 run 方法
// 好处是可以重复利用 Worker 中的 thread ——> 处理阻塞队列中的任务
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 尝试独占锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放独占锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断已启动线程
void interruptIfStarted() {
Thread t;
// getState() >= 0 说明该线程已启动
// 线程 t 不能为 null
// 并且 t 没有被中断过(中断过就不需要再次中断了)
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
// 中断该线程
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取线程池状态
int c = ctl.get();
// 1. 当前线程池工作线程数小于核心线程池数
if (workerCountOf(c) < corePoolSize) {
// 使用核心线程池中的线程处理任务,成功则返回
if (addWorker(command, true))
return;
// 如果调用核心线程池的线程处理任务失败,则重新获取线程池状态
c = ctl.get();
}
// 2. 如果线程池当前状态仍然处于运行中,则将任务添加到阻塞队列
// addWorker 添加失败会走到这里
if (isRunning(c) && workQueue.offer(command)) {
// 添加到阻塞队列成功后再重新获取线程池状态
int recheck = ctl.get();
// 如果当前线程池状态不是运行中,则从阻塞队列中移除掉刚刚添加的任务
// remove 成功了就 reject 该任务,否则说明任务已经被执行了
if (!isRunning(recheck) && remove(command))
// 移除掉任务后跑出拒绝处理异常
reject(command);
// 否则如果当前线程池线程空,则添加一个线程
else if (workerCountOf(recheck) == 0)
/*
addWorker(null, false) 也就是创建一个线程,但并没有传入任务(null),因为任务已经被添加到 workQueue 中了(remove(command) 失败才进入此 if 代码块),所以 worker 在执行的时候,会直 接从 workQueue 中获取任务(getTask())。
*/
// addWorker(null, false) 为了保证线程池在 RUNNING 状态下必须要有一个线程来执行任务
addWorker(null, false);
}
// 3. 阻塞队列已满,则新增线程处理任务
else if (!addWorker(command, false))
// 新增线程处理任务失败,抛出拒绝处理异常
reject(command);
}
reject
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
addWorker
// firstTask:要执行的任务
// core:是否添加到核心线程池
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// rs >= SHUTDOWN 说明当前线程池不再接受新的任务
// 但是线程池状态为 SHUTDOWN 并且阻塞队列有任务时,仍可以处理这些任务
// 此时 firstTask = null,不是新增任务,而是新增线程来处理任务,即:
// rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// corePoolSize 未满,核心线程的数量加 1
// 获取最新的核心线程池数量
// 获取最新的线程池状态(和原先状态不一致,重新循环)
for (;;) {
// 线程池中的线程数
int wc = workerCountOf(c);
// 如果线程数超限,则返回
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 自增 workerCount,如果成功,则退出 retry 循环
// 否则更新 ctl,然后判断当前状态是否改变,已改变就从外层 for 循环开始重新执行(外层 for 循环有判断状态逻辑)
// 如果状态没有改变,则重试自增 workerCount 操作
if (compareAndIncrementWorkerCount(c))
break retry;
// 设置 workerCount 失败,重新获取线程池状态
c = ctl.get(); // Re-read ctl
// 如果线程池当前的状态和方法开始时的状态一致,则重新循环本层的 for 循环
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 初始化 worker
w = new Worker(firstTask);
// 执行这个任务的线程
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 获取 mianLock 锁,准备启动线程
// 先判断线程池的状态,再判断线程是否已启动
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN 表示是 RUNNING 状态
// rs 是 SHUTDOWN 状态并且 firstTask 为 null,向线程池中添加线程,用来处理阻塞队列中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果线程已经被 start 过了,则抛出异常,不允许重复调用 start
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 添加任务到 HashSet 集合中
workers.add(w);
int s = workers.size();
// 如果 workers 的长度(任务队列长度)大于最大线程数量,则更新最大线程数量
// largestPoolSize 记录着线程池中出现过的最大线程数量
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
// 释放 mainLock 锁
mainLock.unlock();
}
// 已添加任务到 workers 集合
if (workerAdded) {
// 启动线程
t.start();
// 线程已启动
workerStarted = true;
}
}
} finally {
// 任务添加失败,或者任务添加成功但是启动失败
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorkerFailed
/**
* Rolls back the worker thread creation.
* - removes worker from workers, if present
* - decrements worker count
* - rechecks for termination, in case the existence of this
* worker was holding up termination
*/
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
// 获取 mainLock 锁
mainLock.lock();
try {
// 如果 worker 启动失败,则:
// 1. 如果 worker 不为 null,则从 workers 集合中移除该任务
if (w != null)
workers.remove(w);
// 2. workerCount 自减 1
decrementWorkerCount();
// 3. 根据线程池状态进行判断是否结束线程池
tryTerminate();
} finally {
// 释放 mainLock 锁
mainLock.unlock();
}
}
tryTerminate
// 根据线程池状态进行判断是否结束线程池
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 当前线程池的状态为以下几种情况时,直接返回:
// 1. 因为还在运行中,不能停止
if (isRunning(c) ||
// 2. TIDYING 或 TERMINATED,其它线程已经在结束线程池了,无需当前线程来结束
runStateAtLeast(c, TIDYING) ||
// 3. 调用 shutdown() 方法后的状态是 SHUTDOWN,但是仍然可以处理队列中的任务
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 4. 如果线程数量不为 0,则中断一个空闲的工作线程,并返回
if (workerCountOf(c) != 0) { // Eligible to terminate
/**
当 shutdown() 方法被调用时,会执行 interruptIdleWorkers(),
此方法会先检查线程是否是空闲状态,如果发现线程不是空闲状态,才会中断线程,
中断线程让在任务队列中阻塞的线程醒过来。但是如果在执行 interruptIdleWorkers() 方法时,
线程正在运行,此时并没有被中断;如果线程执行完任务后,然后又去调用了getTask (),
这时如果 workQueue 中没有任务了,调用 workQueue.take() 时就会一直阻塞。
这时该线程便错过了 shutdown() 的中断信号,若没有额外的操作,线程会一直处于阻塞的状态。
所以每次在工作线程结束时调用 tryTerminate 方法来尝试中断一个空闲工作线程,
避免在队列为空时取任务一直阻塞的情况,弥补了 shutdown() 中丢失的信号。
*/
interruptIdleWorkers(ONLY_ONE);
return;
}
// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1. SHUTDOWN 状态,这时不再接受新任务而且任务队列也空了
// 2. STOP 状态,当调用了 shutdownNow 方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 5. 这里尝试设置状态为 TIDYING,如果设置成功,则调用 terminated 方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// terminated 方法默认什么都不做,留给子类实现
terminated();
} finally {
// 设置状态为 TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
interruptWorkers
// 中断所有已启动线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
interruptIdleWorkers 相关方法
// 中断所有空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 首先看当前线程是否已经中断,如果没有中断,就看线程是否处于空闲状态
// 如果能获得线程关联的 Worker 锁,说明线程处于空闲状态,可以中断
// 否则说明线程不能中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
// 如果 onlyOne 为 true,只尝试中断第一个线程
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
runWorker
final void runWorker(Worker w) {
// 获取当前线程(等价于 w 的 thread)
Thread wt = Thread.currentThread();
// 获取当前 Worker 对象的任务
Runnable task = w.firstTask;
w.firstTask = null;
// unlock 源码:release(1);
// new Woker() 时,设置了 state 是 -1,这里调用 unlock,作用是将 state 位置为 0,允许 worker 中断
w.unlock(); // allow interrupts
// 用来标记线程是正常退出循环还是异常退出
boolean completedAbruptly = true;
try {
// 如果任务不为空,说明是刚创建线程,
// 如果任务为空,则从队列中取任务,如果队列没有任务,线程就会阻塞在这里(getTask 方法里调用了队列的 take 阻塞方法)。这里从阻塞队列中获取任务并执行,而不用新建线程去执行,这就是线程池的优势。
// task 执行完后在 finally 块中将其设置成了 null,所以第一次 worker 中 firstTask 执行完成后,后面都会从阻塞队列中获取任务来处理
while (task != null || (task = getTask()) != null) {
// worker 获取独占锁,准备执行任务
w.lock();
/**
第一个条件 runStateAtLeast(ctl.get(), STOP) 为 true,表示状态 >= STOP,而线程没有被中断,则线程需要被中断
第一个条件 runStateAtLeast(ctl.get(), STOP) 为 false,则再去判断当前线程是否被中断,如果被中断,则继续判断是否线程池状态 >= STOP
因为前面调用了 Thread.interrupted(),所以 wt.isInterrupted() 为 false,即线程没有被中断,则线程需要被中断
*/
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 任务执行之前做一些处理,空函数,需要用户定义处理逻辑
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务,也就是提交到线程池里的任务,且捕获异常
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
// 因为 runnable 方法不能抛出 checkedException ,所以这里
// 将异常包装成 Error 抛出
thrown = x;
throw new Error(x);
} finally {
// 任务执行完之后做一些处理,默认空函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
// 如果在执行 task.run() 时抛出异常,是不会走到这里的
// 所以抛出异常时,completedAbruptly 是 true,表示线程异常退出
completedAbruptly = false;
} finally {
// 线程
processWorkerExit(w, completedAbruptly);
}
}
getTask
// 从线程池阻塞队列中取任务
// 返回 null 前(线程正常销毁退出),都会进行 workerCount 减 1 操作
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// 获取当前线程池状态
int rs = runStateOf(c);
// rs >= STOP(线程池不接收任务,也不处理阻塞队列中的任务)
// 或者
// rs >= SHUTDOWN 且 阻塞队列为空(线程池不接收任务,且把阻塞队列中的任务处理完了)
// 这时候将核心线程的数量减 1,并直接返回 null,线程不再继续处理任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 方法实现:do {} while (! compareAndDecrementWorkerCount(ctl.get()));
// 不停的获取线程数量,并进行核心线程数的自减操作
decrementWorkerCount();
// 当前线程需要销毁
return null;
}
// 获取当前线程池工作线程数
int wc = workerCountOf(c);
// Are workers subject to culling?
// timed 用于判断是否需要进行超时控制。
// allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时
// wc > corePoolSize,表示当前线程池中的线程数量大于 corePoolSize,对于超过核心线程数量的这些线程,需要进行超时控制
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize 需要销毁线程(setMaximumPoolSize 可能会导致 maximumPoolSize 变小了)
// (这里 wc 要么 > maximumPoolSize,要么 > corePoolSize,所以销毁线程不会对线程池造成影响)
// timed && timedOut 说明线程空闲超时了,需要销毁线程
if ((wc > maximumPoolSize || (timed && timedOut))
// wc 大于 1 或 阻塞队列为空
&& (wc > 1 || workQueue.isEmpty())) {
// 线程数自减
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据 timed 来判断 workQueue 是超时等待获取队列任务,还是一直阻塞等待任务
Runnable r = timed
// 超时等待:当超过给定 keepAliveTime 时间还没有获取到任务时,则会返回 null,此时 Woker 会被销毁(getTask 方法返回 null 时,在 runWorker 方法中会跳出 while 循环,然后会执行 processWorkerExit 方法)
// keepAliveTime 就是线程的空闲时间(所以可以用来作为获取任务的等待超时时间)
? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)
// 阻塞等待:一直阻塞,直到有任务进来
: workQueue.take();
if (r != null)
return r;
// 获取任务超时,需要重新走循环获取任务
timedOut = true;
} catch (InterruptedException retry) {
// 如果获取任务时当前线程发生了中断,则设置 timedOut 为 false 并返回循环重试
timedOut = false;
}
}
}
processWorkerExit
// processWorkerExit 方法逻辑和 addWorkerFailed 方法逻辑类似
// processWorkerExit 需要将该线程已完成的任务数加到线程池的所有已完成任务中
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果 completedAbruptly 值为 true,则说明线程执行时出现了异常,将 workerCount 减 1
// 如果线程执行时没有出现异常,说明在 getTask() 方法中可能已经已经对 workerCount 进行了减 1 操作,这里就不必再减了
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计完成的任务数
completedTaskCount += w.completedTasks;
// 从 workers 中移除,也就表示着从线程池中移除了一个工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
// getTask 方法中,线程数多了要销毁线程
// 这里线程数少了,要添加线程
// 此时状态是 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 不是异常退出,说明从 getTask() 返回 null 导致的退出
if (!completedAbruptly) {
// 最小线程数,allowCoreThreadTimeOut 为 true,则核心线程可以被销毁,所以数量最少可以为 0,否则最少要保留 corePoolSize 个核心线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && !workQueue.isEmpty())
// 如果 allowCoreThreadTimeOut = true,并且等待队列有任务,至少保留一个线程来处理任务
// 修正最小核心线程数为 1
min = 1;
// 走到这里,说明 allowCoreThreadTimeOut = false,则 workerCount 不少于 corePoolSize
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 异常退出
// 线程池中,当前活跃线程数不满足大于等于 min,要给线程池添加一个线程来处理任务
addWorker(null, false);
}
}
hook 方法
/* Extension hooks */
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() { }
shutdown、shutdownNow 相关方法
/*
* Methods for controlling interrupts to worker threads.
*/
/**
* If there is a security manager, makes sure caller has
* permission to shut down threads in general (see shutdownPerm).
* If this passes, additionally makes sure the caller is allowed
* to interrupt each worker thread. This might not be true even if
* first check passed, if the SecurityManager treats some threads
* specially.
*/
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
void onShutdown() {
}
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* 取出阻塞队列中没有被执行的任务并返回
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List drainQueue() {
BlockingQueue q = workQueue;
ArrayList taskList = new ArrayList();
// drainTo 一次性从 BlockingQueue 获取所有可用的数据对象(还可以指定获取数据的个数)
// 通过该方法,可以提升获取数据效率,不需要多次分批加锁或释放锁
q.drainTo(taskList);
if (!q.isEmpty()) {
// 将 List 转换为数组,循环,取出 drainTo 方法未取完的元素
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查当前线程是否有关闭线程池的权限
checkShutdownAccess();
// 将线程池状态提升为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程,这里最终调用 interruptIdleWorkers(false);
interruptIdleWorkers();
// hook 方法,默认为空,让用户在线程池关闭时可以做一些操作
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 检查是否可以关闭线程池
tryTerminate();
}
public List shutdownNow() {
List tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 检查线程是否具有关闭线程池的权限
checkShutdownAccess();
// 将线程池状态提升为 STOP
advanceRunState(STOP);
// 中断所有工作线程,无论是否空闲
interruptWorkers();
// 取出队列中没有被执行的任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
// 等待线程池状态变为 TERMINATED 则返回,或者时间超时。由于整个过程独占锁,所以一般调用 shutdown 或者 shutdownNow 后使用
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
// 如果线程池状态为 TERMINATED,则返回 true
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
// 如果超时了,并且状态还不是 TERMINATED,则返回 false
if (nanos <= 0)
return false;
// 超时等待
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
RejectedExecutionHandler
public interface RejectedExecutionHandler {
// r 为请求执行的任务,executor 为线程池
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
/* Predefined RejectedExecutionHandlers */
/**
直接在调用者线程中,运行当前被丢弃的任务
这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
直接抛出异常
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 抛出 RejectedExecutionException 异常
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
丢弃无法处理的任务
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 不处理直接丢弃掉任务
}
}
/**
丢弃队列中最早被阻塞的线程
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
// 从线程池中的阻塞队列中取出第一个元素(丢弃队列中最久的一个待处理任务)
e.getQueue().poll();
// 将请求处理的任务 r 再次放到线程池中去执行
e.execute(r);
}
}
}
ScheduledThreadPoolExecutor 源码
ScheduledThreadPoolExecutor 源码后面再分析。