Java JUC 笔记(2)

锁框架

JDK5以后增加了Lock接口用来实现锁功能,其提供了与synchronized类似的同步功能,但是在使用时手动的获取和释放锁

Lock和Condition锁

这里的锁与synchronized锁不太一样,我们可以认为是Lock一把真正意义上的锁,每个锁都是一个对应的锁对象,我们在应用时只需要对这个锁对象进行上锁或者解锁的操作即可。我们首先来看看,此接口中定义了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface Lock {
//获取锁,拿不到锁会阻塞,等待其他线程释放锁,获取到锁后返回
void lock();
//同上,但是等待过程中会响应中断
void lockInterruptibly() throws InterruptedException;
//非阻塞地获取锁,如果能获取到会返回true,不能返回false
boolean tryLock();
//尝试获取锁,但是可以限定超时时间,如果超出时间还没拿到锁返回false,否则返回true,可以响应中断
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//释放锁
void unlock();
//暂时可以理解为替代传统的Object的wait()、notify()等操作的工具
Condition newCondition();
}

同样按照(一)中的案例,演示如何使用Lock类来进行加锁和释放锁操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main {
private static int i = 0;
public static void main(String[] args) throws InterruptedException {
Lock testLock = new ReentrantLock(); //可重入锁ReentrantLock类是Lock类的一个实现,我们后面会进行介绍
Runnable action = () -> {
for (int j = 0; j < 100000; j++) { //还是以自增操作为例
testLock.lock(); //加锁,加锁成功后其他线程如果也要获取锁,会阻塞,等待当前线程释放
i++;
testLock.unlock(); //解锁,释放锁之后其他线程就可以获取这把锁了(注意在这之前一定得加锁,不然报错)
}
};
new Thread(action).start();
new Thread(action).start();
Thread.sleep(1000); //等上面两个线程跑完
System.out.println(i);
}
}

这里的线程在执行到action中的代码时,是真的回去请求获取main中定义的锁对象的,他们俩争抢唯一的一把锁,自然也不会出现交错的自增导致线程冲突的问题了。

上面的代码中出现了ReentrantLock,这里简单说一下

reetrantlock可以用来当做synchronized使用的,它比synchronized更安全,在线程tryLock()失败时不会导致死锁,它不会在获取不到锁时无限等待,因为我们必须先获取到锁,然后再进入代码块,最后在finally里释放锁,才能完成全部流程

但是这样开了两个线程去自发地争抢锁,那我们如何去控制线程等待和唤醒等待中(wait()和notify())的线程呢?这里并发包提供了Condition接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface Condition {
//与调用锁对象的wait方法一样,会进入到等待状态,但是这里需要调用Condition的signal或signalAll方法进行唤醒(感觉就是和普通对象的wait和notify是对应的)同时,等待状态下是可以响应中断的
void await() throws InterruptedException;
//同上,但不响应中断(看名字都能猜到)
void awaitUninterruptibly();
//等待指定时间,如果在指定时间(纳秒)内被唤醒,会返回剩余时间,如果超时,会返回0或负数,可以响应中断
long awaitNanos(long nanosTimeout) throws InterruptedException;
//等待指定时间(可以指定时间单位),如果等待时间内被唤醒,返回true,否则返回false,可以响应中断
boolean await(long time, TimeUnit unit) throws InterruptedException;
//可以指定一个明确的时间点,如果在时间点之前被唤醒,返回true,否则返回false,可以响应中断
boolean awaitUntil(Date deadline) throws InterruptedException;
//唤醒一个处于等待状态的线程,注意还得获得锁才能接着运行
void signal();
//同上,但是是唤醒所有等待线程
void signalAll();
}

我们可以直接通过我们创建的锁对象来lock.getCondition()来获取Condition对象,然后调用Condition对象实现唤醒和等待

△ 同一把锁内可以存在多个Condition对象,他们的等待队列是分开计算的(所以可以实现多等待队列)

可重入锁

大家可否学过操作系统或者计组?这里跟CPU响应中断有点像。

