一. 线程池概述
在介绍线程池之前,先说一下为什么在开发中要使用线程池,个人认为主要有一下的原因:
- 开发中使用线程的时候,大部分线程执行的时间很短,频繁的创建、启动、销毁线程带来不必要的资源消耗。
- 线程的创建、挂起、唤醒等操作都需要依靠CPU调度,大量的线程会频繁进行上下文切换,特别是任务执行时间短、任务数量多的情况,大量时间将花在上下文切换上,又耗时,又耗资源。
- 无法对任务做统一的管理、分配、和监控等。
线程池主要通过ThreadPoolExecutor来实现,在ThreadPoolExecutor类注释上有下面一段话(下面是翻译过来的):
线程池解决了两个不同的问题:由于减少了每个任务的调用开销,改进了执行大量异步任务的性能;并且它还提供了一种任务执行时限制和管理资源(包括线程)的方法。 同时每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。
二. 线程池的构造方法参数解析
构造方法中各个参数的含义
先上一张图,有个大概的印象。
- 这里任务加入可以看做是放入了两个地方,一是线程池中,另一个是任务队列中。
- 放入线程池的又分为两个部分,一是corePoolSize,暂且叫做核心线程(下图红色部分),另一部分是非核心线程(下图橙色部分),核心线程+非核心线程 <= maximumPoolSize。
参 数 | 含 义 |
---|---|
corePoolSize | 核心线程数,当向线程池中添加新任务的时候,如果此时线程数量小于corePoolSize,哪怕线程池中有空闲的线程,此时也会重新新建一个线程来处理这个任务请求。同时corePoolSize也是线程池中维持的线程数量,就算都是空闲线程的也会存在,除非设置了allowCoreThreadTimeOut。 |
maximumPoolSize | 线程池中最大的线程数量,当向线程池中添加新任务的时候如果此时线程数量大于corePoolSize,小于maximumPoolSize,并且队列已经满了,将会新建线程来处理这个任务请求。 |
keepAliveTime | 非核心线程在空闲后存活的是时间。 |
unit | keepAliveTime的单位 |
workQueue | 任务队列,常用的有三种:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue |
threadFactory | 创建线程的工厂,默认使用DefaultThreadFactory |
handler | 被拒绝后的处理,拒绝处理策略。 a. CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务 b. AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务 c. DiscardPolicy,直接抛弃任务,不做任何处理 d. DiscardOldestPolicy,去除任务队列中的第一个任务,重新提交 |
构造方法
1 | //使用默认的ThreadFactory和拒绝处理策略 |
三. 工作原理
其原理如下图所示:
当一个新加一个任务的时候执行如下步骤:
- 首先尝试新建核心线程启动任务
- 核心线程已经满了,尝试加入到队列中
- 队列已经满了,尝试新建非核心线程启动任务
- 非核心线程已满,执行拒绝策略
同时,当线程池中的工作线程执行完一个Task后会从队列中take任务执行。
四. UML类图
- AbstractExecutorService,提供了ExecutorService的默认实现。
- ExecutorService,定义了管理终止任务的一些方法以及让任务返回Future的方法。
- Executor,只定义了一个execute()方法,任务的提交执行。
- AbortPolicy、DiscardOldestPolicy、DiscardPolicy、CallerRunsPolicy,ThreadPoolExecutor内部定义的拒绝策略类。
- Worker,是实现线程池重要的内部类,其UML类图如下:
Worker类主要是运行任务的以及维护线程的中断控制状态,以及其他状态的记录。Worker类继承AbstractQueuedSynchronizer以简化每个任务执行时候的锁的获取和释放。
五. 源码分析
注意区分Worker和Taks,Worker是指工作线程,也就是用来执行任务的线程,Task是线程执行的任务(添加到队列中的就是任务)。
ThreadPoolExecutor关键变量
ThreadPoolExecutor使用一个原子integer(AtomicInteger)变量ctl(32位),来表示线程池的控制状态,这个状态值实际上由两部分组成:
- workerCount:有效的线程数,最大(2^29)-1,为ctl的低29位
- runState:线程池运行状态,保存在ctl高位3位。
1 | //ctl初始化的时候状态为RUNNING,workerCount为0 |
说一下上面线程池的几种状态:
- RUNNING:可以接受新的任务,并且处理在队列中的任务。
- SHUTDOWN:不再接受新的任务,但是会把队列中的任务处理完成。
- STOP:不再接受新的任务,也不处理队列中的任务,并且正在处理的任务会被中断。
- TIDYING:所有任务都终止了,线程要转换到TIDYING状态,需要运行terminated()钩子方法。
- TERMINATED:terminated()执行完成,到达这个状态时候awaitTermination()方法返回。
在ThreadPoolExecutor类注释中提到状态之间的转换情况:
- RUNNING -> SHUTDOWN:调用shutdown(),或者隐式调用finalize()。
- (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()。
- SHUTDOWN -> TIDYING:当线程池和队列都为空的时候。
- STOP -> TIDYING:线程池为空的时候。
- TIDYING -> TERMINATED:钩子方法terminated()执行完成。
execute(Runnable command)
execute(Runnable command),提交任务。
1 | public void execute(Runnable command) { |
分为3步来执行:
- 如果workerCount小于corePoolSize,将任务作为first Task新建线程来执行任务。
- 将任务添加到队列中,如果添加成功依然需要检查,在进入任务之前,如果线程池被关闭,那么将任务从队列中移除;如果当前线程池中没有工作线程,而刚刚在队列中加入了任务,要保证线程池中至少有一个工作线程可以处理任务。
- 如果不能将任务加入队列中,尝试新加一个线程来执行任务,但是并不定会成功,可能是线程池被shut down或者线程池达到了饱和(maximumPoolSize),如果失败了执行拒绝策略。
关于Worker的构造函数
在介绍addWorker(Runnable firstTask, boolean core)之前我们来看看Worker的构造函数。
1 | Worker(Runnable firstTask) { |
Worker实现了Runnable,这里将其构造为Thread赋值给thread。第一步中的setState(-1),这一步很有意思,将AQS中的同步状态设置为-1,到后面我们会看到,线程池使用AQS中的同步状态来判断该工作线程是否可以被中断。-1:初始化值,此时工作线程还没有启动,也没有中断的必要;0:表示接受中断,此时工作线程为空闲状态;1:表示此时工作线程正在执行任务。
addWorker(Runnable firstTask, boolean core)
检查根据当前线程池的工作状态和给定的界限限制(corePoolSize 和maximumPoolSize)是否可以添加新的工作线程。 如果添加了新的工作线程,workerCount会相应调整,并且如果可能的话将firstTask作为其第一个任务运行。 如果线程池停止(Stop)或关闭(ShutDown),此方法返回false。如果线程工厂未能创建线程,它也会返回false。如果线程创建失败,无论是由于线程工厂返回null还是由于异常(通常是Thread.start()中的OutOfMemoryError),都会进行回滚。
1 |
|
Worker中的run()
Worker的run()方法实际调用的是runWorker(this),启动工作线程(注意这里不是直接启动的任务),在工作线程中执行任务。
工作线程在这循环,反复的从队列中获取任务并执行它们。
- 启动了初始的任务,也就是最开始创建工作线程的时候的firstTask。否则,只要线程池还在运行中,使用getTask()从队列中获取任务来执行。如果改变了线程池的工作状态,或者工作参数,getTask()返回null,此时工作线程将退出。或者工作线程因为异常而退出,此时异常退出标记completedAbruptly为true,之后会通过processWorkerExit新建一个工作线程来替换它。
- 在运行所有任务之前,需要获取锁,来保证当任务在运行的时候不会被中断,除非线程池正在停止(Stop)
1 | public void run() { |
getTask()
从队列中获取Task,大概分为以下几个步骤。
- 判断线程池以及队列的状态,如果线程池状态在STOP以上,此时线程池不处理队列中的任务;或者线程池处于SHUTDOWN但是队列为空(SHUTDOWN不再接受新的任务),workerCount减1,返回null,注意此时只是将变量减1,其实工作线程并没有终止真正的终止在 processWorkerExit(w, completedAbruptly);中。
- 如果通过了状态检查,判断是否要进行线程回收,如果需要workerCount数量减1,成功后返回null。
- 根据timed(timed表示需要进行超时闲置线程回收),选择是限时等待还是阻塞的方式从队列中获取任务。
1 | private Runnable getTask() { |
说到getTask()顺便介绍几种队列:
- SynchronousQueue:Executors.newCachedThreadPool()中使用的队列。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。所以使用这个队列,任务会直接提交给线程处理,所以一般会把maximumPoolSize指定成Integer.MAX_VALUE。
- ArrayBlockingQueue:有界队列,队列可以限定长度,当添加新任务的时候,如果当前工作线程数量小于corePoolSize,则新建线程执行任务,否则如队列,以为队列是有限的,如果队列已经满了就会新建非核心线程执行任务,如果非核心线程也饱和了就会被拒绝,执行拒绝策略。
- LinkedBlockingQueue(不设置预定值):无界队列,因为是无界队列,所以maximumPoolSize这个属性的设定失效,线程池中的工作线程数用于不会大于corePoolSize。
processWorkerExit(Worker w, boolean completedAbruptly)
工作线程退出后的操作,在这里进行线程池的终止以及工作线程的回收。
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
tryTerminate()
尝试终止线程池。
1 | final void tryTerminate() { |
interruptIdleWorkers(boolean onlyOne)
中断闲置工作线程。重点是如何判断哪些是闲置的工作线程,并且可以被中断。满足 !t.isInterrupted() && w.tryLock()
:
- !t.isInterrupted()表示没有被中断。
- w.tryLock()表示能获取到锁,获取到锁AQS的同步状态为0,表示工作线程闲置。
1 | //tru表示只中断一个线程 |
shutdown()
调用shutdown(),拒绝接受新的任务加入,会将正在执行的任务以及等待执行的任务执行完成,终止所有的闲置工作线程。
1 | public void shutdown() { |
shutdownNow()
调用shutdownNow()会尝试停止所有执行的任务,也不会对队列中的等待任务进行处理,并返回等待执行的任务列表,它会终止所有的工作线程,与shutdown()不同,shutdown()只会终止所有的空闲工作线程。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有Shutdown权限
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
//终止所有工作线程
interruptWorkers();
//获取等待执行的任务列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池。
tryTerminate();
return tasks;
}
interruptWorkers() 以及 interruptIfStarted()
终止所有的工作线程。主要看看interruptIfStarted()中是如何判断哪些工作线程是可以被终止的,满足 getState() >= 0 && (t = thread) != null && !t.isInterrupted()
:
- getState() >= 0 获取CAS同步状态,上面说到同步状态初始化为-1,-1表示不可以被中断(可以看做当前工作线程没有启动),0表示闲置工作线程,1表示正在执行任务的工作线程。
- (t = thread) != null 这个不用说了,为空还终止个啥。
- !t.isInterrupted(),没有终止,避免重复调用。
1 | private void interruptWorkers() { |