Go 源码之读写锁 sync.RWMutex
一、总结
sync.RWMutex
- 写锁 需要阻塞 写锁:一个协程拥有 写锁 时,其他协程 写锁定 需要阻塞
- 写锁 需要阻塞 读锁:一个协程拥有 写锁 时,其他协程 读锁定 需要阻塞
- 读锁 需要阻塞 写锁:一个协程拥有 读锁 时,其他协程 写锁定 需要阻塞
- 读锁 不能阻塞 读锁:一个协程拥有 读锁 时,其他协程也可以拥有 读锁
- 写操作通过 readerCount 的操作来阻止读操作的
二、源码
(一)RWMutex 数据结构
type RWMutex struct {
w Mutex // 写锁
writerSem uint32 // 缓冲信号量,获取写锁的阻塞等待信号队列
readerSem uint32 // 缓冲信号量,获取读锁的阻塞等待信号队列
readerCount int32 // 当前持有读锁的 goroutine 数量,负数表示有个写锁在执行
readerWait int32 // 获取写锁时,如果之前还有 readerWait 数量的读锁在执行,则需要等待执行完才能获取写锁
}
-
w
写锁
-
writerSem
缓冲信号量,当有 goroutine 获取写锁时,如果当前有读锁在占有,则调用 runtime_SemacquireMutex(&rw.writerSem, false, 0)
将当前 goroutine 进行睡眠,并排队到 writerSem 队列的队尾,等待所有的读锁释放之后再调用 runtime_Semrelease(&rw.writerSem, false, 1)进行唤醒
-
readerSem
缓冲信号量,当有 goroutine 获取读锁时,如果当前有写锁在占有(readerCount),则调用 runtime_SemacquireMutex(&rw.readerSem, false, 0),将当前 goroutine 进行睡眠,并排队到 readerSem 队列的队尾,等待写锁释放之后再调用 runtime_Semrelease(&rw.readerSem, false, 0)进行唤醒
-
readerCount
当前持有读锁的 goroutine 数量,负数表示有个写锁在执行,在获取写锁时,会将 readerCount-rwmutexMaxReaders 变为负数
写锁释放后 readerCount 会再 +rwmutexMaxReaders 变为正数
-
readerWait
首先读锁是会阻塞写锁的获取的,当一个 goroutine 尝试去获取一个写锁时,会将当前持有读锁的数量 readerCount 赋值给 readerWait,表示当前 goroutine 要等待 readerWait 个 goroutine 释放读锁之后才能成功获取写锁
(二)Lock
// 获取写锁,会等待所有的读锁释放
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 获取互斥锁.
rw.w.Lock()
// readerCount+(-rwmutexMaxReaders),将readerCount变为负数,表示阻塞的读数量
// 结果 + rwmutexMaxReaders 重新将结果变为正数,所以r的值就是readerCount的绝对值,
// 然后readerWait累加r,表示阻塞等待的读数量
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// r!=0表示当前还有读锁,需要将写锁加入writerSem队尾,等待唤醒
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 将当前goroutine进行睡眠,并排队到writerSem队列的队尾,等待
// 函数rUnlockSlow中当所有读锁readerWait=0释放之后,会唤醒写锁
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
(三)Unlock
// 唤醒因读锁定而被阻塞的协程(如果有的话)
// 解除互斥锁
func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 在上锁的时候,readerCount-rwmutexMaxReaders
// 所以解锁的时,再+rwmutexMaxReaders,使readerCount变为正数,表示无写锁存在
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders+)
if r >= rwmutexMaxReaders { // 读的数量超了
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 唤醒readerSem队列中的读等待goroutine
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放互斥锁
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}
(四)TryRLock
非阻塞读锁,和 Rlock 的区别在于:TryRLock 是非阻塞的,也就是当锁被占用时,直接返回 false,而 RLock 会自旋等待其他 goroutine 释放读锁
(五)Rlock
读锁定会先将 RWMutext.readerCount
加 1,此时写操作到来时发现读者数量不为 0,会阻塞等待所有读操作结束。
也就是说,读操作通过 readerCount
来将来阻止写操作的。
// 上读锁
func (rw *RWMutex) RLock() {
if race.Enabled { // 竞态检测
_ = rw.w.state
race.Disable()
}
// 增加读的次数readerCount+1
// 1、如果readerCount<0。说明写锁已经获取了,那么这个读锁需要等待写锁的完成
// 2、如果readerCount>=0。当前读直接获取锁
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 当写锁定进行时,会先将readerCount减去2^30(最大读个数),从而readerCount变成了负值,
// 此时再有读锁定到来时检测到readerCount为负值,便知道有写操作在进行,只好阻塞等待。
// 而真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可获得。
// 当前有个写锁,读操作阻塞等待写锁释放, 则当前goroutine进入休眠,在readerSem信号中排队(排到最后一位).
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
(六)RUnlock
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 读个数减1,readerCount+(-1)
// 1.若readerCount>0,证明当前还有读锁,直接结束本次操作
// 2.若readerCount<=0,证明已经没有读锁,可以唤醒写锁(若有)
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 唤醒可能等待的写锁
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
// 唤醒可能等待的写锁 ,r 是读者的数量
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 若等待读取的 goroutine(readerWait) 数量为0,则唤醒等待的写锁 (writerSem)
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
三、常见问题
1. 什么是 CAS,什么是原子操作
CAS(Compare and Swap)比较并交换,顾名思义:比较两个值,如果他们两者相等就把他们交换。这是一个由CPU硬件提供并实现的原子操作
原子操作:操作系统提高的锁机制来保证操作的原子性和线程安全性。这种锁机制可以使执行==原子操作的 CPU 独占内存总线==或者缓存,并防止其他 CPU 对同一内存地址进行读写操作,从而避免了数据竞争的问题。
具体来说,在执行原子操作时,CPU 会向内存总线或者缓存发送锁请求信号,然后等待锁授权。一旦锁授权成功,CPU 就可以将操作的结果写入内存,然后释放锁。其他 CPU 在锁被释放之前不能对同一内存地址进行读写操作,从而保证了操作的原子性和线程安全性。
需要注意的是,==原子操作增加 CPU 的开销和内存带宽的消耗==
2. 写操作是如何阻止写操作的
sync.Mutex 中的 state 字段标记了当前锁的状态,如果是锁定状态,那新的获取写锁的 goroutine 会自旋阻塞等待(sema 信号量)
3. 写操作是如何阻止读操作的
sync.RWMutex 中的 readCount 记录了当前持有读锁的数量,当有写锁获取时,readCount 会变为负数,因此在获取读锁时,如果 readCount<0 则表示当前写锁占有,读锁的获取操作阻塞等待(readerSem)
4. 读操作是如何阻止写操作的
sync.RWMutex 中的 readCount 记录了当前持有读锁的数量,当有写锁获取时,如果 readCount==0 则写锁获取成功,然后将 readCount 会变为负数,如果 readCount!=0,则会进行阻塞等待,排队到信号量 writerSem 队列中,直达所有的读锁都释放才唤醒
5. 为什么写锁定不会被饿死
go1.9 引入了饥饿模式,防止 goroutine 长时间阻塞
参考资料
【腾讯文档】Go 源码之锁 Mutex
https://docs.qq.com/doc/DQXNFUEt3WldjVmtS
(28 条消息) 每行代码都带注释,带你看懂 Go 互斥锁的源码_kevin_tech 的博客-CSDN 博客
(28 条消息) go sync.RWMutex 源码解析_puzzled li 的博客-CSDN 博客