您现在的位置是:网站首页> 编程资料编程资料
golang中sync.Mutex的实现方法_Golang_
2023-05-26
425人已围观
简介 golang中sync.Mutex的实现方法_Golang_
mutex 的实现思想
mutex 主要有两个 method: Lock() 和 Unlock()
Lock() 可以通过一个 CAS 操作来实现
func (m *Mutex) Lock() { for !atomic.CompareAndSwapUint32(&m.locked, 0, 1) { } } func (m *Mutex) Unlock() { atomic.StoreUint32(&m.locked, 0) } Lock() 一直进行 CAS 操作,比较耗 CPU。因此带来了一个优化:如果协程在一段时间内抢不到锁,可以把该协程挂到一个等待队列上,Unlock() 的一方除了更新锁的状态,还需要从等待队列中唤醒一个协程。
但是这个优化会存在一个问题,如果一个协程从等待队列中唤醒后再次抢锁时,锁已经被一个新来的协程抢走了,它就只能再次被挂到等待队列中,接着再被唤醒,但又可能抢锁失败...... 这个悲催的协程可能会一直抢不到锁,由此产生饥饿 (starvation) 现象。
饥饿现象会导致尾部延迟 (Tail Latency) 特别高。什么是尾部延迟?用一句话来说就是:最慢的特别慢!
如果共有 1000 个协程,假设 999 个协程可以在 1ms 内抢到锁,虽然平均时间才 2ms,但是最慢的那个协程却需要 1s 才抢到锁,这就是尾部延迟。
golang 中 mutex 的实现思想
➜ go version go version go1.16.5 darwin/arm64
本次阅读的 go 源码版本为 go1.16.5。
golang 标准库里的 mutex 避免了饥饿现象的发生,先大致介绍一下 golang 的加锁和解锁流程,对后面的源码阅读有帮助。
锁有两种 mode,分别是 normal mode 和 starvation mode。初始为 normal mode,当一个协程来抢锁时,依旧是做 CAS 操作,如果成功了,就直接返回,如果没有抢到锁,它会做一定次数的自旋操作,等待锁被释放,在自旋操作结束后,如果锁依旧没有被释放,那么这个协程就会被放到等待队列中。如果一个处于等待队列中的协程一直都没有抢到锁,mutex 就会从 normal mode 变成 starvation mode,在 starvation mode 下,当有协程释放锁时,这个锁会被直接交给等待队列中的协程,从而避免产生饥饿线程。
除此之外,golang 还有一点小优化,当有协程正在自旋抢锁时,Unlock() 的一方不会从等待队列中唤醒协程,因为即使唤醒了,被唤醒的协程也抢不过正在自旋的协程。
下面正式开始阅读源码。
mutex 的结构以及一些 const 常量值
type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota // 3 // Mutex fairness. // // Mutex can be in 2 modes of operations: normal and starvation. // In normal mode waiters are queued in FIFO order, but a woken up waiter // does not own the mutex and competes with new arriving goroutines over // the ownership. New arriving goroutines have an advantage -- they are // already running on CPU and there can be lots of them, so a woken up // waiter has good chances of losing. In such case it is queued at front // of the wait queue. If a waiter fails to acquire the mutex for more than 1ms, // it switches mutex to the starvation mode. // // In starvation mode ownership of the mutex is directly handed off from // the unlocking goroutine to the waiter at the front of the queue. // New arriving goroutines don't try to acquire the mutex even if it appears // to be unlocked, and don't try to spin. Instead they queue themselves at // the tail of the wait queue. // // If a waiter receives ownership of the mutex and sees that either // (1) it is the last waiter in the queue, or (2) it waited for less than 1 ms, // it switches mutex back to normal operation mode. // // Normal mode has considerably better performance as a goroutine can acquire // a mutex several times in a row even if there are blocked waiters. // Starvation mode is important to prevent pathological cases of tail latency. starvationThresholdNs = 1e6 )
mutex 的状态是通过 state 来维护的,state 有 32 个 bit。
前面 29 个 bit 用来记录当前等待队列中有多少个协程在等待,将等待队列的协程数量记录为 waiterCount。
state >> mutexWaiterShift // mutexWaiterShift 的值为 3
第 30 个 bit 表示当前 mutex 是否处于 starvation mode,将这个 bit 记为 starvationFlag。
state & mutexStarving
第 31 个 bit 表示当前是否有协程正在 (第一次) 自旋,将这个 bit 记为 wokenFlag,woken 的意思也就是醒着,代表它不在等待队列上睡眠。
state & mutexWoken
第 32 个 bit 表示当前锁是否被锁了 (感觉有点绕口哈哈) ,将这个 bit 记为 lockFlag。
state & mutexLocked
用一个图来表示这些 bit
0 0 0 0 0 0 0 0 ... 0 0 0 0 | | | waiterCount starvationFlag wokenFlag lockFlag
sema 是一个信号量,它会被用来关联一个等待队列。
分别讨论几种 case 下,代码的执行情况。
Mutex 没有被锁住,第一个协程来拿锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // ... return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } 在 Mutex 没有被锁住时,state 的值为 0,此时第一个协程来拿锁时,由于 state 的值为 0,因此 CAS 操作会成功,CAS 操作之后的 state 的值变成 1 (lockFlag = 1) ,然后 return 掉,不会进入到 m.lockSlow() 里面。
Mutex 仅被协程 A 锁住,没有其他协程抢锁,协程 A 释放锁
func (m *Mutex) Unlock() { // ... // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } 紧接上面,state 的值为 1,AddInt32(m.state,-1) 之后,state 的值变成了 0 (lockFlag = 0) ,new 的值为 0,然后就返回了。
Mutex 已经被协程 A 锁住,协程 B 来拿锁
func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { // ... return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } 因为 state 的值不为 0,CompareAndSwapInt32 会返回 false,所以会进入到 lockSlow() 里面
lockSlow()
首先看一下 lockSlow() 这个方法的全貌
func (m *Mutex) lockSlow() { var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to set mutexwokenFlag to inform Unlock // to not wake other blocked goroutines. if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // Don't try to acquire starving mutex, new arriving goroutines must queue. if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // The current goroutine switches mutex to starvation mode. // But if the mutex is currently unlocked, don't do the switch. // Unlock expects that starving mutex has waiters, which will not // be true in this case. if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // The goroutine has been woken from sleep, // so we need to reset the flag in either case. if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } 第一步: doSpin (空转)
进入 for 循环后,会执行一个判断
for { // Don't spin in starvation mode, ownership is handed off to waiters // so we won't be able to acquire the mutex anyway. if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // Active spinning makes sense. // Try to s
点击排行
本栏推荐
