go源码之读写锁 sync.RWMutex

go源码之读写锁 sync.RWMutex

Go 源码之读写锁 sync.RWMutex

一、总结

sync.RWMutex

  • 写锁 需要阻塞 写锁:一个协程拥有 写锁 时,其他协程 写锁定 需要阻塞
  • 写锁 需要阻塞 读锁:一个协程拥有 写锁 时,其他协程 读锁定 需要阻塞
  • 读锁 需要阻塞 写锁:一个协程拥有 读锁 时,其他协程 写锁定 需要阻塞
  • 读锁 不能阻塞 读锁:一个协程拥有 读锁 时,其他协程也可以拥有 读锁
  • 写操作通过 readerCount 的操作来阻止读操作的

二、源码

(一)RWMutex 数据结构

type RWMutex struct {
	w           Mutex  // 写锁
	writerSem   uint32 // 缓冲信号量,获取写锁的阻塞等待信号队列
	readerSem   uint32 // 缓冲信号量,获取读锁的阻塞等待信号队列
	readerCount int32  // 当前持有读锁的 goroutine 数量,负数表示有个写锁在执行
	readerWait  int32  // 获取写锁时,如果之前还有 readerWait 数量的读锁在执行,则需要等待执行完才能获取写锁
}

  • w

    写锁

  • writerSem

    缓冲信号量,当有 goroutine 获取写锁时,如果当前有读锁在占有,则调用 runtime_SemacquireMutex(&rw.writerSem, false, 0)

    将当前 goroutine 进行睡眠,并排队到 writerSem 队列的队尾,等待所有的读锁释放之后再调用 runtime_Semrelease(&rw.writerSem, false, 1)进行唤醒

  • readerSem

    缓冲信号量,当有 goroutine 获取读锁时,如果当前有写锁在占有(readerCount),则调用 runtime_SemacquireMutex(&rw.readerSem, false, 0),将当前 goroutine 进行睡眠,并排队到 readerSem 队列的队尾,等待写锁释放之后再调用 runtime_Semrelease(&rw.readerSem, false, 0)进行唤醒

  • readerCount

    当前持有读锁的 goroutine 数量,负数表示有个写锁在执行,在获取写锁时,会将 readerCount-rwmutexMaxReaders 变为负数

    写锁释放后 readerCount 会再 +rwmutexMaxReaders 变为正数

  • readerWait

    首先读锁是会阻塞写锁的获取的,当一个 goroutine 尝试去获取一个写锁时,会将当前持有读锁的数量 readerCount 赋值给 readerWait,表示当前 goroutine 要等待 readerWait 个 goroutine 释放读锁之后才能成功获取写锁

(二)Lock

// 获取写锁,会等待所有的读锁释放
func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	//  获取互斥锁.
	rw.w.Lock()

	// readerCount+(-rwmutexMaxReaders),将readerCount变为负数,表示阻塞的读数量
	// 结果 + rwmutexMaxReaders 重新将结果变为正数,所以r的值就是readerCount的绝对值,
	// 然后readerWait累加r,表示阻塞等待的读数量
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// r!=0表示当前还有读锁,需要将写锁加入writerSem队尾,等待唤醒
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
		// 将当前goroutine进行睡眠,并排队到writerSem队列的队尾,等待
		// 函数rUnlockSlow中当所有读锁readerWait=0释放之后,会唤醒写锁
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

(三)Unlock

// 唤醒因读锁定而被阻塞的协程(如果有的话)
// 解除互斥锁
func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// 在上锁的时候,readerCount-rwmutexMaxReaders
  // 所以解锁的时,再+rwmutexMaxReaders,使readerCount变为正数,表示无写锁存在
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders+)
	if r >= rwmutexMaxReaders { // 读的数量超了
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 唤醒readerSem队列中的读等待goroutine
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 释放互斥锁
	rw.w.Unlock()
  
	if race.Enabled {
		race.Enable()
	}
}

(四)TryRLock

非阻塞读锁,和 Rlock 的区别在于:TryRLock 是非阻塞的,也就是当锁被占用时,直接返回 false,而 RLock 会自旋等待其他 goroutine 释放读锁

