Go 互斥锁Mutex

2022-12-01,

Mutex是一个互斥锁,可以创建为其他结构体的字段;零值为解锁状态。Mutex类型的锁和线程无关,可以由不同的线程加锁和解锁。
互斥锁的作用是保证共享资源同一时刻只能被一个 Goroutine 占用,一个 Goroutine 占用了,其他的 Goroutine 则阻塞等待。

1、数据结构

type Mutex struct {
state int32 // 表示当前互斥锁的状态
sema uint32 // 信号量变量,用来控制等待 goroutine 的阻塞休眠和唤醒
}

基于该数据结构,实现了两种方法,加锁、释放锁

type Locker interface {
Lock()
Unlock()
}

const (
mutexLocked = 1 << iota // 表示锁是否可用(0可用,1被别的goroutine占用),001
mutexWoken // 表示mutex是否被唤醒,010
mutexStarving // 当前的互斥锁进入饥饿状态,100
mutexWaiterShift = iota // 表示统计阻塞在该mutex上的goroutine数目需要移位的数值,1<<(32-3)个
)
// sema + 1,挂起 goroutine
// 1.不断调用尝试获取锁
// 2.休眠当前 goroutine
// 3.等待信号量,唤醒 goroutine
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
// sema - 1,唤醒 sema 上等待的一个 goroutine
runtime_Semrelease(&m.sema, false, 1)

2、模式

2.1、正常模式

在正常模式下,等待的 goroutine 会按照先进先出的顺序得到锁。刚被唤醒的 goroutine 与新创建的 goroutine 竞争时,大概率无法获得锁,如 G1和 G2 竞争,此时 G1 已经占着 CPU 了,所以大概率拿到锁。

如果 goroutine 超过 1ms,没有获取锁,就会将当前锁切换为饥饿模式。

2.2、饥饿模式

避免 goroutine 被饿死,1.19 引入了饥饿模式

在饥饿模式下,互斥锁会直接交给等到队列最前面的 goroutine,新的 goroutine 在该状态下不能获取锁,也不能进入自旋,只能在队列末尾等待。

2.3、状态切换

正常模式下,

如果队列中只剩一个goroutine 获得了互斥锁或者它等待的时间少于 1ms,那么就会切换到正常模式。

3、加锁

1、Fast path

// 如果锁没被占用,也不是饥饿状态,也没有唤醒goroutine,也没有等待goroutine,加锁成功
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}
return
}

加锁的时候先通过一次 CAS(Compare And Swap) 看能不能拿到锁,如果拿到,直接返回。

// 先判断参数addr指向的被操作值与参数old的值是否相等
// 如果相等,会用参数new代表的新值替换掉原先的旧值,否则 false
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

2、Slow path

如果状态不是 0 ,就会尝试通过自旋等方式等待锁释放,大致分为:

    判断当前 goroutine 能否进入自旋
    通过自旋等待互斥锁的释放
    计算互斥锁的最新状态
    更新互斥锁的状态并获取锁