CPU需要处理许多事,而每件事的优先级都不同。如果CPU在处理一件事时,有一件更紧急的事来了,CPU就需要先放下手上的事,优先处理更紧急的事,以此类推,不断的套娃(笑

虽然跟这篇文章主题没什么关系,但是在CPU中,它心情不好的时候是可以只干自己愿意干的事的(关中断),在那时就无法响应更高的请求了

可重入锁简单的理解其实也还是套娃(同一个线程可以反复上锁),不过这里可重入锁要锁的对象是他自己,在一个锁它负责的代码范围内可以套入一次锁,同时每次上锁和解锁操作必须一一对应,不然显然会出现问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws InterruptedException {
ReentrantLock lock = new ReentrantLock();
lock.lock();
lock.lock(); //连续加锁2次
new Thread(() -> {
System.out.println("线程2想要获取锁");
lock.lock();
System.out.println("线程2成功获取到锁");
}).start();
lock.unlock();
System.out.println("线程1释放了一次锁");
TimeUnit.SECONDS.sleep(1);
lock.unlock();
System.out.println("线程1再次释放了一次锁"); //释放两次后其他线程才能加锁
}

我们可以在一段线程内通过锁调用getHoleCount()方法来查询当前套娃的层数。

实际上,若有线程拿到锁了,如果有其他线程也来获取锁,他们会进入一个等待队列,我们同样可以通过锁对象调用Lock.getQueueLength()方法来获取等待该锁的线程数量的估计值

同样的,锁也能通过lock.getWaitQueueLength(Condition condition)方法来查询等待Condition的线程数量

这里再次强调一下关系,锁拥有不同的condition,然后通过锁来查询condition的等待队列的线程对象

锁对象还提供了hasQueuedThread(Thread t)的方法来查询一个线程是否在等待队列中

公平锁与非公平锁

之前提到若线程间争抢同一把锁,会让他们暂时进入到等待队列中,那么线程获取锁的顺序是否是按照进入等待队列的先后顺序呢?

在ReentrantLock的构造方法中是这样写的

1
2
3
public ReentrantLock() {
sync = new NonfairSync(); //看名字貌似是非公平的
}

其实锁分为公平锁和非公平锁,可以看出默认创建出来的是非公平锁

  • 公平锁:多个线程按照进入等待队列的次序来获得锁
  • 非公平锁:多个线程锁获取锁时,各个线程会直接去抢锁,若获取不到才回去排队

公平锁一定是公平的吗?

读写锁

读写锁并不是专门用作读写操作的的锁,但是可以以读写操作来举例理解

还是以操作系统举例

  • 在没有线程在写某个文件时,同时可以有多个线程在读该文件
  • 在没有线程在读某个文件时,同时只能有一个线程写该文件

这里的读写锁机制如下

  • 读锁:写锁未占用,同时可以多个线程加读锁
  • 写锁:读锁未占用,同时只能有一个线程加写锁

读写锁与可重入锁一样,有专门的接口

1
2
3
4
5
6
public interface ReadWriteLock {
//获取读锁
Lock readLock();
//获取写锁
Lock writeLock();
}

ReadWriteLock接口有一个实现类ReentrantReadWriteLock(它本身不是锁),我们操作ReentrantReadWriteLock时不能直接上锁,而是要先获取读锁或写锁后再上锁。

并且,ReentrantReadWriteLock不仅具有读写锁的功能,还保留了可重入锁和公平/非公平机制,比如同一个线程可以重复为写锁加锁,并且必须全部解锁才真正释放锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.writeLock().lock();
new Thread(() -> {
lock.writeLock().lock();
System.out.println("成功获取到写锁!");
}).start();
System.out.println("释放第一层锁!");
lock.writeLock().unlock();
TimeUnit.SECONDS.sleep(1);
System.out.println("释放第二层锁!");
lock.writeLock().unlock();
}

锁降级和锁升级

锁降级指的是写锁降级成读锁;

当一个线程持有写锁,虽然别的线程不能申请读锁,但是持有写锁的线程可以将加一把读锁,这就是锁降级

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.writeLock().lock();
lock.readLock().lock();
System.out.println("成功加读锁!");
}

那么,如果我们在同时加了写锁和读锁的情况下,释放写锁,是否其他的线程就可以一起加读锁了呢?

在一个线程同时持有写锁和读锁的情况下,释放了写锁,其他线程也会获取到写锁,这种情况就叫做锁降级

注意在仅持有读锁的情况下去申请写锁,属于”锁升级“,ReentrantReadWriteLock是不支持的:

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
lock.readLock().lock();
lock.writeLock().lock();
System.out.println("锁升级成功!");
}

可以看到线程直接卡在加写锁的那一句了。

锁降级: 有写锁 -> 可以再申请读锁; 释放写锁后其他线程也可以申请读锁了
锁升级(ReentrantReadWriteLock不支持): 有读锁 -> 无法申请写锁;

线程同步器AQS

底层实现(以公平锁为例)

锁执行lock对象时,实际上是调用的Sync对象的方法,而Sync又继承自AbstractQueuedSynchronizer(队列同步器AQS)

既然是AQS中的Q是Queued,那么它自然需要维护一个队列

image-20240305165944081

对于每个节点,他有几个比较重要的字段和方法

  • Prev和Next,显而易见是指向前序节点和后续节点的指针
  • status:表示节点不同的状态
  • thread:记录被封装到该节点内的线程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
//每个处于等待状态的线程都可以是一个节点,并且每个节点是有很多状态的
static final class Node {
//每个节点都可以被分为独占模式节点或是共享模式节点,分别适用于独占锁和共享锁
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

//等待状态,这里都定义好了
//唯一一个大于0的状态,表示已失效,可能是由于超时或中断,此节点被取消。
static final int CANCELLED = 1;
//此节点后面的节点被挂起(进入等待状态)
//注意这里是后面
static final int SIGNAL = -1;
//在条件队列中的节点才是这个状态
static final int CONDITION = -2;
//传播,一般用于共享锁
static final int PROPAGATE = -3;

volatile int waitStatus; //等待状态值
volatile Node prev; //双向链表基操
volatile Node next;
volatile Thread thread; //每一个线程都可以被封装进一个节点进入到等待队列

Node nextWaiter; //在等待队列中表示模式,条件队列中作为下一个结点的指针

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() {
}

Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}

