Golang sync.RWMutex 源码分析


结构

type RWMutex struct {
	w           Mutex  // 写操作的互斥锁
	writerSem   uint32 // 写信号量,表示写者等待读者读完
	readerSem   uint32 // 读信号量,表示读者等待写者写完
	readerCount int32  // 读者计数器
	readerWait  int32  // 获取写锁时需要等待的写者的数量,用于防止写者饿死
}

常量

const rwmutexMaxReaders = 1 << 30	// 最多支持的读锁数量

RLock

RLock

func (rw *RWMutex) RLock() {
    // 竞争检测代码,不看
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
    // 写者在写的时候会将readerCount减去rwmutexMaxReaders,
    // 表示如果readerCount小于0说明还有写者在写
    // 如果(readerCount+1) < 0 表示至少有一个写者在写,新来的读者阻塞
	if atomic.AddInt32(&rw.readerCount, 1) < 0 {
		// 原语:将这个读者放到读锁的等待队列的尾部
		runtime_SemacquireMutex(&rw.readerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
	}
}

TryRLock

func (rw *RWMutex) TryRLock() bool {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	for {
        // 原语:把readerCount的值读入内存
		c := atomic.LoadInt32(&rw.readerCount)
        // readerCount<0说明有读者在执行,不能加锁
		if c < 0 {
			if race.Enabled {
				race.Enable()
			}
			return false
		}
        // CAS操作:如果readerCount和c相等,则可以直接抢锁
        // 只要readerCount不小于0,读者抢锁只需要readerCount加1
		if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(&rw.readerSem))
			}
			return true
		}
	}
}

  • atomic.LoadInt32(&SB) 原子操作:把SB的值读入内存

    func Load(ptr *uint32) uint32 {
    	return *ptr
    }
    

    用途:

    1. 兼容不同指令集
    2. 多和系统内保证内存的读写同步

RUnlock

RUnlock

func (rw *RWMutex) RUnlock() {
	if race.Enabled {
		_ = rw.w.state
		race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
		race.Disable()
	}
    // 如果r >= 0,说明至少还有一个读者在工作,直接解锁成功
    // 如果r < 0,说明所有的读者都已经读完了,下一步可能要释放一个阻塞的读者
	if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
		// 决定要不要释放一个读者
		rw.rUnlockSlow(r)
	}
	if race.Enabled {
		race.Enable()
	}
}

rUnlockSlow

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		race.Enable()
		throw("sync: RUnlock of unlocked RWMutex")
	}
    // (readerWait-1) == 0 说明比写者早到的读者都已经读完了
    // 接下来轮到读者读
    // readerWait作用:为了防止读者饿死,go规定只要比读者早到的写者们写完就要轮到读者执行
	if atomic.AddInt32(&rw.readerWait, -1) == 0 {
		// 释放读锁阻塞队列上首部的读者去读
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Lock

Lock

func (rw *RWMutex) Lock() {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
	// 先加读互斥锁,因为一个读者的操作和其他所有读者都互斥
	rw.w.Lock()
	// 先给readerCount减去rwmutexMaxReaders,方便与写者实现互斥
    // r代表目前写者的总数
    // 很明显,readerCount变成了不大于0的数,意用于
    // 阻止晚于当前读者到达的写者获得写锁,后到的读者会自行阻塞
	r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
	// 如果r=0说明没有写者了,直接加锁成功
    // 如果在这个写者前面还有读者在执行,那就阻塞这个写者,等读者都执行完
	if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
        // 把这个写者放到写锁阻塞队列尾部
		runtime_SemacquireMutex(&rw.writerSem, false, 0)
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
}

TryLock

func (rw *RWMutex) TryLock() bool {
	if race.Enabled {
		_ = rw.w.state
		race.Disable()
	}
    // 尝试加互斥锁
	if !rw.w.TryLock() {
		if race.Enabled {
			race.Enable()
		}
		return false
	}
    // 如果有读者那就尝试加锁失败,尝试加锁不需要写者阻塞等待
	if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
		rw.w.Unlock()
		if race.Enabled {
			race.Enable()
		}
		return false
	}
	if race.Enabled {
		race.Enable()
		race.Acquire(unsafe.Pointer(&rw.readerSem))
		race.Acquire(unsafe.Pointer(&rw.writerSem))
	}
	return true
}

Unlock

func (rw *RWMutex) Unlock() {
	if race.Enabled {
		_ = rw.w.state
		race.Release(unsafe.Pointer(&rw.readerSem))
		race.Disable()
	}

	// 先把readerCount的值还原
	r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
	if r >= rwmutexMaxReaders {
		race.Enable()
		throw("sync: Unlock of unlocked RWMutex")
	}
	// 释放所有阻塞的读者
	for i := 0; i < int(r); i++ {
		runtime_Semrelease(&rw.readerSem, false, 0)
	}
	// 释放读互斥锁,其它读者可以尝试抢锁了
	rw.w.Unlock()
	if race.Enabled {
		race.Enable()
	}
}