ReentrantLock源码探究1:非公平锁的获取和释放

2022-11-18,,,,

AQS简单介绍

​ Sync是ReentrantLock的一个内部类,它继承了AbstractQueuedSynchronizer,即AQS,在CountDownLatch、FutureTask、Semaphore、ReentrantLock等源码中,我们都能看到它们的身影,足见其重要性。此处我们需要先了解下AQS才能更愉悦地阅读源码。

​ AQS中是基于FIFO队列的实现,那么它必然包含队列中元素的定义,在这里它是Node:

属 性 定 义
Node SHARED = new Node() 表示Node处于共享模式
Node EXCLUSIVE = null 表示Node处于独占模式
int CANCELLED = 1 因为超时或者中断,Node被设置为取消状态,被取消的Node不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态,处于这种状态的Node会被踢出队列,被GC回收
int SIGNAL = -1 表示这个Node的继任Node被阻塞了,到时需要通知它
int CONDITION = -2 表示这个Node在条件队列中,因为等待某个条件而被阻塞
int PROPAGATE = -3 使用在共享模式头Node有可能处于这种状态, 表示锁的下一次获取可以无条件传播
int waitStatus 0,新Node会处于这种状态
Node prev 队列中某个Node的前驱Node
Node next 队列中某个Node的后继Node
Thread thread 这个Node持有的线程,表示等待锁的线程
Node nextWaiter 表示下一个等待condition的Node

AQS中包含的方法有

属性/方法 含 义
Thread exclusiveOwnerThread 这个是AQS父类AbstractOwnableSynchronizer的属性,表示独占模式同步器的当前拥有者
Node 上面已经介绍过了,FIFO队列的基本单位
Node head FIFO队列中的头Node
Node tail FIFO队列中的尾Node
int state 同步状态,0表示未锁
int getState() 获取同步状态
setState(int newState) 设置同步状态
boolean compareAndSetState(int expect, int update) 利用CAS进行State的设置
long spinForTimeoutThreshold = 1000L 线程自旋等待的时间
Node enq(final Node node) 插入一个Node到FIFO队列中
Node addWaiter(Node mode) 为当前线程和指定模式创建并扩充一个等待队列
void setHead(Node node) 设置队列的头Node
void unparkSuccessor(Node node) 如果存在的话,唤起Node持有的线程
void doReleaseShared() 共享模式下做释放锁的动作
void cancelAcquire(Node node) 取消正在进行的Node获取锁的尝试
boolean shouldParkAfterFailedAcquire(Node pred, Node node) 在尝试获取锁失败后是否应该禁用当前线程并等待
void selfInterrupt() 中断当前线程本身
boolean parkAndCheckInterrupt() 禁用当前线程进入等待状态并中断线程本身
boolean acquireQueued(final Node node, int arg) 队列中的线程获取锁
tryAcquire(int arg) 尝试获得锁(由AQS的子类实现它
tryRelease(int arg) 尝试释放锁(由AQS的子类实现它
isHeldExclusively() 是否独自持有锁
acquire(int arg) 获取锁
release(int arg) 释放锁
compareAndSetHead(Node update) 利用CAS设置头Node
compareAndSetTail(Node expect, Node update) 利用CAS设置尾Node
compareAndSetWaitStatus(Node node, int expect, int update) 利用CAS设置某个Node中的等待状态

另外在源码中多处使用了CAS,有关CAS的内容,可查看:

乐观锁的一种实现方式:CAS

公平锁的获取过程

假设有两个线程:线程1和线程2尝试获取同一个锁(非公平锁),过程如下

    线程1调用lock方法
    final void lock() {
if (compareAndSetState(0, 1)) //使用CAS将同步状态设置为1
setExclusiveOwnerThread(Thread.currentThread());//成功则设置线程1为当前锁的独占线程
else
acquire(1); //设置失败时,尝试获取锁
}
//上述代码正常情况下执行完毕后,线程1成为了独占线程。
    线程2此时也尝试获取锁,调用lock方法,此时CAS设置时会失败,进入acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt(); //重新获取锁失败且线程发生了中断,自行中断
}

这里面,会首先调用tryAcquire方法尝试再次获取锁,因为我们演示的是非公平锁,因此调用的方法是nonfairTryAcquire。

    final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread(); //current指向当前线程2
int c = getState(); //若线程1未释放锁,则c>0,若线程1已经释放锁,则c=0
if (c == 0) { //线程1已经释放了锁
if (compareAndSetState(0, acquires)) { //使用CAS将state设置为1
setExclusiveOwnerThread(current); //并设置线程2为独占线程
return true; //返回true,获取锁成功
}
}
//判断该线程是否是重入,即之前已经获取到了锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; //每重入一次,将state+1。
if (nextc < 0) // overflow //state+1<0,说明原state为负数,抛出异常
throw new Error("Maximum lock count exceeded");
setState(nextc); //设置state为新值
return true; //返回true,获取重入锁成功。
}
return false; //返回flase,获取锁失败
}

