一. 概述
AQS(AbstractQueuedSynchronizer),队列同步器,Java中很多同步类都是基于AQS实现的,比如:ReentrantLock、Semaphore、CountDownLatch等。
AQS提供了一个实现阻塞锁和相关同步组件的框架,这个框架实现依赖于FIFO(first-in-first-out,先进先出)的等待队列来完成获取资源线程的排队工作。AQS是绝大多数同步器的基础,它的内部使用一个 int 型的变量表示同步状态(资源状态),AQS并不维护这个状态的值,只是提供了一系列的原子更新方法,getState、setState、compareAndSetState,而由继承AQS的子类去重写特定定的方法实现对共享资源的获取和释放,而其他的比如线程排队、线程挂起、线程唤醒出队等已经在AQS中实现好了(典型的设计模式中模板方法模式的使用)。
AQS定义了两种资源共享的方式:
- Exclusive:独占式,只有一个线程能获取资源并执行,比如ReentrantLock。
- Share:共享式,多个线程获取资源,多个线程可以同时执行,比如CountDownLatch。
上面提到过AQS需要继承它的子类去重新特定的方法,而不同方式(共享和独占)需要重写的方法也不一样,下面来看看AQS中定义的可以重写的方法:
- protected boolean tryAcquire(int arg) : 独占式获取同步状态,tru表示获取成功,false表示获取失败,获取成功后其他线程需要等待该线程释放同步状态后才能获取同步状态。
- protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态。
- protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0表示获取成功;否则获取失败。
- protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false。
- protected boolean isHeldExclusively() : 当前同步器是是否在独占模式下被线程占用,使用conditions的时候才需要覆盖该方法。
看看tryAcquire(int arg),发现AQS中并没有把其定义为抽象方法,而是抛出UnsupportedOperationException异常,像上面所说的,不同的共享方式覆盖特定的方法,而不用实现其所不需要的方法,提供了灵活性。1
2
3protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
二. FIFO等待队列的实现
在具体分析之前,先看看AQS是如何实现FIFO的等待队列的,AQS的等待队列是”CLH” (Craig, Landin, and Hagersten) 队列的一种变体,关于”CLH”锁可以参照——自旋锁&CLH锁&MCS锁学习记录。在每个节点中保存了前后节点的引用,节点中的“waitStatus”字段用于表示线程的状态。节点的前驱节点在释放资源的时候发出信号,通知节点可以竞争资源。如果线程是队列中的第一个线程,则可能尝试获取资源,但是并不保证一定成功,队列中的第一个线程只是具有了竞争资源的权利。
AQS注释中给出的CLH队列结构如下:
1 | +------+ prev +-----+ +-----+ |
- head:head最开始是一个空节点,之后可以认为是当前获取到资源的线程所在的节点。
- prev:节点持有的前驱节点的引用(其实AQS代码中还有一个next,持有对下一个节点的引用,所以AQS中的等待队列是一个双向的队列)。
- tial:资源竞争失败会插入到队列,而tail始终指向最后插入队列的节点。
下面是AQS中等待队列实现的部分代码:
1 | static final class Node { |
- 使用volatile修饰变量,保证变量在多线程之间的可见性
waitStatus表示线程的状态:
- CANCELLED:线程被取消。(非负值)
- SIGNAL:等待触发,只有当前节点的前驱节点状态为SIGNAL,当前节点的线程才可以被挂起。
- CONDITION:等待条件状态。
- PROPAGATE:状态需要向后传播。
- 0:普通的同步器初始化为0。
- 前驱节点prev和后继节点next
三. 独占式
在AQS类注释中,有一段独占锁代码的实现,以下面的例子来分析AQS的源码:
1 | package io.github.brightloong.concurrent.aqs; |
lock——加锁
看上面的代码,加锁lock()函数,调用的是sync.acquire(1),从acquire(int arg)函数入手,依次分析加锁过程中涉及到的函数代码。
acquire(int arg)
1 | public final void acquire(int arg) { |
- tryAcquire(arg)获取资源,获取失败执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg))进行等待
- addWaiter(Node.EXCLUSIVE)将独占模式节点添加到等待队列中,添加成功后返回节点
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))节点在队列中获取资源,获取不成功,阻塞节点,直到被唤醒,返回中断状态
- 如果发生了中断,进行中断补偿
addWaiter(Node mode)
用于将当前线程加入到等待队列中,并返回当前节点。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19private Node addWaiter(Node mode) {
//按照给定的方式构造队列,上面提到的EXCLUSIVE(独占模式)和Share(共享模式)
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//快速入队,尝试将节点放到队尾,获取前驱节点,如果获取到了表示有线程占用了资源。
Node pred = tail;
if (pred != null) {
//将新建节点的前驱节点设置为获取到的前驱节点
node.prev = pred;
//原子更新tail,将tail更新为当前节点,可能更新失败,因为在执行step1前可能有其他线程更新了tail
if (compareAndSetTail(pred, node)) {//step1
pred.next = node;
return node;
}
}
//如果上面的操作执行失败,执行enq(node)
enq(node);
return node;
}
- 按照给定的方式构造节点,上面提到的EXCLUSIVE(独占模式)和Share(共享模式)。
- 快速入队,如果失败通过 enq(node) 入队(full enq)。
enq(final Node node)
将当前线程对应的节点添加到等待队列中,不断循环直到添加成功。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private Node enq(final Node node) {
//不断循环,直到节点成功添加到队列中。
for (;;) {
Node t = tail;
//tail等于null,表示当前资源没有被占用
if (t == null) {
//原子操作,初始化head节点,操作可能失败,因为可能有其他线程在这个时候已经初始化成功了。
if (compareAndSetHead(new Node()))
//成功后将tail指向head
tail = head;
} else {
//如果tail不等于null,将当前节点的前驱节点设置为tail。
node.prev = t;
//将tail原子更新成当前节点,可能失败,因为tail可能被其他线程更新。
if (compareAndSetTail(t, node)) {
//构建为双向队列
t.next = node;
//添加成功,放回当前节点。
return t;
}
}
}
}
- 判断tail节点是否为空,如果为空执行1,不为空执行2。
- 如果tail节点为空,初始化head节点,并将tail节点指向head节点,执行1。
- 如果tail节点不为空,尝试将当前节点添加到队尾,添加成功返回当前节点,否则执行1,直到添加成功。
acquireQueued(final Node node, int arg)
在队列中尝试获取资源,获取失败后判断是否真正需要进入阻塞状态,如果是将阻塞线程,直到被唤醒,并返回中断状态。不断循环,直到获取到资源或者进入阻塞状态等待被唤醒。
1 | final boolean acquireQueued(final Node node, int arg) { |
shouldParkAfterFailedAcquire(Node pred, Node node)
获取资源失败后判断线程是否需要真正进入阻塞,只有在前驱节点waitStatus值为SIGNAL,当前节点的线程才需要进入阻塞。
1 | private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { |
parkAndCheckInterrupt()
终于,在这里线程进入了阻塞,调用 LockSupport.park(this)阻塞线程
1 | private final boolean parkAndCheckInterrupt() { |
流程图
获取资源的流程如下所示,可以看到流程中有两个循环。
releaseLock——释放锁
从release(int arg)函数开始,一步步分析独占方式锁的释放。
release(int arg)
1 | public final boolean release(int arg) { |
unparkSuccessor(Node node)
唤醒后继节点
1 | private void unparkSuccessor(Node node) { |
四. 共享式
AQS类注释中同样提供了一个简单的实现:
1 | package io.github.brightloong.lab.concurrent.cas; |
和CountDownLatch很类似可以多个线程await(),但是只需要调用一次signal() 就可以启动阻塞的线程。
共享模式从 acquireShared(int arg)入手来进行分析,与独占模式不同的是,共享模式下同一时刻可以有多个线程获取到资源执行。
acquireShared(int arg)
获取同步状态。
1 | public final void acquireShared(int arg) { |
使用tryAcquireShared(arg) 获取资源状态
- 返回值大于0,获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
- 返回值等于0时,获取同步状态成功,但没有可用同步状态了;
- 返回值小于0时,表示获取同步状态失败。
获取失败执行doAcquireShared(arg)
doAcquireShared(int arg)
1 | private void doAcquireShared(int arg) { |
- 使用addWaiter(Node.SHARED),添加到等待队列中。
- 在队列中获取资源,获取成功后因为是共享式,如果还有资源可用,向后传播,唤醒后继节点。
- 获取失败,和独占式一样,检查是否真正需要进入阻塞等待被唤醒或者中断,如果是进入阻塞,如果不是继续循环。
setHeadAndPropagate(Node node, int propagate)
1 | private void setHeadAndPropagate(Node node, int propagate) { |
因为获取资源成功,将当前节点设置为head,并唤醒后继节点
doReleaseShared()
相对的分析一下 acquireShared(int arg)
1 | private void doReleaseShared() { |
- compareAndSetWaitStatus(h, Node.SIGNAL, 0)执行失败后继续循环,因为这个函数本身就会被多个线程调用(release和Acquire也是同时),所以状态很有可能被其他线程更改。
- compareAndSetWaitStatus(h, 0, Node.PROPAGATE)和上面同理
releaseShared(int arg)
1 | public final boolean releaseShared(int arg) { |
- 调用tryReleaseShared(arg)释放资源,成功返回true
- 调用doReleaseShared()唤醒后继节点并进行传播
其他
关于AQS中的ConditionObject,之后再分析