线程池原理探究
一、什么是线程池
在Java中,如果每有一个任务就创建一个新的线程,创建和销毁线程的系统资源是很大的,可能远大于处理任务的开销。在Jvm中创建太多的线程,可能会使系统由于过度消耗内存或者频繁的切换上下文为资源不足。
为了解决这个问题,就有了线程池的概念,核心逻辑就是提前创建好一些线程放在一个容器里,如果有任务要处理,就分配给池中的线程来处理,处理完这个任务之后不销毁线程,继续处理其他任务。本质是生产者消费者模型,如下图:
合理的使用线程池,带来的好处:
- 避免系统频繁的创建和销毁线程(复用线程),提升使用的性能。
- 设置合理的线程池大小,避免因硬件资源瓶颈带来的问题
- 提供了监控方法,能够更好的监控线程的使用情况。
二、线程池的模式
主要分为Leader-Follower 和 half-sync/half-async两个模式的线程池。
1.Leader-Follower模式
所有的线程只有三种状态:leading,following和processing,这种模型永远最多只有一个线程处于leading,所有的following线程都在等待成为leading线程,会设置一把锁,谁抢到谁就变为leading,有任务来的时候,leading线程对其处理,并转化为processing状态,处理完之后会变成following或者leading(恰好此时没有领导者),当leading丢失后,所有的following线程开始抢锁,抢到就变为leading。
我们来分析下这种模式,它有一个很明显的优点就是接受任务和处理任务的是同一个线程,没有上下文的切换,不要处理线程间的通信问题。缺点就是它只适合短的线程任务处理,高并发的情况下,如果处理时间都很长,所有线程都处于processing状态,就会造成阻塞。
在java中的ScheduledThreadPool中DelayedWorkQueue就使用了这种模式,后面第十三小节会分析。
2.half-sync/half-async模式
又称为生产者消费者模式,分为同步层、异步层和队列层,同步层的主线程接收到任务后,存入队列里,异步层的工作线程从队列中取任务进行处理,如果工作队列为空,则取不到任务的工作线程进入挂起状态。来看下图。
这种方式的优点可以让线程并行,增加系统的并发性,减少不必要的等待,又保持了同步编程的简单性。它的缺点就是线程之间有数据的通信,Queue可能会积压任务,造成OOM,不大适合大数据量交换的场合,且要对队列进行控制。
Java中的ThreadPoolExecutor使用的模式就是这种,ThreadPoolExecutor的execute是支持并发操作的,里面使用了CAS操作和阻塞队列来保证了安全性,本质都是生产者消费者模型。
三、线程池的创建和ThreadPoolExecutor的参数分析
J.U.C包下面提供了快速创建的线程池的工具类Executors,我们甚至不需要了解太多ThreadPoolExecutor的知识就可以使用线程池。
常用的几个方法如下面的表格所示:
方法 |
作用 |
newFixedThreadPool |
创建指定线程数的线程池 |
newSingleThreadExecutor |
创建一个线程数的线程池 |
newCachedThreadPool |
创建一个可以根据实际情况调整大小的线程池(空闲60s会回收) |
newSingleThreadScheduledExecutor |
创建一个可执行定时任务线程的线程池 |
newScheduledThreadPool |
创建指定数量可执行定时任务线程的线程池 |
newWorkStealingPool |
1.8中新增的,工作窃取,主要是fork/join池子,最后分析 |
举个简单的例子:
public class ThreadPoolRun { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 100; i++) { executorService.execute(()-> { try { Thread.sleep(1000); System.out.println("过了1秒钟"); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
我们使用的是newFixedThreadPool创建了一个大小为3的线程池,运行我们发现打印语句是三条三条输出的。
我们看一下newFixedThreadPool的源码:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue
}
我们可以发现实际上是调用了ThreadPoolExecutor来创建线程的。我们看一下他的构造函数
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
解释下各个参数的作用,稍后的源码分析中会详细看
corePoolSize:线程池核心线程个数;
maximumPoolSize:线程池最大线程数量;
keepAliveTime:存活时间,如果当前线程池中的数量比核心线程数量多,并且当前线程是闲置状态,该变量就是这些线程的最大生存时间;
TimeUnit:存活时间的时间单位
workQueue:用于保存等待任务执行的任务的阻塞队列
ThreadFactory:创建线程的工厂,默认使用DefaultThreadFactory
RejectedExecutionHandler:拒绝策略,表示当队列已满并且线程数量达到线程池最大线程数量的时候对新提交的任务所采取的策略
四、线程池的生命周期
线程池总共有五种状态:
1.RUNNING:能接受新提交的任务,并且也能处理阻塞队列中的任务;
2.SHUTDOWN:关闭状态,不再接受新提交的任务,但可以继续处理阻塞队列中已保存的任务。
3.STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
4.TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
5.TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。
下图为线程池状态的转化:
五、线程池的实现原理分析
给大家看个线程池的流程图:
下面用核心方法execute的源码开始分析:
第三行是int c = ctl.get();
所以我们先说下ctl的啥,去看它定义的地方
根据里面的注释,我们可以知道,ctl是线程池对运行状态(runState)和有效线程数(workerCount)的一个控制字段;使用了整形类型来保存,其中高三位保存的是线程池的状态(五种),低29位存储的是有效线程数。运行状态已经在上面的第四节分析过了。
runStateOf方法:获取运行状态;(高三位)
workerCountOf方法:获取活动线程数;(低29位)
ctlOf方法:获取运行状态和活动线程数的值。(根据线程池状态和线程数,来获取ctl的值)
接下来一行一行分析execute源码:
public void execute(Runnable command) { if (command == null) // 判断是否command为空 throw new NullPointerException(); int c = ctl.get(); // 获取线程状态和线程的数量 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) // 1.如果当前池中的线程数小于核心线程数,就新建一个线程加入池中 return; c = ctl.get(); // 添加失败则重新获取ctl } // 2.核心线程满了,workQueue队列未满,添加command成功时,进入 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 重新获取下ctl if (! isRunning(recheck) && remove(command)) // 再次检测,此时不是运行状态了,从队列中删除刚才添加的command reject(command); // 调用拒绝策略 else if (workerCountOf(recheck) == 0) // 活动的线程数量为0,即线程池空了 addWorker(null, false); // 创建一个临时线程 } // 3.核心线程满了,队列满了,尝试创建一个临时线程,则直接调用拒绝策略 else if (!addWorker(command, false)) reject(command); // 创建失败,说明完全满了或者被关闭了,调用拒绝策略 }
上面的代码总结下如下图所示:
继续分析下addWorker(Runnable firstTask, boolean core)的源码,看看是如何创建线程的。
private boolean addWorker(Runnable firstTask, boolean core) { retry: // 标记这个循环位置 for (;;) { int c = ctl.get(); // 获取ctl的值 int rs = runStateOf(c); // 获取运行的状态 // 当状态值大于等于shutdown时,即非running状态 // 且判断下面三个条件有一种不满足,都不可以在接收新的任务,返回添加失败:1.rs==shutdown 2.firstTask为空 3.阻塞队列不为空 // 换句话理解就是:在shutdown状态的时候还可以处理阻塞队列里的任务,但是不可以新加了(firstTask不为空),大于它的状态不可以,阻塞队列空了直接返回。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; // 添加失败 // cas增加线程的个数,采用了自旋 for (;;) { int wc = workerCountOf(c); // 获取运行的线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 当线程数大于上限或者大于核心线程数/最大线程数(根据传入的参数core来选择核心还是最大) return false; // 添加失败 if (compareAndIncrementWorkerCount(c)) // 尝试增加workerCount,如果成功,则跳出第一个for循环 break retry; c = ctl.get(); // 添加失败,重新获取ctl的值 if (runStateOf(c) != rs) // 判断下现在的状态是否发生改变,改变了调到第一层循环,重新获取线程的状态 continue retry; // else CAS failed due to workerCount change; retry inner loop (否则,重新再第2层循环执行,自旋继续尝试) } } // 走到这里说明cas成功了,线程数量成功加1了,下面是构建一个线程 boolean workerStarted = false; //工作线程是否启动的标识 boolean workerAdded = false;/ /工作线程是否已经添加成功的标识 Worker w = null; try { w = new Worker(firstTask); // 根据firstTask新建一个Worker对象 final Thread t = w.thread; // 每一个Worker对象都会创建一个线程 if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // 加锁,避免并发问题,因为workers是HashSet不是线程安全的 try { int rs = runStateOf(ctl.get());// 重新获取线程池状态,避免获取锁前调用了shutdown // rs < SHUTDOWN表示是RUNNING状态; // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) throw new IllegalThreadStateException(); workers.add(w); // 往workers里添加对象,workers是个hashset int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // largestPoolSize记录着线程池中出现过的最大线程数量 workerAdded = true; //表示工作线程创建成功了 } } finally { mainLock.unlock(); // 释放锁 } if (workerAdded) { // 添加成功后,启动线程 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 未启动成功 addWorkerFailed(w); // 调用addWorkerFailed方法,主要是从workers删除,减少线程数,并尝试着关闭线程池这样的操作 } return workerStarted; //返回结果 }
上述源码比较长,简单的说实际上就做了两件事:先通过cas操作来增加线程池中的线程个数,再安全的将任务添加到Workers里面,并启动线程执行任务。
六、阻塞队列
上面一直提到了阻塞队列workQueue,这里补充下阻塞队列的知识
BlockingQueue继承了Queue接口,是队列的一种。为什么叫它阻塞队列呢,因为它和趋同队列有两点区别:1.队列为空的时候,获取元素的线程会等待队列变为非空;2.队列满的时候,存储元素的线程会等待队列不满。所以说阻塞队列常用于生产者消费者的场景,我们的线程池的本质恰好是一个生产者消费者的场景,所以用它来存储任务再合适不过。
这里稍微提一下阻塞队列的实现原理:使用了ReentrantLock中的Condition的signal和await来实现阻塞和唤醒的控制,里面维护notEmpty和notFull 两个条件来控制阻塞和唤醒。
阻塞队列提供了四种处理方法:
常用的阻塞队列如下:
l ArrayBlockingQueue:基于数组实现的有界阻塞队列
l LinkedBlockingQueue:基于链表实现的有界阻塞队列,默认长度是Integer.MAX_VALUE
l SynchronousQueue:无缓冲的,容量为0的队列:每次去元素要阻塞,直至有数据来;每次放数据也要阻塞,直至有消费者来去取
l DelayQueue:延迟队列,无边界队列
l PriorityBlockingQueue:基于优先级的阻塞队列
l LinkedTransferQueue:一个由链表结构组成的无边界阻塞队列
分析下线程池中使用的方法:
1.线程池中往workQueue中添加元素的时候,我们没必要阻塞,添加成功了返回true,失败了就返回false,所以很显然offer更适合
2.对于核心线程来说,当没有任务处理的时候,我们不应该删除这个线程,应该给它阻塞住,所以使用了take()方法来取任务,当没有任务的时候会阻塞住。
3.对于非核心线程来说,当它空闲的时候,我们应该回收掉,所以使用了poll(time.unit)来取元素,当过了设置的超时时间后,会唤醒线程返回,然后回收掉非核心线程。
分析下线程池中会使用的队列:
newFixedThreadPool、newSingleThreadExecutor方法创建的线程池使用的是LinkedBlockingQueue,它的默认长度是Integer.MAX_VALUE,使用不当会导致大量请求堆积到队列中导致 OOM 的风险。
newCachedThreadPool使用的是SynchronousQueue,这里很巧妙,就是每个线程只能处理一个任务,不存储任务,当空闲达到设置的时间时,就直接回收了。
七、Worker类的分析
从上面的addWorker源码中,我们知道会new Worker对象并加到workers里面,下面具体分析下Worker类。
Worker即实现了Runnable接口,有继承了AQS抽象类。
看下成员变量:firstTask是保存传入的任务,第一次执行的,优先于从队列取;thread是在构造方法里通过ThreadFactory来创建的,用来处理任务的线程。
构造方法如下:
Worker(Runnable firstTask) { setState(-1); // 初始化为 -1,这样在线程运行前(调用runWorker)禁止中断,在 interruptIfStarted() 方法中会判断 getState()>=0;当调用shutdownNow时,会调用interruptIfStarted方法,也就是说如果线程都没开始执行,就没必要中断了;只有执行的时候才会先调用unlock,将状态设为0 this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); }
可以看到新建一个线程,newThread方法会将this传入,也就是自己,所以线程启动时,会调用run方法,然后调用runWorker方法。
还有一点,为什么Worker继承了AQS,使用AQS来实现独占锁,而不是直接使用ReentrantLock来实现呢?
我们可以看Worker类里的tryAcquire方法,发现它是不可以重入的,而ReentrantLock是可重入的。
// 尝试获取锁(不允许重入) protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { // 只有状态时0的时候才可以获取锁 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
使用AQS的作用:
- 如果正在执行任务 ,不应该中断线程
- 如果该线程不是独占锁的状态,也就是空闲状态,说明它没有在处理任务,此时是可以中断的
- 线程池在执行shutdown方法或者tryTerminate方法时会调用interruptIdleWorkers方法来中断空闲的线程,interruptIdleWorkers方法会使用tryLock方法来判断线程池中的线程是否处于空闲状态
- 之所以设置为不可重入,因为我们不希望任务在调用shutdown,setCorePoolSize,setMaximumPoolSize等方法时调用interruptIdleWorkers时重新获取到锁,这样会中断正在运行的线程,此时应该让线程先执行完,达到优雅关闭线程的目的。
八、runWorker方法的分析
前面的章节我们知道了addWorker方法,主要是增加工作的线程,而Worker可以简单理解为一个线程,里面重写了run方法,run里面调用了runWorker。这实际上是线程池处理任务的真正的逻辑。它主要做了这几件事:
1.如果worker中的firstTask不为空,则开始执行 firstTask;
2.如果firstTask为空,则通过 getTask()从阻塞队列中获取取任务,并赋值给 task,如果取到的 Runnable不为空,则执行该任务。
3. 执行完毕后,通过 while 循环继续 getTask()取任务
4. 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
这里线程池如何做到保持核心线程,而删除空闲的临时线程的呢。
这里需要深入看下 getTask()里面的代码,在从workQueue中取任务时,分两种情况:
1.当工作的的线程数小于核心线程数时,使用的是take方法,这里如果阻塞队列空了,会直接阻塞,线程挂起,直至有新的任务的到来;
2.如果当工作的线程数大于核心线程数时,调用的是poll(keepAliveTime, TimeUnit.NANOSECONDS) ,当队列为空且存活时间结束后,会返回null,最后调用runWorker中的processWorkerExit(w, completedAbruptly)来删除此临时线程。
九、阻塞队列
上面一直提到了阻塞队列workQueue,这里补充下阻塞队列的知识
BlockingQueue继承了Queue接口,是队列的一种。为什么叫它阻塞队列呢,因为它和普通队列有两点区别:1.队列为空的时候,获取元素的线程会等待队列变为非空;2.队列满的时候,存储元素的线程会等待队列不满。所以说阻塞队列常用于生产者消费者的场景,我们的线程池的本质恰好是一个生产者消费者的场景,所以用它来存储任务再合适不过。
这里稍微提一下阻塞队列的实现原理:使用了ReentrantLock中的Condition的signal和await来实现阻塞和唤醒的控制,里面维护notEmpty和notFull 两个条件来控制阻塞和唤醒。
阻塞队列提供了四种处理方法:
常用的阻塞队列如下:
l ArrayBlockingQueue:基于数组实现的有界阻塞队列
l LinkedBlockingQueue:基于链表实现的有界阻塞队列,默认长度是Integer.MAX_VALUE
l SynchronousQueue:无缓冲的,容量为0的队列:每次去元素要阻塞,直至有数据来;每次放数据也要阻塞,直至有消费者来去取
l DelayQueue:延迟队列,无边界队列
l PriorityBlockingQueue:基于优先级的阻塞队列
l LinkedTransferQueue:一个由链表结构组成的无边界阻塞队列
分析下线程池中使用的方法:
1.线程池中往workQueue中添加元素的时候,我们没必要阻塞,添加成功了返回true,失败了就返回false,所以很显然offer更适合
2.对于核心线程来说,当没有任务处理的时候,我们不应该删除这个线程,应该给它阻塞住,所以使用了take()方法来取任务,当没有任务的时候会阻塞住。
3.对于非核心线程来说,当它空闲的时候,我们应该回收掉,所以使用了poll(time,unit)来取元素,当过了设置的超时时间后,会唤醒线程返回,然后回收掉非核心线程。
分析下线程池中会使用的队列:
newFixedThreadPool、newSingleThreadExecutor方法创建的线程池使用的是LinkedBlockingQueue,它的默认长度是Integer.MAX_VALUE,使用不当会导致大量请求堆积到队列中导致 OOM 的风险。
newCachedThreadPool使用的是SynchronousQueue,这里很巧妙,就是每个线程只能处理一个任务,不存储任务,当空闲达到设置的时间时,就直接回收了。
十、回顾
上面的源码分析完,我们结合刚才分析的方法,将刚开始的图修正下:
十一、注意事项
1.不允许使用 Executors?
阿里开发手册上是说线程池的构建不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式。
我来简单分析下原因,用 Executors 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题,比如我们用 newFixdThreadPool 或者 singleThreadPool。对于newFixdThreadPool 我们可以看到它使用的LinkedBlockingQueue,允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险;而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题。
而如果我们通过 ThreadPoolExecutor 来构造线程池的话,我们势必要了解线程池构造中每个参数的具体含义,使得开发者在配置参数的时候能够更加谨慎。
对于我们游戏业务来说,区别于Web业务,实际是可以使用newFixdThreadPool 来快速创建线程的,因为我们的任务数量是可知的,一般不会出现大量请求堆积到队列中。
2.如何合理配置线程池的大小
线程池大小不是靠猜,也不是说越多越好。我们需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型。
如果是 CPU 密集型,主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高,那么线程数的配置应该根据 CPU 核心数来决定,CPU 核心数=最大同时执行线程数,假设 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。那线程池的最大线程数可以配置为 cpu 核心数+1。
如果是 IO 密集型,主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 处于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。这种情况下可以结合线程的等待时长来做判断,等待时间越高,那么线程数也相对越多。一般可以配置 cpu 核心数的 2 倍。也可以参考公式:线程池设定最佳线程数目=(线程等待时间与线程CPU时间之比 + 1)* CPU数目。这个公式的线程 cpu 时间是预估的程序单个线程在 cpu 上运行的时间(通常使用 loadrunner测试大量运行次数求出平均值)。
3.线程池中的线程初始化
默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。在实际中如果需要 线程池创建之后立即创建线程,可以通过以下两个方法办到:
- prestartCoreThread():初始化一个核心线程;
- prestartAllCoreThreads():初始化所有核心线程
4.线程池的关闭
ThreadPoolExecutor 提供了两个方法,用于线程池的关闭,分别是:
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
5.线程池容量的动态调整
ThreadPoolExecutor提供了动态调整线程池容量大小的方法:
setCorePoolSize:设置核心池大小
setMaximumPoolSize:设置线程池最大能创建的线程数目大小
6.任务拒绝策略
通常有以下四种策略:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
十二、Callable/Future 使用及原理分析
上面主要是针对execute来分析的,实际上线程池执行任务还有一种方法submit
两者的区别:
1.execute() 方法用于将不需要返回值的任务提交给线程池执行,所以无法判断任务是否被线程池执行成功与否。
2. submit() 方法用于将需要返回值的任务提交给线程池执行。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get() 方法来获取返回值,get() 方法会阻塞当前线程直到任务完成
我们看下submit的源码:
publicFuture submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask; }
实际上也就是封装了以一个FutureTask对象,然后去执行execute方法。
所以我们要重点了解一下 Callable/Future。
首先看个带返回值的线程的例子:
public class CallableDemo implements Callable{ @Override public String call() throws Exception { System.out.println("执行了call方法"); Thread.sleep(10000); // 阻塞10秒 return "hello World"; } public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask futureTask = new FutureTask(new CallableDemo()); new Thread(futureTask).start(); System.out.println(futureTask.get()); // 10秒后才得到值 } }
输出结果如下:
我们深入的去看下FutureTask类,我们发现它实现了RunnableFuture接口,而RunnableFuture继承了Runable和Future接口,类图如下:
Runnable我们很熟悉,Future实际上是一个任务的生命周期,里面提供了相应的方法来判断任务是否完成或者取消,以及获取任务的结果和取消任务等。
public interface Future{ // 取消任务 boolean cancel(boolean mayInterruptIfRunning); // 判断任务是否被取消 boolean isCancelled(); // 任务是否结束 boolean isDone(); // 获取任务执行的结果,如果未执行完,会阻塞(FutureTask里有实现) V get() throws InterruptedException, ExecutionException; // 获取任务执行的结果,多了个超时时间 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
实际上可以理解为run()方法去计算结果是个生产者,get()方法去取结果是个消费者。生产者数据准备好了,会唤醒消费者来取数据,类似于上面分析的阻塞队列。
我们详细看下run方法
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; // 保证只有一个线程可以进入到下面的try方法 try { Callablec = callable; if (c != null && state == NEW) { // 当c不为空,且状态为New时 V result; // 返回结果 boolean ran; // 运行成功的标志 try { result = c.call(); // 执行callable的call方法,获取返回结果 ran = true; // 运行成功 } catch (Throwable ex) { result = null; ran = false; setException(ex); // 设置异常结果 } if (ran) set(result); // 将结果保存起来,方便get方法获取 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
实际上run方法主要就是调用了callable的call方法返回结果值,并保存起来;如果发生了异常也会把异常存起来。要注意的是run方法使用了CAS来实现了并发安全,保证只有一个线程能够运行FutureTask任务。
再看下get方法,实际就是阻塞获取线程执行的结果,如果任务还没执行完,会调用awaitDone方法来让当前线程等待,有兴趣的可以自己看下awaitDone方法,这里就不分析了。
回到线程池的submit方法,实际上会通过worker线程来调用封装的ftask的run方法,run方法就是上面分析的,所以我们可以通过Future的get来获取返回值。
总体的流程图如下:
十三、ForkJoinPool的使用
在JAVA1.7版本新加入了一种线程池ForkJoinPool,它强调的是分而治之的思想,可以大任务通过划分成小任务执行(即fork),最后将小任务的结果汇总到一个结果上(即join).
ForkJoinPool主要两个方法:fork和join
下面写个简单的例子来看下:
比如要计算1-1亿数的相加,我们可以借助ForkJoinPool来实现。
public class ForkJoinPoolTest { private static final int MAX =10000000; public static void main(String[] args) { int sum=0; ForkJoinPool pool = new ForkJoinPool(); ForkJoinTasktaskFuture = pool.submit(new MyForkJoinTask(1,100000000)); try { sum = taskFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(System.out); } System.out.println(sum); } static class MyForkJoinTask extends RecursiveTask { // 子任务开始计算的值 private Integer startValue; // 子任务结束计算的值 private Integer endValue; public MyForkJoinTask(Integer startValue , Integer endValue) { this.startValue = startValue; this.endValue = endValue; } @Override protected Integer compute() { // 如果条件成立,说明这个任务所需要计算的数值分为足够小了 // 可以正式进行累加计算了 if(endValue - startValue < MAX) { System.out.println("开始计算的部分:startValue = " + startValue + ";endValue = " + endValue); Integer totalValue = 0; for(int index = this.startValue ; index <= this.endValue ; index++) { totalValue += index; } return totalValue; } // 否则再进行任务拆分,拆分成两个任务 else { MyForkJoinTask subTask1 = new MyForkJoinTask(startValue, (startValue + endValue) / 2); subTask1.fork(); MyForkJoinTask subTask2 = new MyForkJoinTask((startValue + endValue) / 2 + 1 , endValue); subTask2.fork(); return subTask1.join() + subTask2.join(); } } } }
结果如下:
十四、newScheduledThreadPool和newWorkStealingPool探究
上面的主要是分析了ThreadPoolExecutor类创建的线程池,这里提一下另外两种线程池,可定时或者周期性运行的线程池和具有抢占式操作的线程池。
1.首先是newScheduledThreadPool 和 其他线程池最大的区别是使用的阻塞队列是
DelayedWorkQueue,而且多了两个定时执行的方法scheduleAtFixedRate和
scheduleWithFixedDelay。支持定时以及周期性执行任务,1.5版本之前我们要借助Timer来实现,但它是单线程的,比较慢,而使用这个线程池可以自行控制线程数。
下面是个简单的例子:
public class ScheduledThreadPoolTest { public static void main(String[] args) { System.out.println("---- start ----"); ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(8); for (int i = 0; i < 10; i++) { scheduledThreadPool.schedule(() -> { System.out.println(Thread.currentThread().getName()); }, 3, TimeUnit.SECONDS); } System.out.println("---- end ----"); } |
输出结果如下,会在三秒后执行任务
简单的做个分析,实际上ScheduledThreadPoolExecutor继承了ThreadPoolExecutor类,不过它使用了自己实现的可延迟执行阻塞任务的队列DelayedWorkQueue。
在第二节中我们提到了Leader-Follower模式,ScheduledThreadPool采用了这种模式,等待第一个线程的任务被称为leader,其调用available.awaitNanos待当前队列头部任务到达调度时间时唤醒。其余线程作为follower只需调用await方法无限阻塞等待,直至被leader唤醒(signal),并重新完成抢锁->尝试执行队列首元素->抢leader->等待的循环。
2.再看下newWorkStealingPool,看下newWorkStealingPool的例子:
/** * @author wuzhiwen * @date 2021/5/29 18:10 * @desc 抢占式的线程池 */ public class WorkStealingPoolTest { // 线程数 private static final int threads = 10; // 用于计数线程是否执行完成 CountDownLatch countDownLatch = new CountDownLatch(threads); @Test public void test1() throws InterruptedException { System.out.println("---- start ----"); ExecutorService executorService = Executors.newWorkStealingPool(); int i; for (i = 0; i < threads; i++) { executorService.execute(new RunnableB(i, countDownLatch)); } countDownLatch.await(); System.out.println("---- end ----"); } |
/** * @author wuzhiwen * @date 2021/5/29 10:31 * @desc RunnableB */ public class RunnableB implements Runnable { private int i; CountDownLatch countDownLatch; public RunnableB(int i, CountDownLatch countDownLatch) { this.i = i; this.countDownLatch = countDownLatch; } @Override public void run() { try { System.out.println(Thread.currentThread().getName() + "=========" + i); } catch (Exception e) { System.out.println(e); } finally { countDownLatch.countDown(); } } |
看下输出
实际上底层使用了“工作窃取”算法来实现的,ForkJoinPool和其他Pool不大一样,每个工作线程都有自己的工作队列WorkQueue,且这个队列是双端队列,也就是说头尾都可以取元素,当其他线程空闲的时候,可以从其他线程的队列的尾部来窃取任务执行,而自己是从对头取,可以减少竞争(只剩一个元素时,还是有竞争的,使用了CAS)。push()/pop()仅在其所有者工作线程中调用,poll()是由其它线程窃取任务时调用的。
十五、ThreadPoolExecutor其他一些方法的源码分析
这里做个补充,大家有兴趣的可以看看,做个参考。
1.addWorkerFailed方法:
主要就是添加Worker后启动线程失败,会做失败后的处理:
- 如果worker已经建好了,就用workers中移除掉
- 原子递减核心线程数(因为在addWorker方法中先做了原子增加)
- 尝试结束线程池
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); // workers非线程安全的,要加锁 try { if (w != null) workers.remove(w); decrementWorkerCount(); // 减少线程数,因为前面已经加过了,失败要减掉 tryTerminate(); // 尝试结束线程 } finally { mainLock.unlock(); } } |
2.runWorker方法:
已经在第七小节的时候分析过了
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // 因为state初始化是-1,执行到这里设置为0,就可以中断了。 boolean completedAbruptly = true; try { // 当firstTask不为空时先处理它,之后去队列中获取 while (task != null || (task = getTask()) != null) { w.lock(); // 当线程池处于STOP以上的状态时,确保当前线程是中断状态;否则要保证当前线程不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 执行任务前,子类可实现此钩子方法,做一些加强处理 beforeExecute(wt, task); Throwable thrown = null; try { task.run(); //调用任务的run方法 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 执行任务后,子类可实现此钩子方法,做一些加强处理 afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { // 此方法会销毁线程(后面会分析) processWorkerExit(w, completedAbruptly); } } |
3.getTask方法:
private Runnable getTask() { // timeOut变量的值表示上次从阻塞队列中取任务时是否超时 boolean timedOut = false; // Did the last poll() time out?
for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 减少运行的线程数 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
int wc = workerCountOf(c); // timed变量用于判断是否需要进行超时控制。 // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时; // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量; // 对于超过核心线程数量的这些临时线程,要进行超时控制 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; }
try { // 阻塞获取 task,如果临时线程在 keepAliveTime 时间内未获取任务,说明超时了,此时 timedOut 为 true Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 如果 r == null,说明已经超时,timedOut设置为true timedOut = true; } catch (InterruptedException retry) { // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试 timedOut = false; } } } |
processWorkerExit方法:
主是用来修正workerCount和移除完成了的worker线程w
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1; // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //统计完成的任务数 completedTaskCount += w.completedTasks; // 从workers中移除,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } finally { mainLock.unlock(); }
// 根据线程池状态进行判断是否结束线程池 tryTerminate();
int c = ctl.get(); //当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;如果是正常退出的,在 wokerQueue 为非空的条件下,确保至少有一个线程在运行以执行 wokerQueue 中的任务 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } } |
tryTerminate方法:
tryTerminate方法根据线程池状态进行判断是否结束线程池,代码如下:
final void tryTerminate() { for (;;) { int c = ctl.get(); /* * 当前线程池的状态为以下几种情况时,直接返回: * 1. RUNNING,因为还在运行中,不能停止; * 2. TIDYING或TERMINATED,因为线程池中已经没有正在运行的线程了; * 3. SHUTDOWN并且等待队列非空,这时要执行完workQueue中的task; */ if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; // 如果线程数量不为0,则中断一个空闲的工作线程,并返回 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; }
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 这里尝试设置状态为TIDYING,如果设置成功,则调用terminated方法 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // terminated方法默认什么都不做,留给子类实现 terminated(); } finally { // 设置状态为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } } |
shutdown方法:
shutdown方法要将线程池切换到SHUTDOWN状态,并调用interruptIdleWorkers方法请求中断所有空闲的worker,最后调用tryTerminate尝试结束线程池。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全策略判断 checkShutdownAccess(); // 切换状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断空闲线程 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试结束线程池 tryTerminate();
|
shutdownNow方法:
public List List final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //加锁,worker不是线程安全的 try { checkShutdownAccess(); advanceRunState(STOP); // 中断所有工作线程,无论是否空闲 interruptWorkers(); // 取出队列中没有被执行的任务 tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }
|