Golang sync.RWMutex 源码分析
结构
type RWMutex struct {
w Mutex // 写操作的互斥锁
writerSem uint32 // 写信号量,表示写者等待读者读完
readerSem uint32 // 读信号量,表示读者等待写者写完
readerCount int32 // 读者计数器
readerWait int32 // 获取写锁时需要等待的写者的数量,用于防止写者饿死
}
常量
const rwmutexMaxReaders = 1 << 30 // 最多支持的读锁数量
RLock

RLock
func (rw *RWMutex) RLock() {
// 竞争检测代码,不看
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 写者在写的时候会将readerCount减去rwmutexMaxReaders,
// 表示如果readerCount小于0说明还有写者在写
// 如果(readerCount+1) < 0 表示至少有一个写者在写,新来的读者阻塞
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 原语:将这个读者放到读锁的等待队列的尾部
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
}
TryRLock
func (rw *RWMutex) TryRLock() bool {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
for {
// 原语:把readerCount的值读入内存
c := atomic.LoadInt32(&rw.readerCount)
// readerCount<0说明有读者在执行,不能加锁
if c < 0 {
if race.Enabled {
race.Enable()
}
return false
}
// CAS操作:如果readerCount和c相等,则可以直接抢锁
// 只要readerCount不小于0,读者抢锁只需要readerCount加1
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
return true
}
}
}
-
atomic.LoadInt32(&SB) 原子操作:把SB的值读入内存
func Load(ptr *uint32) uint32 { return *ptr }
用途:
- 兼容不同指令集
- 多和系统内保证内存的读写同步
RUnlock

RUnlock
func (rw *RWMutex) RUnlock() {
if race.Enabled {
_ = rw.w.state
race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
race.Disable()
}
// 如果r >= 0,说明至少还有一个读者在工作,直接解锁成功
// 如果r < 0,说明所有的读者都已经读完了,下一步可能要释放一个阻塞的读者
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// 决定要不要释放一个读者
rw.rUnlockSlow(r)
}
if race.Enabled {
race.Enable()
}
}
rUnlockSlow
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// (readerWait-1) == 0 说明比写者早到的读者都已经读完了
// 接下来轮到读者读
// readerWait作用:为了防止读者饿死,go规定只要比读者早到的写者们写完就要轮到读者执行
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// 释放读锁阻塞队列上首部的读者去读
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
Lock

Lock
func (rw *RWMutex) Lock() {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 先加读互斥锁,因为一个读者的操作和其他所有读者都互斥
rw.w.Lock()
// 先给readerCount减去rwmutexMaxReaders,方便与写者实现互斥
// r代表目前写者的总数
// 很明显,readerCount变成了不大于0的数,意用于
// 阻止晚于当前读者到达的写者获得写锁,后到的读者会自行阻塞
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 如果r=0说明没有写者了,直接加锁成功
// 如果在这个写者前面还有读者在执行,那就阻塞这个写者,等读者都执行完
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
// 把这个写者放到写锁阻塞队列尾部
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
}
TryLock
func (rw *RWMutex) TryLock() bool {
if race.Enabled {
_ = rw.w.state
race.Disable()
}
// 尝试加互斥锁
if !rw.w.TryLock() {
if race.Enabled {
race.Enable()
}
return false
}
// 如果有读者那就尝试加锁失败,尝试加锁不需要写者阻塞等待
if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
return false
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
race.Acquire(unsafe.Pointer(&rw.writerSem))
}
return true
}
Unlock

func (rw *RWMutex) Unlock() {
if race.Enabled {
_ = rw.w.state
race.Release(unsafe.Pointer(&rw.readerSem))
race.Disable()
}
// 先把readerCount的值还原
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 释放所有阻塞的读者
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放读互斥锁,其它读者可以尝试抢锁了
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
}