// 等待时间
var waitStartTime int64
// 饥饿标记
starving := false
// 唤醒标记
awoke := false
// 自旋次数
iter := 0
// 当前的锁的状态
old := m.state
for {
// 步骤一
// 如果锁是正常状态,锁还没被释放,就自旋
// 因为饥饿模式下,需要保证等到队列中的 goroutine 能够获得锁的的所有权,防止等待队列饿死
// 如果锁在饥饿模式或已经解锁,或不符合自旋条件就结束自旋
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// 如果等待队列有 goroutine ,锁没有设置唤醒状态,就设置为唤醒
// 用来,当锁解锁时,不会去唤醒已经阻塞的 goroutine,保证自己更大概率拿到锁
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
// 自旋
runtime_doSpin()
// 自旋次数加1
iter++
// 设置当前锁的状态
old = m.state
continue
}
------------------------------------------------------------------------------>
// 步骤二
// 此时可能锁变为饥饿状态或者已经解锁了,或者不符合自旋条件
// 获取锁最新状态
new := old // 如果当前是正常模式,尝试加锁。
// 饥饿状态下要让出竞争权利,不能加锁
if old&mutexStarving == 0 {
new |= mutexLocked
}
// 如果当前被锁定或者处于饥饿模式,把自己放到等待队列,waiter加一,表示等待一个等待计数
// 这块的状态,goroutine 只能等着,饥饿状态要让出竞争权利
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
} // 如果已经是饥饿状态,starving为真,并且old 的锁是占用情况,更新状态改为饥饿状态
if starving && old&mutexLocked != 0 {
new |= mutexStarving
} // 如果awoke在上面自旋时设置成功,那么在这要消除标志位
// 因为该 goroutine 要么获得了锁,要么进入休眠,和唤醒状态没啥关系
// 后续流程会导致当前线程被挂起,需要等待其他释放锁的 goroutine 唤醒,
// 如果 unlock 是发现mutexWoken不是 0,就不会去唤醒
if awoke {
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
// 清除唤醒标志位
new &^= mutexWoken
}
------------------------------------------------------------------------------>
// 步骤三
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 1.如果原来状态没有上锁,也没有饥饿,那么直接返回,表示获取到锁
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
} // 2.到这里是没有获取到锁,判断一下等待时长是否不为0
// 如果新的 goroutine 来抢占锁,会返回 false
// 如果不是新的,那么加入到队列头部
// 保证等待最久的 goroutine 优先拿到锁
queueLifo := waitStartTime != 0 // 3.如果等待时间为0,那么初始化等待时间
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
// 如果不等于,说明不是第一次来,是被唤醒后过来的,则加入队列头部,queueLifo=true // 4.阻塞等待,sema+1,并挂起 goroutine,
// 如果后面 goroutine 被唤醒,就从该位置往下执行
runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 5.说明该 goroutine 被唤醒
// 判断该 goroutine 是否长时间没有获得锁,如果是,就是饥饿的 goroutine
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
// 被挂起的时间有点长,需要重新获取一下当前锁的状态
old = m.state // 6.判断是否已经处于饥饿状态,处于,直接获得锁,如果不处于直接跳出
// 饥饿状态下,被唤醒的协程直接获得锁。
if old&mutexStarving != 0 {
// 饥饿状态下,被唤醒,发现锁没释放,唤醒值是 1,等待列表没有,报错
if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
throw("sync: inconsistent mutex state")
}
delta := int32(mutexLocked - 1<<mutexWaiterShift) // 7.如果唤醒等待队列的 goroutine 不饥饿,或是等待队列中的最后一个 goroutine
if !starving || old>>mutexWaiterShift == 1 {
// 就从饥饿模式切换会正常模式
delta -= mutexStarving
} // 9.设置状态
// 将锁状态设置为等待数量减1,同时设置为锁定,加锁成功
atomic.AddInt32(&m.state, delta)
break
}
// 当前 goroutine 是被系统唤醒的
awoke = true
// 重置自旋次数
iter = 0
} else {
// 如果 CAS 失败,重新开始
old = m.state
}
}
if race.Enabled {
race.Acquire(unsafe.Pointer(m))
}

3、小结

4、自旋

自旋是一种多线程同步机制,当前的进程在进入自旋的过程中会一直保持 CPU 的占用,持续检查某个条件是否为真。

4.1、canSpin

runtime_canSpin(iter)

CPU核数要大于1,否则自旋没有意义,因为此时不可能有其他协程释放锁
当前Goroutine为了获取该锁进入自旋的次数 iter 小于四次
当前机器上至少存在一个正在运行 Process
处理的运行 G 队列为空,否则会延迟调度

它的实现方法链接到了sync_runtime_canSpin

4.2、doSpin

runtime_doSpin()

func sync_runtime_doSpin() {
procyield(active_spin_cnt)
} TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET

它的实现方法链接到了 sync_runtime_doSpin

会执行 30 次 PAUSE指令,每执行一次再检查是否可以加锁,循环进行。该过程中,进程仍是执行状态

4.3、优势

更充分的利用CPU,尽量避免 goroutine 切换。因为当前申请加锁的 goroutine 拥有CPU,如果经过短时间的自旋可以获得锁,当前协程可以继续运行,不必进入阻塞状态。

对于新来进程一直进行自旋加锁,排队中的进程长时间无法拿到锁,则设置饥饿状态,该状态下不允许自旋。

5、小结

    上来先一个 CAS ,如果锁正空闲,并且没人抢,那么加锁成功;
    否则,自旋几次,如果成功,也不用加入队列;
    否则,加入队列;
    从队列中被唤醒:
      正常模式:和新来的一起抢锁,大概率失败
      饥饿模式:肯定拿到锁

Go 互斥锁Mutex的相关教程结束。

《Go 互斥锁Mutex.doc》

下载本文的Word格式文档,以方便收藏与打印。