在一开始的时候,headtail都是nullstate为默认值0

继续看AQS初始化的其他内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//直接使用Unsafe类进行操作
private static final Unsafe unsafe = Unsafe.getUnsafe();
//记录类中属性的在内存中的偏移地址,方便Unsafe类直接操作内存进行赋值等(直接修改对应地址的内存)
private static final long stateOffset; //这里对应的就是AQS类中的state成员字段
private static final long headOffset; //这里对应的就是AQS类中的head头结点成员字段
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static { //静态代码块,在类加载的时候就会自动获取偏移地址
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));

} catch (Exception ex) { throw new Error(ex); }
}

//通过CAS操作来修改头结点
private final boolean compareAndSetHead(Node update) {
//调用的是Unsafe类的compareAndSwapObject方法,通过CAS算法比较对象并替换
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

//同上,省略部分代码
private final boolean compareAndSetTail(Node expect, Node update) {...}
private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {...}
private static final boolean compareAndSetNext(Node node, Node expect, Node update) {...}

可以发现,队列同步器由于要使用到CAS算法,所以,直接使用了Unsafe工具类,Unsafe类中提供了CAS操作的方法(Java无法实现,底层由C++实现)所有对AQS类中成员字段的修改,都有对应的CAS操作封装。

CAS类提供了一些可重写的方法,同时也为独占式和非独占式锁都提供了对应的方法,已经一些已经写好的模板方法(他们会调用重写的方法)。

首先看一些可重写方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//独占式获取同步状态,查看同步状态是否和参数一致,如果返没有问题,那么会使用CAS操作设置同步状态并返回true
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

//独占式释放同步状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

//共享式获取同步状态,返回值大于0表示成功,否则失败
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}

//共享式释放同步状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}

//是否在独占模式下被当前线程占用(锁是否被当前线程持有)
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}

可以看到,这些需要重写的方法默认是直接抛出UnsupportedOperationException,也就是说根据不同的锁类型,我们需要去实现对应的方法,我们可以来看一下ReentrantLock(此类是全局独占式的)中的公平锁是如何借助AQS实现的:

1
2
3
4
5
6
7
8
9
10
11
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

//加锁操作调用了模板方法acquire
//为了防止各位绕晕,请时刻记住,lock方法一定是在某个线程下为了加锁而调用的,并且同一时间可能会有其他线程也在调用此方法
final void lock() {
acquire(1);
}

...
}

我们先看看加锁操作干了什么事情,这里直接调用了AQS提供的模板方法acquire(),我们来看看它在AQS类中的实现细节:

1
2
3
4
5
6
@ReservedStackAccess //这个是JEP 270添加的新注解,它会保护被注解的方法,通过添加一些额外的空间,防止在多线程运行的时候出现栈溢出,下同
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //节点为独占模式Node.EXCLUSIVE
selfInterrupt();
}

其中的tryAcquire()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static final class FairSync extends Sync {
//可重入独占锁的公平实现
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //先获取当前线程的Thread对象
int c = getState(); //获取当前AQS对象状态(独占模式下0为未占用,大于0表示已占用)
if (c == 0) { //如果是0,那就表示没有占用,现在我们的线程就要来尝试占用它
//一直到这里为止还可能有多个线程在这一步
if (!hasQueuedPredecessors() && //等待队列是否不为空且当前线程没有拿到锁,其实就是看看当前线程有没有必要进行排队,如果没必要排队,就说明可以直接获取锁
compareAndSetState(0, acquires)) { //CAS设置状态,如果成功则说明成功拿到了这把锁,失败则说明可能这个时候其他线程在争抢,并且还比你先抢到
setExclusiveOwnerThread(current); //成功拿到锁,会将独占模式所有者线程设定为当前线程(这个方法是父类AbstractOwnableSynchronizer中的,就表示当前这把锁已经是这个线程的了)
return true; //占用锁成功,返回true
}
}
else if (current == getExclusiveOwnerThread()) { //如果不是0,那就表示被线程占用了,这个时候看看是不是自己占用的,如果是,由于是可重入锁,可以继续加锁
int nextc = c + acquires; //多次加锁会将状态值进行增加,状态值就是加锁次数
if (nextc < 0) //加到int值溢出了?
throw new Error("Maximum lock count exceeded");
setState(nextc); //设置为新的加锁次数
return true;
}
return false; //其他任何情况都是加锁失败
}
}

image-20240305172706008

在了解了公平锁的实现之后,是不是感觉有点恍然大悟的感觉,虽然整个过程非常复杂,但是只要理清思路,还是比较简单的。

接着我们看addWaiter(Node.EXCLUSIVE)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先尝试使用CAS直接入队,如果这个时候其他线程也在入队(就是不止一个线程在同一时间争抢这把锁)就进入enq()
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//此方法是CAS快速入队失败时调用
enq(node);
return node;
}

