BrightLoong's Blog

ThreadPoolExecutor源码解析

ThreadPoolExecutor

一. 线程池概述

在介绍线程池之前,先说一下为什么在开发中要使用线程池,个人认为主要有一下的原因:

  • 开发中使用线程的时候,大部分线程执行的时间很短,频繁的创建、启动、销毁线程带来不必要的资源消耗。
  • 线程的创建、挂起、唤醒等操作都需要依靠CPU调度,大量的线程会频繁进行上下文切换,特别是任务执行时间短、任务数量多的情况,大量时间将花在上下文切换上,又耗时,又耗资源。
  • 无法对任务做统一的管理、分配、和监控等。

线程池主要通过ThreadPoolExecutor来实现,在ThreadPoolExecutor类注释上有下面一段话(下面是翻译过来的):

线程池解决了两个不同的问题:由于减少了每个任务的调用开销,改进了执行大量异步任务的性能;并且它还提供了一种任务执行时限制和管理资源(包括线程)的方法。 同时每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

二. 线程池的构造方法参数解析

构造方法中各个参数的含义

先上一张图,有个大概的印象。

threadPool

  • 这里任务加入可以看做是放入了两个地方,一是线程池中,另一个是任务队列中。
  • 放入线程池的又分为两个部分,一是corePoolSize,暂且叫做核心线程(下图红色部分),另一部分是非核心线程(下图橙色部分),核心线程+非核心线程 <= maximumPoolSize。
参 数 含 义
corePoolSize 核心线程数,当向线程池中添加新任务的时候,如果此时线程数量小于corePoolSize,哪怕线程池中有空闲的线程,此时也会重新新建一个线程来处理这个任务请求。同时corePoolSize也是线程池中维持的线程数量,就算都是空闲线程的也会存在,除非设置了allowCoreThreadTimeOut。
maximumPoolSize 线程池中最大的线程数量,当向线程池中添加新任务的时候如果此时线程数量大于corePoolSize,小于maximumPoolSize,并且队列已经满了,将会新建线程来处理这个任务请求。
keepAliveTime 非核心线程在空闲后存活的是时间。
unit keepAliveTime的单位
workQueue 任务队列,常用的有三种:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue
threadFactory 创建线程的工厂,默认使用DefaultThreadFactory
handler 被拒绝后的处理,拒绝处理策略。
a. CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务
b. AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务
c. DiscardPolicy,直接抛弃任务,不做任何处理
d. DiscardOldestPolicy,去除任务队列中的第一个任务,重新提交

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
//使用默认的ThreadFactory和拒绝处理策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

//使用默认的拒绝处理策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

//使用默认的ThreadFactory
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}

//传入所有参数,并且对参数进行校验
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

三. 工作原理

其原理如下图所示:

theory

当一个新加一个任务的时候执行如下步骤:

  1. 首先尝试新建核心线程启动任务
  2. 核心线程已经满了,尝试加入到队列中
  3. 队列已经满了,尝试新建非核心线程启动任务
  4. 非核心线程已满,执行拒绝策略

同时,当线程池中的工作线程执行完一个Task后会从队列中take任务执行。

四. UML类图

UML

  • AbstractExecutorService,提供了ExecutorService的默认实现。
  • ExecutorService,定义了管理终止任务的一些方法以及让任务返回Future的方法。
  • Executor,只定义了一个execute()方法,任务的提交执行。
  • AbortPolicy、DiscardOldestPolicy、DiscardPolicy、CallerRunsPolicy,ThreadPoolExecutor内部定义的拒绝策略类。
  • Worker,是实现线程池重要的内部类,其UML类图如下:

Worker

Worker类主要是运行任务的以及维护线程的中断控制状态,以及其他状态的记录。Worker类继承AbstractQueuedSynchronizer以简化每个任务执行时候的锁的获取和释放。

五. 源码分析

注意区分Worker和Taks,Worker是指工作线程,也就是用来执行任务的线程,Task是线程执行的任务(添加到队列中的就是任务)。

ThreadPoolExecutor关键变量

ThreadPoolExecutor使用一个原子integer(AtomicInteger)变量ctl(32位),来表示线程池的控制状态,这个状态值实际上由两部分组成:

  1. workerCount:有效的线程数,最大(2^29)-1,为ctl的低29位
  2. runState:线程池运行状态,保存在ctl高位3位。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//ctl初始化的时候状态为RUNNING,workerCount为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