(五)Rlock

读锁定会先将 RWMutext.readerCount 加 1,此时写操作到来时发现读者数量不为 0,会阻塞等待所有读操作结束。

也就是说,读操作通过 readerCount 来将来阻止写操作的。

// 上读锁
func (rw *RWMutex) RLock() {
	if race.Enabled { // 竞态检测
		_ = rw.w.state
		race.Disable()
	}
	// 增加读的次数readerCount+1
	// 1、如果readerCount<0。说明写锁已经获取了,那么这个读锁需要等待写锁的完成
	// 2、如果readerCount>=0。当前读直接获取锁

	if atomic.AddInt32(&rw.readerCount, 1) < 0 {

		// 当写锁定进行时,会先将readerCount减去2^30(最大读个数),从而readerCount变成了负值,
		// 此时再有读锁定到来时检测到readerCount为负值,便知道有写操作在进行,只好阻塞等待。
		// 而真实的读操作个数并不会丢失,只需要将readerCount加上2^30即可获得。
		// 当前有个写锁,读操作阻塞等待写锁释放, 则当前goroutine进入休眠,在readerSem信号中排队(排到最后一位).
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

(六)RUnlock

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
	// 读个数减1,readerCount+(-1)
	// 1.若readerCount>0,证明当前还有读锁,直接结束本次操作
	// 2.若readerCount<=0,证明已经没有读锁,可以唤醒写锁(若有)
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 唤醒可能等待的写锁
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}
// 唤醒可能等待的写锁 ,r 是读者的数量
func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
	// 若等待读取的 goroutine(readerWait) 数量为0,则唤醒等待的写锁 (writerSem)
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// The last reader unblocks the writer.
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

三、常见问题

1. 什么是 CAS,什么是原子操作

CAS(Compare and Swap)比较并交换,顾名思义:比较两个值,如果他们两者相等就把他们交换。这是一个由CPU硬件提供并实现的原子操作

原子操作:操作系统提高的锁机制来保证操作的原子性和线程安全性。这种锁机制可以使执行==原子操作的 CPU 独占内存总线==或者缓存,并防止其他 CPU 对同一内存地址进行读写操作,从而避免了数据竞争的问题。

具体来说,在执行原子操作时,CPU 会向内存总线或者缓存发送锁请求信号,然后等待锁授权。一旦锁授权成功,CPU 就可以将操作的结果写入内存,然后释放锁。其他 CPU 在锁被释放之前不能对同一内存地址进行读写操作,从而保证了操作的原子性和线程安全性。

需要注意的是,==原子操作增加 CPU 的开销和内存带宽的消耗==

2. 写操作是如何阻止写操作的

sync.Mutex 中的 state 字段标记了当前锁的状态,如果是锁定状态,那新的获取写锁的 goroutine 会自旋阻塞等待(sema 信号量)

3. 写操作是如何阻止读操作的

sync.RWMutex 中的 readCount 记录了当前持有读锁的数量,当有写锁获取时,readCount 会变为负数,因此在获取读锁时,如果 readCount<0 则表示当前写锁占有,读锁的获取操作阻塞等待(readerSem)

4. 读操作是如何阻止写操作的

sync.RWMutex 中的 readCount 记录了当前持有读锁的数量,当有写锁获取时,如果 readCount==0 则写锁获取成功,然后将 readCount 会变为负数,如果 readCount!=0,则会进行阻塞等待,排队到信号量 writerSem 队列中,直达所有的读锁都释放才唤醒

5. 为什么写锁定不会被饿死

go1.9 引入了饥饿模式,防止 goroutine 长时间阻塞

参考资料

【腾讯文档】Go 源码之锁 Mutex
https://docs.qq.com/doc/DQXNFUEt3WldjVmtS

(28 条消息) 每行代码都带注释,带你看懂 Go 互斥锁的源码_kevin_tech 的博客-CSDN 博客

(28 条消息) go sync.RWMutex 源码解析_puzzled li 的博客-CSDN 博客