private Node enq(final Node node) {
//自旋形式入队,可以看到这里是一个无限循环
for (;;) {
Node t = tail;
if (t == null) { //这种情况只能说明头结点和尾结点都还没初始化
if (compareAndSetHead(new Node())) //初始化头结点和尾结点
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t; //只有CAS成功的情况下,才算入队成功,如果CAS失败,那说明其他线程同一时间也在入队,并且手速还比当前线程快,刚好走到CAS操作的时候,其他线程就先入队了,那么这个时候node.prev就不是我们预期的节点了,而是另一个线程新入队的节点,所以说得进下一次循环再来一次CAS,这种形式就是自旋
}
}
}
}

img

再看acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@ReservedStackAccess
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //可以看到当此节点位于队首(node.prev == head)时,会再次调用tryAcquire方法获取锁,如果获取成功,会返回此过程中是否被中断的值
setHead(node); //新的头结点设置为当前结点
p.next = null; // 原有的头结点没有存在的意义了
failed = false; //没有失败
return interrupted; //直接返回等待过程中是否被中断
}
//依然没获取成功,
if (shouldParkAfterFailedAcquire(p, node) && //将当前节点的前驱节点等待状态设置为SIGNAL,如果失败将直接开启下一轮循环,直到成功为止,如果成功接着往下
parkAndCheckInterrupt()) //挂起线程进入等待状态,等待被唤醒,如果在等待状态下被中断,那么会返回true,直接将中断标志设为true,否则就是正常唤醒,继续自旋
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //通过unsafe类操作底层挂起线程(会直接进入阻塞状态)
return Thread.interrupted();
}

img

上面是获取锁

释放锁其实也是类似的,在释放的过程中,需要唤醒队列中下一个结点中的线程,然后还要维护AQS中的状态(删除挂起的队列,减少等待队列中节点数量)

还记得JVM的垃圾回收器吗,这里将结点设置为空,然后把指向他的指针知道别的地方,他稍后就会被垃圾回收器回收

具体的代码也贴一下吧

unlock()方法是在AQS中实现的:

1
2
3
public void unlock() {
sync.release(1); //直接调用了AQS中的release方法,参数为1表示解锁一次state值-1
}
1
2
3
4
5
6
7
8
9
10
@ReservedStackAccess
public final boolean release(int arg) {
if (tryRelease(arg)) { //和tryAcquire一样,也得子类去重写,释放锁操作
Node h = head; //释放锁成功后,获取新的头结点
if (h != null && h.waitStatus != 0) //如果新的头结点不为空并且不是刚刚建立的结点(初始状态下status为默认值0,而上面在进行了shouldParkAfterFailedAcquire之后,会被设定为SIGNAL状态,值为-1)
unparkSuccessor(h); //唤醒头节点下一个节点中的线程
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
// 将等待状态waitStatus设置为初始值0
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

//获取下一个结点
Node s = node.next;
if (s == null || s.waitStatus > 0) { //如果下一个结点为空或是等待状态是已取消,那肯定是不能通知unpark的,这时就要遍历所有节点再另外找一个符合unpark要求的节点了
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) //这里是从队尾向前,因为enq()方法中的t.next = node是在CAS之后进行的,而 node.prev = t 是CAS之前进行的,所以从后往前一定能够保证遍历所有节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null) //要是找到了,就直接unpark,要是还是没找到,那就算了
LockSupport.unpark(s.thread);
}

那么我们来看看tryRelease()方法是怎么实现的,具体实现在Sync中:

1
2
3
4
5
6
7
8
9
10
11
12
13
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
int c = getState() - releases; //先计算本次解锁之后的状态值
if (Thread.currentThread() != getExclusiveOwnerThread()) //因为是独占锁,那肯定这把锁得是当前线程持有才行
throw new IllegalMonitorStateException(); //否则直接抛异常
boolean free = false;
if (c == 0) { //如果解锁之后的值为0,表示已经完全释放此锁
free = true;
setExclusiveOwnerThread(null); //将独占锁持有线程设置为null
}
setState(c); //状态值设定为c
return free; //如果不是0表示此锁还没完全释放,返回false,是0就返回true
}