//workerCount位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//workerCount容量,(2^29)-1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;

// runState 的状态
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;


// 获取runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

说一下上面线程池的几种状态:

  • RUNNING:可以接受新的任务,并且处理在队列中的任务。
  • SHUTDOWN:不再接受新的任务,但是会把队列中的任务处理完成。
  • STOP:不再接受新的任务,也不处理队列中的任务,并且正在处理的任务会被中断。
  • TIDYING:所有任务都终止了,线程要转换到TIDYING状态,需要运行terminated()钩子方法。
  • TERMINATED:terminated()执行完成,到达这个状态时候awaitTermination()方法返回。

在ThreadPoolExecutor类注释中提到状态之间的转换情况:

  • RUNNING -> SHUTDOWN:调用shutdown(),或者隐式调用finalize()。
  • (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()。
  • SHUTDOWN -> TIDYING:当线程池和队列都为空的时候。
  • STOP -> TIDYING:线程池为空的时候。
  • TIDYING -> TERMINATED:钩子方法terminated()执行完成。

execute(Runnable command)

execute(Runnable command),提交任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果线程池状态为Running(其他状态拒绝添加新任务到队列)将任务添加到队列中。
if (isRunning(c) && workQueue.offer(command)) {
//添加成功后进行recheck
int recheck = ctl.get();
//如果当前状态不是Running,从队列中移除任务。
if (! isRunning(recheck) && remove(command))
reject(command);
//刚刚在队列中加入了任务,保证线程池中至少有一个工作线程可以处理任务。
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//
else if (!addWorker(command, false))
reject(command);
}

分为3步来执行:

  1. 如果workerCount小于corePoolSize,将任务作为first Task新建线程来执行任务。
  2. 将任务添加到队列中,如果添加成功依然需要检查,在进入任务之前,如果线程池被关闭,那么将任务从队列中移除;如果当前线程池中没有工作线程,而刚刚在队列中加入了任务,要保证线程池中至少有一个工作线程可以处理任务。
  3. 如果不能将任务加入队列中,尝试新加一个线程来执行任务,但是并不定会成功,可能是线程池被shut down或者线程池达到了饱和(maximumPoolSize),如果失败了执行拒绝策略。

关于Worker的构造函数

在介绍addWorker(Runnable firstTask, boolean core)之前我们来看看Worker的构造函数。

1
2
3
4
5
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

Worker实现了Runnable,这里将其构造为Thread赋值给thread。第一步中的setState(-1),这一步很有意思,将AQS中的同步状态设置为-1,到后面我们会看到,线程池使用AQS中的同步状态来判断该工作线程是否可以被中断。-1:初始化值,此时工作线程还没有启动,也没有中断的必要;0:表示接受中断,此时工作线程为空闲状态;1:表示此时工作线程正在执行任务。

addWorker(Runnable firstTask, boolean core)

检查根据当前线程池的工作状态和给定的界限限制(corePoolSize 和maximumPoolSize)是否可以添加新的工作线程。 如果添加了新的工作线程,workerCount会相应调整,并且如果可能的话将firstTask作为其第一个任务运行。 如果线程池停止(Stop)或关闭(ShutDown),此方法返回false。如果线程工厂未能创建线程,它也会返回false。如果线程创建失败,无论是由于线程工厂返回null还是由于异常(通常是Thread.start()中的OutOfMemoryError),都会进行回滚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90

/**
*@Param core:如果为true把corePoolSize作为解析,否则把maximumPoolSize作为界限。
*
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

//将下面的判断拆开来看。
//rs >= SHUTDOWN,在这个前提下,需要满足:
//! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
//上面的可以这样理解,只有在同时满足三个条件的情况下才会返回false,所以话句话说,只要有一个不满足,就会返回true
//将上面的判断修改为 (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())
//rs != SHUTDOWN:基于rs >= SHUTDOWN的前提下,当rs != SHUTDOWN,不再创建新的线程。
//firstTask != null:基于rs >= SHUTDOWN的前提下,不再接受新的任务。
//workQueue.isEmpty():任务队列为空,不用再创建新的工作线程来处理任务。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
//检查工作线程数是否超过了界限,超过了返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用CAS修改workerCount,加1,成功跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
//+1操作失败,其他任务添加修改了workerCount值,继续循环。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}

//工作线程是否启动
boolean workerStarted = false;

//工作线程是否添加成功
boolean workerAdded = false;
Worker w = null;
try {
//新加一个工作线程,Worker本身也实现了Runabble
w = new Worker(firstTask);

//这路获取到的其实是使用Worker生成的Thread
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {

//在获取到锁之后再次检查状态
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();

//添加到工作线程组中
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
//记录当前工作线程总数
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加工作线程成功。
if (workerAdded) {
//启动工作线程,其实执行的是Worker中的run()
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

Worker中的run()

Worker的run()方法实际调用的是runWorker(this),启动工作线程(注意这里不是直接启动的任务),在工作线程中执行任务。

工作线程在这循环,反复的从队列中获取任务并执行它们。

  1. 启动了初始的任务,也就是最开始创建工作线程的时候的firstTask。否则,只要线程池还在运行中,使用getTask()从队列中获取任务来执行。如果改变了线程池的工作状态,或者工作参数,getTask()返回null,此时工作线程将退出。或者工作线程因为异常而退出,此时异常退出标记completedAbruptly为true,之后会通过processWorkerExit新建一个工作线程来替换它。
  2. 在运行所有任务之前,需要获取锁,来保证当任务在运行的时候不会被中断,除非线程池正在停止(Stop)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public void run() {
runWorker(this);
}

final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
//获取任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 将AQS同步状态设置为0(初始值为-1),表示当前工作线程为闲置状态,可以被中断了。
//异常退出标记。
boolean completedAbruptly = true;
try {
//getTask()从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
//再次检查工作线程状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行前的钩子方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
//注意区分和start(),调用run()并不会另启线程,而是在当前线程中执行,所以任务其实是在工作线程中执行的。
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行后的钩子方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
//工作线程退出后的操作。
processWorkerExit(w, completedAbruptly);
}
}

getTask()

从队列中获取Task,大概分为以下几个步骤。

  1. 判断线程池以及队列的状态,如果线程池状态在STOP以上,此时线程池不处理队列中的任务;或者线程池处于SHUTDOWN但是队列为空(SHUTDOWN不再接受新的任务),workerCount减1,返回null,注意此时只是将变量减1,其实工作线程并没有终止真正的终止在 processWorkerExit(w, completedAbruptly);中。
  2. 如果通过了状态检查,判断是否要进行线程回收,如果需要workerCount数量减1,成功后返回null。
  3. 根据timed(timed表示需要进行超时闲置线程回收),选择是限时等待还是阻塞的方式从队列中获取任务。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// 如果线程池已经关闭(STOP状态不再处理队列中的任务),或者队列为空,workerCount减1,返回空。
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// allowCoreThreadTimeOut,是否允许回收核心线程。
// timed表示需要进行超时闲置线程回收。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

//1. (wc > maximumPoolSize || (timed && timedOut):如果工作线程数量大于maximumPoolSize,或者闲置线程超时。
//2. (wc > 1 || workQueue.isEmpty()):队列不为空时,至少需要保留一个工作线程来处理队列中的任务
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//workerCount数量减1,CAS操作,失败了会不断循环。
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
//根据timed,选择是限时等待还是阻塞的方式从队列中获取任务。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 一段时间拿不到返回null,表示超时。
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

说到getTask()顺便介绍几种队列:

  • SynchronousQueue:Executors.newCachedThreadPool()中使用的队列。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。所以使用这个队列,任务会直接提交给线程处理,所以一般会把maximumPoolSize指定成Integer.MAX_VALUE。
  • ArrayBlockingQueue:有界队列,队列可以限定长度,当添加新任务的时候,如果当前工作线程数量小于corePoolSize,则新建线程执行任务,否则如队列,以为队列是有限的,如果队列已经满了就会新建非核心线程执行任务,如果非核心线程也饱和了就会被拒绝,执行拒绝策略。
  • LinkedBlockingQueue(不设置预定值):无界队列,因为是无界队列,所以maximumPoolSize这个属性的设定失效,线程池中的工作线程数用于不会大于corePoolSize。

processWorkerExit(Worker w, boolean completedAbruptly)

工作线程退出后的操作,在这里进行线程池的终止以及工作线程的回收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void processWorkerExit(Worker w, boolean completedAbruptly) {
//如果是异常退出,workerCount数量减1
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();

final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
//从工作线程组中移除工作线程
workers.remove(w);
} finally {
mainLock.unlock();
}

//尝试终止线程池
tryTerminate();

int c = ctl.get();
if (runStateLessThan(c, STOP)) {
//如果是异常退出,直接新加一个工作线程。
if (!completedAbruptly) {
//如何设置了allowCoreThreadTimeOut,最小工作线程数为0,否则为corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果最小工作线程数为0,而队列又不为空,说明至少要保留一个工作线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果当前工作线程数还大于最小工作线程数,直接返回,不添加新的工作线程
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}

tryTerminate()

尝试终止线程池。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final void tryTerminate() {
for (;;) {
int c = ctl.get();
//以下三种情况表示不需要终止线程池。
//1. isRunning(c):处于Running状态。
//2. runStateAtLeast(c, TIDYING):TIDYING或者TERMINATED,已经在关闭了,不用重复关闭。
//3. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()):线程池处于SHUTDOWN状态,但是队列中还有任务没处理完。
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;

//如果还有工作线程,终止一个空闲的工作线程后退回,这个时候表示并不想终止线程池。
if (workerCountOf(c) != 0) { // Eligible to terminate
//只终止一个工作线程
interruptIdleWorkers(ONLY_ONE);
return;
}

//下面的操作是在终止线程池,修改线程池状态。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//设置为TIDYING状态,调用terminated()钩子方法
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//终止操作完成,设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
// 释放在termination条件上等待的所有线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// CAS操作失败后不断的循环。
}
}

interruptIdleWorkers(boolean onlyOne)

中断闲置工作线程。重点是如何判断哪些是闲置的工作线程,并且可以被中断。满足 !t.isInterrupted() && w.tryLock() :

  1. !t.isInterrupted()表示没有被中断。
  2. w.tryLock()表示能获取到锁,获取到锁AQS的同步状态为0,表示工作线程闲置。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//tru表示只中断一个线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//如果没有被中断,并且能成功获取到锁(获取到锁AQS的同步状态为0,表示工作线程闲置),那么就进行中断,不能获取到锁表示队列中有任务正在执行。
if (!t.isInterrupted() && w.tryLock()) {
try {
//进行中断操作。
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
//如果只中断一个,中断一个跳出。
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

shutdown()

调用shutdown(),拒绝接受新的任务加入,会将正在执行的任务以及等待执行的任务执行完成,终止所有的闲置工作线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有Shutdown权限
checkShutdownAccess();
//设置线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//关闭闲置的工作线程,最后调用的是interruptIdleWorkers(false)
interruptIdleWorkers();
//钩子方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//终止
tryTerminate();
}

shutdownNow()

调用shutdownNow()会尝试停止所有执行的任务,也不会对队列中的等待任务进行处理,并返回等待执行的任务列表,它会终止所有的工作线程,与shutdown()不同,shutdown()只会终止所有的空闲工作线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有Shutdown权限
checkShutdownAccess();
//设置线程池状态为STOP
advanceRunState(STOP);
//终止所有工作线程
interruptWorkers();
//获取等待执行的任务列表
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池。
tryTerminate();
return tasks;
}

interruptWorkers() 以及 interruptIfStarted()

终止所有的工作线程。主要看看interruptIfStarted()中是如何判断哪些工作线程是可以被终止的,满足 getState() >= 0 && (t = thread) != null && !t.isInterrupted() :

  1. getState() >= 0 获取CAS同步状态,上面说到同步状态初始化为-1,-1表示不可以被中断(可以看做当前工作线程没有启动),0表示闲置工作线程,1表示正在执行任务的工作线程。
  2. (t = thread) != null 这个不用说了,为空还终止个啥。
  3. !t.isInterrupted(),没有终止,避免重复调用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}

void interruptIfStarted() {
Thread t;
//getState()用于判断线程是否启动,具体看上面的解释。
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
坚持原创技术分享,您的支持将鼓励我继续创作!