线程同步器AQS源码简析
线程同步器AQS
底层实现(以公平锁为例)
锁执行lock对象时,实际上是调用的Sync对象的方法,而Sync又继承自AbstractQueuedSynchronizer(队列同步器AQS)
既然是AQS中的Q是Queued,那么它自然需要维护一个队列
对于每个节点,他有几个比较重要的字段和方法
- Prev和Next,显而易见是指向前序节点和后续节点的指针
- status:表示节点不同的状态
- thread:记录被封装到该节点内的线程
1 | //每个处于等待状态的线程都可以是一个节点,并且每个节点是有很多状态的 |
在一开始的时候,head
和tail
都是null
,state
为默认值0
继续看AQS初始化的其他内容:
1 | //直接使用Unsafe类进行操作 |
可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。
CAS类提供了一些可重写的方法,同时也为独占式和非独占式锁都提供了对应的方法,已经一些已经写好的模板方法(他们会调用重写的方法)。
首先看一些可重写方法:
1 | //独占式获取同步状态,查看同步状态是否和参数一致,如果返没有问题,那么会使用CAS操作设置同步状态并返回true |
可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException
,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:
1 | static final class FairSync extends Sync { |
我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire()
,我们来看看它在AQS类中的实现细节:
1 | //这个是JEP 270添加的新注解,它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出,下同 |
其中的tryAcquire()方法
1 | static final class FairSync extends Sync { |
在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。
接着我们看addWaiter(Node.EXCLUSIVE)
1 | private Node addWaiter(Node mode) { |
再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
1 |
|
上面是获取锁
释放锁其实也是类似的,在释放的过程中,需要唤醒队列中下一个结点中的线程,然后还要维护AQS中的状态(删除挂起的队列,减少等待队列中节点数量)
还记得JVM的垃圾回收器吗,这里将结点设置为空,然后把指向他的指针知道别的地方,他稍后就会被垃圾回收器回收
具体的代码也贴一下吧
unlock()
方法是在AQS中实现的:
1
2
3 public void unlock() {
sync.release(1); //直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
1
2
3
4
5
6
7
8
9
10
public final boolean release(int arg) {
if (tryRelease(arg)) { //和tryAcquire一样,也得子类去重写,释放锁操作
Node h = head; //释放锁成功后,获取新的头结点
if (h != null && h.waitStatus != 0) //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)
unparkSuccessor(h); //唤醒头节点下一个节点中的线程
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 private void unparkSuccessor(Node node) {
// 将等待状态waitStatus设置为初始值0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//获取下一个结点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的,这时就要遍历所有节点再另外找一个符合unpark要求的节点了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //这里是从队尾向前,因为enq()方法中的t.next = node是在CAS之后进行的,而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //要是找到了,就直接unpark,要是还是没找到,那就算了
LockSupport.unpark(s.thread);
}那么我们来看看
tryRelease()
方法是怎么实现的,具体实现在Sync中:
1
2
3
4
5
6
7
8
9
10
11
12
13
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //先计算本次解锁之后的状态值
if (Thread.currentThread() != getExclusiveOwnerThread()) //因为是独占锁,那肯定这把锁得是当前线程持有才行
throw new IllegalMonitorStateException(); //否则直接抛异常
boolean free = false;
if (c == 0) { //如果解锁之后的值为0,表示已经完全释放此锁
free = true;
setExclusiveOwnerThread(null); //将独占锁持有线程设置为null
}
setState(c); //状态值设定为c
return free; //如果不是0表示此锁还没完全释放,返回false,是0就返回true
}
这样就大概讲完了可重入锁的公平锁实现,实际上也不是无法理解的(迫真
下面是讲师总结的流程图
这是我自己总结的
公平锁真的公平吗?
在并发的情况下,公平锁是有概率变得不公平的
对于每个需要尝试获取锁的进程,他们都会先执行tryAcquire()来尝试获取锁,在尝试获取锁的过程中会先判断在队列中是否有节点处于等待队列,然后一旦发现没有,就会执行CAP操作
现在假设有线程1,线程2和线程3;线程1已经拿到锁了,这时线程2开始尝试获取锁,它发现虽然等待队列是空的,但是CAS操作失败,显然有线程在用锁,这时他开始准备排队了;
现在,突然线程3也开始尝试获取锁,恰巧在这时线程1释放锁了,线程3说:“消息队列没人在排队,CAS也没人在用,那我就不客气了”,于是线程3顺利地拿到了锁,实现了插队的目的。线程2:“我转个身怎么前面大哥换人了?”
虽然概率很低,但高并发的情况也不是遇不见这样离谱的情况。
所以,严格来讲,公平锁只有在等待队列非空的时候才是公平的,就算是公平,也不是按照发请求的先后的公平,而是按照进入等待队列的时间的公平。
Condition实现原理
之前我们看了Condition,了解了它实际上就是替代传统对象实现wait/notify操作(在Condition中是await/signal操作)的,并且同一把锁可以创建多个Condition对象,我们现在对Condition对象进行解析
在AQS中,Condition类有一个实现类ConditionObject,其同样使用链表实现了条件队列
这里的条件队列能允许线程在某些条件不满足的情况下先进入等待状态,并且等待被唤醒;在某个对象进入调用wait()后会被放入条件队列,等待notify()唤醒以争夺锁
1 | public class ConditionObject implements Condition, java.io.Serializable { |
这里条件队列直接借用了AQS的Node类,但是使用的是Node类中的nextWaiter字段来连接节点,并且Node的状态设置为CONDITION(处于条件队列中)
当一个线程调用await()方法时,会进入等待状态(进入条件队列),直到其他线程使用signal()方法将其唤醒(进入AQS的等待队列)
下面将会研究await()方法的实现,这里先明确这个方法的目标
- 仅有已经持有锁的方法才能够调用await
- 当调用await方法后,无论加了多少次锁,都会直接释放锁
- 只有其他线程调用signal或者是被中断时,才会唤醒等待中的线程
- 被唤醒的线程依然需要等待其他线程释放锁,并且等真正抢到锁以后才会继续执行,并且会恢复到await()时的状态(和await时一样的锁层数)
1 | public final void await() throws InterruptedException { |
简而言之
- 首先判断当前调用的线程是否处于中断的状态,若已在中断状态了,那么还谈什么等待状态呢,直接抛异常
- 在条件队列中加入一个新的节点,并保存当前线程的各种状态
- 循环判断当前线程是否还处于条件队列中,并且在循环里监视有没有在等待时被中断(被中断了一样会醒)
- 当跳出循环,表示当前线程一定是醒了,现在只需要拿到锁就可以开始运行了
- 在拿到锁后进行收尾工作,打扫一下等待队列,再回头看一眼有没有被中断,之后就正式开始执行自己的任务了
实际上await()
方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()
方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:
- 只有持有锁的线程才能唤醒锁所属(等着这把锁)Condition的的线程
- 优先唤醒条件队列队首,若出现问题,就按顺序往下找,直到找到可以唤醒的
- 唤醒的本质是将线程从条件队列中移出至等待队列
- 拿到锁之后,线程才能恢复运行
1 | public final void signal() { |
1 | private void doSignal(Node first) { |
1 | final boolean transferForSignal(Node node) { |
这里其实思路也不是很复杂, 跟之前消费等待队列的思路大差不差
先判断调用signal的线程的状态,是不是有锁,没锁你唤醒个锤子
若持有锁,就从条件队列取队首,并进行消费,不断循环判断该节点是否为取消状态,若是则继续看下一个节点;若发现条件队列都空了,就要进行一些收尾工作
消费等待队列结点的过程:
- 先进行CAS判断,若成功了表明已经有资格进入等待队列了
- 将线程加入等待队列,并且获取前驱节点的等待状态
- 若上一个节点的状态为取消(上一个节点都被取消了还管他干啥,直接开抢), 或者尝试设置上一个节点的状态为SIGNAL时失败(想把我挂起?没门,开抢!)
其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?
这里其实是为了进行优化而编写,直接unpark会有两种情况:
- 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
- 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws >0之后 !compareAndSetWaitStatus(p, ws,Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws,Node.SIGNAL) == false
如果这里被提前unpark,那么在await()
方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。
大致流程如下
其实不难理解吧。。就是概念比较多,讲师带着顺一遍其实基本能跑通思路
参考视频:Java JUC 并发编程 已完结(IDEA 2021版本)4K蓝光画质 玩转多线程
视频教程文档:柏码-JUC笔记(二)