这样就大概讲完了可重入锁的公平锁实现,实际上也不是无法理解的(迫真

下面是讲师总结的流程图

image-20230306171428206

这是我自己总结的

img

公平锁真的公平吗?

在并发的情况下,公平锁是有概率变得不公平的

对于每个需要尝试获取锁的进程,他们都会先执行tryAcquire()来尝试获取锁,在尝试获取锁的过程中会先判断在队列中是否有节点处于等待队列,然后一旦发现没有,就会执行CAP操作

现在假设有线程1,线程2和线程3;线程1已经拿到锁了,这时线程2开始尝试获取锁,它发现虽然等待队列是空的,但是CAS操作失败,显然有线程在用锁,这时他开始准备排队了;

现在,突然线程3也开始尝试获取锁,恰巧在这时线程1释放锁了,线程3说:“消息队列没人在排队,CAS也没人在用,那我就不客气了”,于是线程3顺利地拿到了锁,实现了插队的目的。线程2:“我转个身怎么前面大哥换人了?”

image-20240305203430604

虽然概率很低,但高并发的情况也不是遇不见这样离谱的情况。

所以,严格来讲,公平锁只有在等待队列非空的时候才是公平的,就算是公平,也不是按照发请求的先后的公平,而是按照进入等待队列的时间的公平。

Condition实现原理

之前我们看了Condition,了解了它实际上就是替代传统对象实现wait/notify操作(在Condition中是await/signal操作)的,并且同一把锁可以创建多个Condition对象,我们现在对Condition对象进行解析

在AQS中,Condition类有一个实现类ConditionObject,其同样使用链表实现了条件队列

这里的条件队列能允许线程在某些条件不满足的情况下先进入等待状态,并且等待被唤醒;在某个对象进入调用wait()后会被放入条件队列,等待notify()唤醒以争夺锁

1
2
3
4
5
6
7
8
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** 条件队列的头结点 */
private transient Node firstWaiter;
/** 条件队列的尾结点 */
private transient Node lastWaiter;

//...

这里条件队列直接借用了AQS的Node类,但是使用的是Node类中的nextWaiter字段来连接节点,并且Node的状态设置为CONDITION(处于条件队列中)

image-20240307130533174

当一个线程调用await()方法时,会进入等待状态(进入条件队列),直到其他线程使用signal()方法将其唤醒(进入AQS的等待队列)

下面将会研究await()方法的实现,这里先明确这个方法的目标

  • 仅有已经持有锁的方法才能够调用await
  • 当调用await方法后,无论加了多少次锁,都会直接释放锁
  • 只有其他线程调用signal或者是被中断时,才会唤醒等待中的线程
  • 被唤醒的线程依然需要等待其他线程释放锁,并且等真正抢到锁以后才会继续执行,并且会恢复到await()时的状态(和await时一样的锁层数)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //如果在调用await之前就被添加了中断标记,那么会直接抛出中断异常
Node node = addConditionWaiter(); //为当前线程创建一个新的节点,并将其加入到条件队列中
int savedState = fullyRelease(node); //完全释放当前线程持有的锁,并且保存一下state值,因为唤醒之后还得恢复
int interruptMode = 0; //用于保存中断状态
while (!isOnSyncQueue(node)) { //循环判断是否位于同步队列中,如果等待状态下的线程被其他线程唤醒,那么会正常进入到AQS的等待队列中(之后我们会讲)
LockSupport.park(this); //如果依然处于等待状态,那么继续挂起
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //看看等待的时候是不是被中断了
break;
}
//出了循环之后,那线程肯定是已经醒了,这时就差拿到锁就可以恢复运行了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //直接开始acquireQueued尝试拿锁(之前已经讲过了)从这里开始基本就和一个线程去抢锁是一样的了
interruptMode = REINTERRUPT;
//已经拿到锁了,基本可以开始继续运行了,这里再进行一下后期清理工作
if (node.nextWaiter != null)
unlinkCancelledWaiters(); //将等待队列中,不是Node.CONDITION状态的节点移除
if (interruptMode != 0) //依然是响应中断
reportInterruptAfterWait(interruptMode);
//OK,接着该干嘛干嘛
}

简而言之

  1. 首先判断当前调用的线程是否处于中断的状态,若已在中断状态了,那么还谈什么等待状态呢,直接抛异常
  2. 在条件队列中加入一个新的节点,并保存当前线程的各种状态
  3. 循环判断当前线程是否还处于条件队列中,并且在循环里监视有没有在等待时被中断(被中断了一样会醒)
  4. 当跳出循环,表示当前线程一定是醒了,现在只需要拿到锁就可以开始运行了
  5. 在拿到锁后进行收尾工作,打扫一下等待队列,再回头看一眼有没有被中断,之后就正式开始执行自己的任务了

实际上await()方法比较中规中矩,大部分操作也在我们的意料之中,那么我们接着来看signal()方法是如何实现的,同样的,为了防止各位绕晕,先明确signal的目标:

  • 只有持有锁的线程才能唤醒锁所属(等着这把锁)Condition的的线程
  • 优先唤醒条件队列队首,若出现问题,就按顺序往下找,直到找到可以唤醒的
  • 唤醒的本质是将线程从条件队列中移出至等待队列
  • 拿到锁之后,线程才能恢复运行