此时线程2使用tryAcquire方法获取锁,如果也是失败,那么,会调用addWaiter(Node.EXCLUSIVE)方法

private Node addWaiter(Node mode) {  //此处mode为独占模式
Node node = new Node(Thread.currentThread(), mode);//将当前线程(此处为线程2)绑定到新节点node上,并设置为独占模式
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //获取原队列的尾节点pred
if (pred != null) { //若原尾节点pred非空,则说明已经存在一个队列
node.prev = pred; //设置新节点node的前置为pred
if (compareAndSetTail(pred, node)) {//使用CAS设置新的尾节点为node
pred.next = node; //设置pred的后置为node,建立双向链接
return node; //返回node
}
}
enq(node); //进入此处说明原队列不存在,需要初始化队列
return node;
}
private Node enq(final Node node) { //此处传入的参数node是绑定了线程2的节点
for (;;) {
Node t = tail; //获取原队列的尾节点t
if (t == null) { // Must initialize //若尾节点为空,说明队列尚未形成
if (compareAndSetHead(new Node())) //设置一个空的,未绑定任何线程的节点为新队列的头节点
tail = head; //新队列只有一个节点,既是头也是尾
} else { //若t非空,说明队列已经形成
node.prev = t; //将node的前置设为t
if (compareAndSetTail(t, node)) { //CAS设置新的尾节点为node
t.next = node; //设置t的后继为node,建立双向链接
return t; //返回t
}
}
}
}