image-20230306171620786

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively()) //先看看当前线程是不是持有锁的状态
throw new IllegalMonitorStateException(); //不是?那你不配唤醒别人
Node first = firstWaiter; //获取条件队列的第一个结点
if (first != null) //如果队列不为空,获取到了,那么就可以开始唤醒操作
doSignal(first);
}
1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null) //如果当前节点在本轮循环没有后继节点了,条件队列就为空了
lastWaiter = null; //所以这里相当于是直接清空
first.nextWaiter = null; //将给定节点的下一个结点设置为null,因为当前结点马上就会离开条件队列了
} while (!transferForSignal(first) && //接着往下看
(first = firstWaiter) != null); //能走到这里只能说明给定节点被设定为了取消状态,那就继续看下一个结点
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final boolean transferForSignal(Node node) {
/*
* 如果这里CAS失败,那有可能此节点被设定为了取消状态
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

//CAS成功之后,结点的等待状态就变成了默认值0,接着通过enq方法直接将节点丢进AQS的等待队列中,相当于唤醒并且可以等待获取锁了
//这里enq方法返回的是加入之后等待队列队尾的前驱节点,就是原来的tail
Node p = enq(node);
int ws = p.waitStatus; //保存前驱结点的等待状态
//如果上一个节点的状态为取消, 或者尝试设置上一个节点的状态为SIGNAL失败(可能是在ws>0判断完之后马上变成了取消状态,导致CAS失败)
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //直接唤醒线程
return true;
}

这里其实思路也不是很复杂, 跟之前消费等待队列的思路大差不差

  • 先判断调用signal的线程的状态,是不是有锁,没锁你唤醒个锤子

  • 若持有锁,就从条件队列取队首,并进行消费,不断循环判断该节点是否为取消状态,若是则继续看下一个节点;若发现条件队列都空了,就要进行一些收尾工作

  • 消费等待队列结点的过程:

    1. 先进行CAS判断,若成功了表明已经有资格进入等待队列了
    2. 将线程加入等待队列,并且获取前驱节点的等待状态
    3. 若上一个节点的状态为取消(上一个节点都被取消了还管他干啥,直接开抢), 或者尝试设置上一个节点的状态为SIGNAL时失败(想把我挂起?没门,开抢!)

    image-20240307133318626

其实最让人不理解的就是倒数第二行,明明上面都正常进入到AQS等待队列了,应该是可以开始走正常流程了,那么这里为什么还要提前来一次unpark呢?

这里其实是为了进行优化而编写,直接unpark会有两种情况:

  • 如果插入结点前,AQS等待队列的队尾节点就已经被取消,则满足wc > 0
  • 如果插入node后,AQS内部等待队列的队尾节点已经稳定,满足tail.waitStatus == 0,但在执行ws >0之后 !compareAndSetWaitStatus(p, ws,Node.SIGNAL)之前被取消,则CAS也会失败,满足compareAndSetWaitStatus(p, ws,Node.SIGNAL) == false

如果这里被提前unpark,那么在await()方法中将可以被直接唤醒,并跳出while循环,直接开始争抢锁,因为前一个等待结点是被取消的状态,没有必要再等它了。

大致流程如下

image-20230306171643082

其实不难理解吧。。就是概念比较多,讲师带着顺一遍其实基本能跑通思路


原子类

原子类,正如其名,是可以实现原子操作的类,JUC为我们提供的原子类底层采用CAS算法;所有的原子类都位于java.util.concurrent.atomic包下。

原子类介绍

常用的基本数据类型有其对应的原子类封装:

  • AutomicInteger:原子更新int
  • AtomicLon:原子更新long
  • AtomicBoolean:原子更新boolean

现在我们使用int类型对应的原子类:

1
2
3
4
5
6
public class Main {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(1);
System.out.println(i.getAndIncrement()); //如果想实现i += 2这种操作,可以使用 addAndGet() 自由设置delta 值
}
}

我们可以将int数值封装到此类中(注意必须调用构造方法,它不像Integer那样有装箱机制),并且通过调用此类提供的方法来获取或是对封装的int值进行自增。

底层实现

1
2
3
4
5
6
7
private volatile int value;

public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}

AtomicInteger的底层本质是实现了一个volatile类型的int值,这样就可以保证在CAS时的可见性,进而实现原子性

原子类底层也是采用了CAS算法来保证的原子性,包括getAndSetgetAndAdd等方法都是这样。原子类也直接提供了CAS操作方法,我们可以直接使用:

1
2
3
4
5
6
public static void main(String[] args) throws InterruptedException {
AtomicInteger integer = new AtomicInteger(10);
System.out.println(integer.compareAndSet(30, 20));
System.out.println(integer.compareAndSet(10, 20));
System.out.println(integer);
}

如果想以普通变量的方式来设定值,那么可以使用lazySet()方法,这样就不采用volatile的立即可见机制了。

1
2
AtomicInteger integer = new AtomicInteger(1);
integer.lazySet(2);

除了基本类有原子类以外,基本类型的数组类型也有原子类:

  • AtomicIntegerArray:原子更新int数组
  • AtomicLongArray:原子更新long数组
  • AtomicReferenceArray:原子更新引用数组

其实原子数组和原子类型一样的,不过我们可以对数组内的元素进行原子操作:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
AtomicIntegerArray array = new AtomicIntegerArray(new int[]{0, 4, 1, 3, 5});
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
array.getAndAdd(0, 1);
};
new Thread(r).start();
new Thread(r).start();
TimeUnit.SECONDS.sleep(1);
System.out.println(array.get(0));
}

在JDK8之后,新增了DoubleAdderLongAdder,在高并发情况下,LongAdder的性能比AtomicLong的性能更;

比如说在自增时普通的原子类会只对一个value进行CAS,而DoubleAdderLongAdder会对一个数组进行分散的CAS操作(即不同线程可以对数组中不同的元素进行CAS自增),这样就避免了所有线程都对同一个值进行CAS

image-20240308105732094

使用如下:

1
2
3
4
5
6
7
8
9
10
11
public static void main(String[] args) throws InterruptedException {
LongAdder adder = new LongAdder();
Runnable r = () -> {
for (int i = 0; i < 100000; i++)
adder.add(1);
};
for (int i = 0; i < 100; i++)
new Thread(r).start(); //100个线程
TimeUnit.SECONDS.sleep(1);
System.out.println(adder.sum()); //最后求和即可
}

除了对基本数据类型支持原子操作外,对于引用类型,也是可以实现原子操作的:

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException {
String a = "Hello";
String b = "World";
AtomicReference<String> reference = new AtomicReference<>(a);
reference.compareAndSet(a, b);
System.out.println(reference.get());
}

JUC还提供了字段原子更新器,可以对类中的某个指定字段进行原子操作(注意字段必须添加volatile关键字):

1
2
3
4
5
6
7
8
9
10
11
public class Main {
public static void main(String[] args) throws InterruptedException {
Student student = new Student();
AtomicIntegerFieldUpdater<Student> fieldUpdater =
AtomicIntegerFieldUpdater.newUpdater(Student.class, "age");
System.out.println(fieldUpdater.incrementAndGet(student));
}
public static class Student{
volatile int age;
}
}

ABA问题及其解决方案

之前的Redis中提过,这里不再赘述Redis 秒杀 笔记1

这里的解决方法时记录一个不会重复的版本号

1
2
3
4
5
6
7
public static void main(String[] args) throws InterruptedException {
String a = "Hello";
String b = "World";
AtomicStampedReference<String> reference = new AtomicStampedReference<>(a, 1); //在构造时需要指定初始值和对应的版本号
reference.attemptStamp(a, 2); //可以中途对版本号进行修改,注意要填写当前的引用对象
System.out.println(reference.compareAndSet(a, b, 2, 3)); //CAS操作时不仅需要提供预期值和修改值,还要提供预期版本号和新的版本号
}

并发容器

传统容器线程安全吗?

以ArrayList为例子,我们创建两个线程,同时向ArrayList中添加数据,各添加1万次,最后获得的队列长度应该是2万。但是在实际情况下,一般都会小于2万个

ArrayList的入队操作是先确认是否有足够的容量进行插入操作,在确认可以插入后就执行插入操作。在多线程的情况下,A线程执行确认和插入操作之间,B线程执行了插入,导致A线程插入时队列的长度不足,就会导致数组越界,造成最后的结果与预期不一致。

传统容器虽然在单线程情况下很好用,但是在多线程情况下就会产生安全问题,下面会介绍一些常用的线程安全的并发容器

并发容器介绍

如何让传统线程容器安全?显而易见的我们可以使用synchronized关键字,但是这样效率太低了。

我们直接使用JUC提供的专用于并发场景下的容器。

CopyOnWriteArrayList

比如之前的ArrayList我们可以替换为CopyOnWriteArrayList,它是线程安全的,就不会造成线程之间互相冲突的问题

那么它是如何执行add()操作的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //直接加锁,保证同一时间只有一个线程进行添加操作
try {
Object[] elements = getArray(); //获取当前存储元素的数组
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1); //直接复制一份数组
newElements[len] = e; //修改复制出来的数组
setArray(newElements); //将元素数组设定为复制出来的数组
return true;
} finally {
lock.unlock();
}
}

可以看到添加操作是直接上锁,并且会先拷贝一份当前存放元素的数组,然后对数组进行修改,再将此数组替换(CopyOnWrite)接着我们来看读操作:

1
2
3
public E get(int index) {
return get(getArray(), index);
}

正如其名,CopyOnWriteArrayList会对写操作加上锁,而读操作不需要加锁,毕竟再多人读都不会改变数组的内容,但是有人在写就可能造成前后读取的数据不一致的问题

ConcurrentHashMap

接着我们来看对于HashMap的并发容器ConcurrentHashMap

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) throws InterruptedException {
Map<Integer, String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
for (int j = 0; j < 100; j++)
map.put(finalI * 100 + j, "lbwnb");
}).start();
}
TimeUnit.SECONDS.sleep(1);
System.out.println(map.size());
}

在多线程的情况下, 多个线程会争抢同一把锁,我们之前的LongAdder中有一种分散出几个锁来缓解压力的思想,这里也是类似的;在JDK7以前,ConcurrentHashMap的原理是将数据分段存储,每一段都有自己的锁,这样当给一段数据加了锁也不会影响对其他段的操作了

image-20240311105110251

在JDK8之后,ConcurrentHashMap又引入了红黑树和哈希表综合的机制

image-20240311105441364

当插入数据时,会先计算对应的哈希值,根据哈希值找到应在数组中存放的位置,然后创建一个新的结点并添加到对应下标的链表后面,打拿个链表长度达到8以后,会自动将链表转化为红黑树,这样就能提升查询效率。

ConcurrentHashMap的put()实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException(); //键值不能为空,基操
int hash = spread(key.hashCode()); //计算键的hash值,用于确定在哈希表中的位置
int binCount = 0; //一会用来记录链表长度的,忽略
for (Node<K,V>[] tab = table;;) { //无限循环,而且还是并发包中的类,盲猜一波CAS自旋锁
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //如果数组(哈希表)为空肯定是要进行初始化的,然后再重新进下一轮循环
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果哈希表该位置为null,直接CAS插入结点作为头结即可(注意这里会将f设置当前哈希表位置上的头结点)
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 如果CAS成功,直接break结束put方法,失败那就继续下一轮循环
} else if ((fh = f.hash) == MOVED) //头结点哈希值为-1,这里只需要知道是因为正在扩容即可
tab = helpTransfer(tab, f); //帮助进行迁移,完事之后再来下一次循环
else { //特殊情况都完了,这里就该是正常情况了,
V oldVal = null;
synchronized (f) { //在前面的循环中f肯定是被设定为了哈希表某个位置上的头结点,这里直接把它作为锁加锁了,防止同一时间其他线程也在操作哈希表中这个位置上的链表或是红黑树
if (tabAt(tab, i) == f) {
if (fh >= 0) { //头结点的哈希值大于等于0说明是链表,下面就是针对链表的一些列操作
...实现细节略
} else if (f instanceof TreeBin) { //肯定不大于0,肯定也不是-1,还判断是不是TreeBin,所以不用猜了,肯定是红黑树,下面就是针对红黑树的情况进行操作
//在ConcurrentHashMap并不是直接存储的TreeNode,而是TreeBin
...实现细节略
}
}
}
//根据链表长度决定是否要进化为红黑树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); //注意这里只是可能会进化为红黑树,如果当前哈希表的长度小于64,它会优先考虑对哈希表进行扩容
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}

总结一下就是:

image-20230306172102878

讲师的图画的也太好!

ConcurrentHashMap的get()实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //计算哈希值
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 如果头结点就是我们要找的,那直接返回值就行了
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//要么是正在扩容,要么就是红黑树,负数只有这两种情况
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
//确认无误,肯定在列表里,开找
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//没找到只能null了
return null;
}

综上,ConcurrentHashMap的put操作,实际上是对哈希表上的所有头结点元素分别加锁,理论上来说哈希表的长度很大程度上决定了ConcurrentHashMap在同一时间能够处理的线程数量,这也是为什么treeifyBin()会优先考虑为哈希表进行扩容的原因。显然,这种加锁方式比JDK7的分段锁机制性能更好。

get操作就是根据hash来找到对应的队列/红黑树,然后在里面找到对应的数据并且返回

阻塞队列

了我们常用的容器类之外,JUC还提供了各种各样的阻塞队列,用于不同的工作场景。

阻塞队列本身也是队列,但是它是适用于多线程环境下的,基于ReentrantLock实现的,它的接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);

//入队,如果队列已满,返回false否则返回true(非阻塞)
boolean offer(E e);

//入队,如果队列已满,阻塞线程直到能入队为止
void put(E e) throws InterruptedException;

//入队,如果队列已满,阻塞线程直到能入队或超时、中断为止,入队成功返回true否则false
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

//出队,如果队列为空,阻塞线程直到能出队为止
E take() throws InterruptedException;

//出队,如果队列为空,阻塞线程直到能出队超时、中断为止,出队成功正常返回,否则返回null
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

//返回此队列理想情况下(在没有内存或资源限制的情况下)可以不阻塞地入队的数量,如果没有限制,则返回 Integer.MAX_VALUE
int remainingCapacity();

boolean remove(Object o);

public boolean contains(Object o);

//一次性从BlockingQueue中获取所有可用的数据对象(还可以指定获取数据的个数)
int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);

比如现在有一个容量为3的阻塞队列,这个时候一个线程put向其添加了三个元素,第二个线程接着put向其添加三个元素,那么这个时候由于容量已满,会直接被阻塞,而这时第三个线程从队列中取走2个元素,线程二停止阻塞,先丢两个进去,还有一个还是进不去,所以说继续阻塞。

image-20230306172123711

利用阻塞队列,我们可以轻松地实现消费者和生产者模式;

一共有三种常用的阻塞队列:

  • ArrayBlockingQueue:有界带缓冲阻塞队列(就是队列是有容量限制的,装满了肯定是不能再装的,只能阻塞,数组实现)
  • SynchronousQueue:无缓冲阻塞队列(相当于没有容量的ArrayBlockingQueue,因此只有阻塞的情况)
  • LinkedBlockingQueue:无界带缓冲阻塞队列(没有容量限制,也可以限制容量,也会阻塞,链表实现)

对SynchronousQueue,它没有容量,也就是说入队必须和出队同时出现,我们需要通过transfer方法来对生产者和消费者之间的数据来进行操作

如果只有消费者或者只有生产者都不能完成数据传递,所以会被阻塞;只有生产者和消费者全部到齐了才能进行消费

image-20230306172203832

LinkedBlockingQueue更牛逼了,它在SynchronousQueue的基础上加了容量,可以暂时让多个消费者/生产者在队列中多等着

了解一些其他的队列:

  • PriorityBlockingQueue - 是一个支持优先级的阻塞队列,元素的获取顺序按优先级决定。
  • DelayQueue - 它能够实现延迟获取元素,同样支持优先级。

DelayQueue可以实现代优先级的延迟出队,在这个情况下,就算优先级比较低的结点已经可以出队了,还是需要等待优先级更高的结点结束延迟出队后才能进行出队操作


参考视频:Java JUC 并发编程 已完结(IDEA 2021版本)4K蓝光画质 玩转多线程

视频教程文档:柏码-JUC笔记(二)