现在看外层方法acquireQueued,此时传入的参数node是线程2所在节点,该方法的作用是在等待队列中,当有其他线程释放了资源,那么队列中在等待的线程就可以开始行动

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否获取到资源
try {
boolean interrupted = false; //等待过程中是否被中断
//自旋,维护等待队列中线程的执行。
for (;;) {
final Node p = node.predecessor(); //获取node的前置p
if (p == head && tryAcquire(arg)) { //若前置p为头结点并且重新获取锁成功
setHead(node); //设置新的头节点为node
p.next = null; // help GC //取消p和链表的链接
failed = false; //获取资源未失败
return interrupted; //等待过程未被中断
}
if (shouldParkAfterFailedAcquire(p, node) && //若前置节点是Node.SIGNAL状态
parkAndCheckInterrupt()) //将节点设置为Waitting状态
interrupted = true; //此时线程中断状态为true
}
} finally {
if (failed) //如果获取资源成功那么取消获取过程
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; //获取前置节点的等待状态
if (ws == Node.SIGNAL) //Node.SIGNAL表示继任者线程需要被唤醒,那么就可以直接返回;
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { //若ws>0,说明前驱被取消,那么执行循环往前一直查找,知道找到未被取消的,将node排在它的后面。
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { //进入else,说明ws=0或者Node.PROPAGATE
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//使用CAS设置前置的节点状态为Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

该部分代码可以用现实中排队办理业务的情况来说明:

假设你排队去办理业务,队伍很长,因此除了当前正在办理业务的人,其他所有排队的人都在低头玩手机,且每个排队的人有以下三种状态:①.正常排队,且办完业务后会通知后面的人别玩手机了可以开始办理业务了。②.发现队伍过长,不排队了,走了。③.正常排队,办完业务后不通知后面的人,直接走。
此时你进入该队伍的尾部开始排队。
1.第一步,判断排队在你前面的人是否会通知你,如果会通知,那么我们就可以不用关心其他问题,在队列中待着玩手机即可。
2.第二步,如果发现排在你前面的人不排队了,要走了,那么此时我们就得往前走一位,并开始不断询问前面的人是不是也准备不排队了,直到我们排在了一个确定不会走的人后面。
3.第三步,排在你前面的人不是准备走的,但是他也不会通知你,那么你就要告诉他,一定得在办完业务后通知你。

当我们确定我们已经在队列中待好后(前置会通知我们),那么我们就可以开始休息。parkAndCheckInterrupt方法让我们的线程进入等待的状态,即休息状态。

private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //调用park()使线程进入waiting状态
return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的。
}

锁的释放过程

public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { //若tryRelease后无人占用锁
Node h = head; //获取队列的头结点h
if (h != null && h.waitStatus != 0) //若h非空,且h的waitStatus不为0
unparkSuccessor(h); //唤醒后继
return true;
}
return false;
}
    protected final boolean tryRelease(int releases) {
int c = getState() - releases; //当前state-1,得到c
if (Thread.currentThread() != getExclusiveOwnerThread()) //执行releas的不是获取锁的独占线程,抛出异常
throw new IllegalMonitorStateException();
boolean free = false; //free用来标记锁是否可获取状态
if (c == 0) { //若state=0
free = true; //那么当前锁是可获取的
setExclusiveOwnerThread(null); //设置当前锁的独占线程为null
}
setState(c); //设置当前state为c
return free; //返回锁是否是可获取状态
}
 private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; //获取当前线程对应节点的waitStatus
if (ws < 0) //将当前线程对应节点waitStatus置为0
compareAndSetWaitStatus(node, ws, 0); /*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; //获取当前线程对应节点的后继节点s
if (s == null || s.waitStatus > 0) { //若s为空或s的状态是canceled
s = null; //将s设置为null。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //此处从尾到头进行遍历,找到队列最前列的节点且状态不是Canceled,将其设置为s。但此处为何从尾部开始遍历尚未弄清楚。
s = t;
}
if (s != null) //若上述遍历找到的s非空
LockSupport.unpark(s.thread); //调用lockSupport.unpark唤醒s对应的线程
}

release方法的逻辑仍然可以用一个办理完业务的人的后续动作来进行说明:

1.若A办理业务后无其他业务需要办理,那么表示当前业务窗口是free的。
2.A将自己的等待状态置为0,相当于退出队列。然后检查自己后面的人是否是空或者取消排队的状态。若为真,将后置设为空。
3.从队列的尾部遍历到头部,直到找到队列最前头的那个,且它的等待状态不是取消状态,那么将其唤醒,告知他可以开始办理业务了。

关于源码的一点疑问

本文中部分源码本人暂时也尚未能理解,希望各位大佬不吝赐教,主要有以下一些问题:

1.在unparkSuccessor方法中,找到队列下一个节点并将其唤醒时,为什么从尾到头遍历

        if (s == null || s.waitStatus > 0) { //若s为空或s的状态是canceled
s = null; //将s设置为null。
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) //倒序遍历?
s = t;
}

2.在acquireQueued方法中,自旋结束后的finally代码块的作用。

final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否获取到资源
try {
boolean interrupted = false; //等待过程中是否被中断
//自旋,维护等待队列中线程的执行。
for (;;) {
final Node p = node.predecessor(); //获取node的前置p
if (p == head && tryAcquire(arg)) { //若前置p为头结点并且重新获取锁成功
setHead(node); //设置新的头节点为node
p.next = null; // help GC //取消p和链表的链接
failed = false; //获取资源未失败
return interrupted; //等待过程未被中断
}
if (shouldParkAfterFailedAcquire(p, node) && //若前置节点是Node.SIGNAL状态
parkAndCheckInterrupt()) //将节点设置为Waitting状态
interrupted = true; //此时线程中断状态为true
}
} finally {
if (failed) //如果自旋结束,那么说明failed = false已经执行了,那么这个canclAcquire方法什么情况下会执行?
cancelAcquire(node);
}
}

ReentrantLock源码探究1:非公平锁的获取和释放的相关教程结束。

《ReentrantLock源码探究1:非公平锁的获取和释放.doc》

下载本文的Word格式文档,以方便收藏与打印。