go语言sync.Mutex学习


sync.Mutex简单介绍

syncM.Mutex是Go语言当中最常用的互斥锁,用来解决高并发下共享资源的访问问题。在并发下多个协程对同一个数据同时获取和修改,然后写入的过程中会有冲突,导致最终数据出错。

func main() {
 var count  = 0
 var wg sync.WaitGroup
 wg.Add(10)
 //var lock sync.Mutex
 for i := 0; i < 10; i++ {
 	go func() {
 		defer wg.Done()
 		//lock.Lock()
 		for j:=0 ; j<10000 ; j++{
 			count++
 		}
 		//lock.Unlock()
 	}()

 }
 wg.Wait()
 fmt.Println("count:=",count)
}
//运行结果
//count:= 47441

Go语言提供了一个工具可以检查共享资源的访问是否存在问题。使用go run -race demo1.go ,如果存在问题就会输出警告信息。而在Go语言中这个问题非常好解决,把上面的注释去掉,用锁来控制协程对共享资源的访问。

sync.Mutex的使用原理

第一版的互斥锁

第一版的Mutex是在2008年的时候,Mutex本身有两个字段,flag 32和sema uint32。

  • flag 用来表示持有和等待锁的数量,0表示锁未被持有,1表示锁被持有,n表示锁被持有,还有n-1个等待者。
  • sema 表示等待者队列所使用的信号量。
  • Lock()方法:Mutex内部会用循环CAS操作flag,如果获取到了锁,就将flag赋值为1,如果别的goroutine早已经获取到了锁,则将flag的值加1,同时用信号量将自己休眠,等锁释放的时候,会通过信号量唤醒自己。
  • Unlock()方法:持有锁的goroutine使用Unlock()方法释放锁的时候,会将key的值减一,如果此时没有其它goroutine想获取这个锁,则直接返回,如果有,则通过信号量唤醒等待状态下goroutine中的一个。
  • 注意事项 unlock()方法并没有检查锁的状态,也就是说即使goroutine没有获取到锁也可以执行unlock()方法。会出现runtime error,在使用过程中锁的获取和释放成对出现。比如
type s struct{
   sync.Mutex
   count int
}
//并发安全方法
func (s *s)add(){
   s.lock()
   defer s.unlock()
   s.count++
}

现在的互斥锁原理

  • flag 改为 state,state是一个复合型字段,用不同部分的比特来表示数据,分为4部分,与下面的常量进行异或运算得到数据。
 const(
	mutexLocked = 1 << iota // 持有锁的标记
	mutexWoken				//唤醒标记
	mutexStarving			//锁的饥饿标记
	mutexWaiterShift = iota //阻塞等待的waiter数量
 )
  • Lock()方法
    检测方法改为检查state字段中的标志,如果没有goroutine持有锁,也没有等待锁的goroutine,那个当前的goroutine可以直接获取到锁。如果新来的或者唤醒的goroutine首次获取不到锁,那么会在for循环中先自旋一定次数进行尝试(主要优化了一些临界区耗时时间非常少的场景,锁的释放速度比较快,先自旋可以使得获取锁的速度变快。),如果获取不到,就runtime.Semacquire(&m.sema)休眠,休眠醒来时继续争抢锁,并不会直接获取到锁,而是和新来的goroutine一同争抢。这时Mutex会有两种状态,正常和饥饿,
    • 正常状态:等待队列都是先入先出队列,被唤醒的等待协程会和新来的一起竞争,一定情况下被唤醒的等待协程获取不到锁,然后会将这个协程放到队列的前面,如果等待协程获取不到锁的时间超过starvationThresholdNs = 1e6纳秒,那么Mutex会进入到饥饿状态
    • 饥饿状态: Mutex就会将锁直接交给等待队列最前面的协程,新来的goroutine不会获取锁,即使锁没有被持有,它会加入到等待队列的尾部。如果这个等待协程是队列中的最后一个,或者等待协程获取锁的时间少于1e6纳秒,Mutex会重新变为正常状态。
  • Unlock()方法
    先尝试将持有锁的标识设置为未加锁的状态,这是通过减1而不是将标志位置零的方式实现。检测原来锁的状态是否已经未加锁的状态,如果是Unlock一个未加锁的Mutex直接panic。不过,即使将加锁置为未加锁的状态,这个方法也不能直接返回,还需要一些额外的操作, 因为还可能有一等待这个锁的goroutine (有时候我也把它们称之为waiter)要通过信号的方式唤醒它们中的一个。所以接下来的逻辑有两种情况。第一种情况,如果没有其它的waiter,说明对这个锁的竞争的goroutine只有一个,那就可以值接返回了;如果这个时候有唤醒的goroutine,或者仅被别人加了锁,那么,无需我们操劳,goroutine自己干得都很好,当前的这个goroutine就可以放心返回了。第二种情况,如果有等待者,并且没有唤醒的waiter,那就需要唤醒一个等待的 waiter。在唤醒之前,需要将waiter数量减1,并且将mutexWoken标志设置上,这样,Unlock可以返回了。
  • 源码
func (m *Mutex) Lock() {
   // Fast path: 快路:直接就能获取到锁
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
   	if race.Enabled {
   		race.Acquire(unsafe.Pointer(m))
   	}
   	return
   }
   // Slow path (outlined so that the fast path can be inlined)
   //慢路:尝试自旋或者饥饿状态下饥饿goroutine竞争。
   m.lockSlow()
}

func (m *Mutex) lockSlow() {
   var waitStartTime int64
   starving := false//这个goroutine的饥饿标记
   awoke := false//唤醒标记
   iter := 0//自旋次数
   old := m.state//当前锁的状态
   for {
   	// Don't spin in starvation mode, ownership is handed off to waiters
   	// so we won't be able to acquire the mutex anyway.
   	//锁是非饥饿状态,非被释放,尝试自旋
   	if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   		// Active spinning makes sense.
   		// Try to set mutexWoken flag to inform Unlock
   		// to not wake other blocked goroutines.
   		if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
   			atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
   			awoke = true
   		}
   		//自旋操作
   		runtime_doSpin()
   		iter++
   		old = m.state//再次获取锁的状态,会重复检查
   		continue
   	}
   	new := old
   	// Don't try to acquire starving mutex, new arriving goroutines must queue.
   	
   	if old&mutexStarving == 0 {
   		new |= mutexLocked//非饥饿状态,加锁
   	}
   	if old&(mutexLocked|mutexStarving) != 0 {
   		new += 1 << mutexWaiterShift//等待数量加一
   	}
   	// The current goroutine switches mutex to starvation mode.
   	// But if the mutex is currently unlocked, don't do the switch.
   	// Unlock expects that starving mutex has waiters, which will not
   	// be true in this case.
   	if starving && old&mutexLocked != 0 {
   		new |= mutexStarving//设置饥饿状态
   	}
   	if awoke {
   		// The goroutine has been woken from sleep,
   		// so we need to reset the flag in either case.
   		if new&mutexWoken == 0 {
   			throw("sync: inconsistent mutex state")
   		}
   		new &^= mutexWoken//新状态清除唤醒标记
   	}
   	//成功设置饥饿状态
   	if atomic.CompareAndSwapInt32(&m.state, old, new) {
   		//获取到了锁
   		if old&(mutexLocked|mutexStarving) == 0 {
   			break // locked the mutex with CAS
   		}
   		// If we were already waiting before, queue at the front of the queue.
   		//如果以前就在队列里面,加入到队列头
   		queueLifo := waitStartTime != 0
   		if waitStartTime == 0 {
   			waitStartTime = runtime_nanotime()
   		}
   		//阻塞等待
   		runtime_SemacquireMutex(&m.sema, queueLifo, 1)
   		//唤醒后检查锁是否处于饥饿状态
   		starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
   		old = m.state
   		//处于饥饿状态下的话直接抢到锁并返回。
   		if old&mutexStarving != 0 {
   			// If this goroutine was woken and mutex is in starvation mode,
   			// ownership was handed off to us but mutex is in somewhat
   			// inconsistent state: mutexLocked is not set and we are still
   			// accounted as waiter. Fix that.
   			if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
   				throw("sync: inconsistent mutex state")
   			}
   			//加锁,等待数量减一
   			delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 {
   				// Exit starvation mode.
   				// Critical to do it here and consider wait time.
   				// Starvation mode is so inefficient, that two goroutines
   				// can go lock-step infinitely once they switch mutex
   				// to starvation mode.
   				delta -= mutexStarving
   			}
   			atomic.AddInt32(&m.state, delta)
   			break
   		}
   		awoke = true
   		iter = 0
   	} else {
   		old = m.state
   	}
   }

   if race.Enabled {
   	race.Acquire(unsafe.Pointer(m))
   }
}

// Unlock unlocks m.
// It is a run-time error if m is not locked on entry to Unlock.
//
// A locked Mutex is not associated with a particular goroutine.
// It is allowed for one goroutine to lock a Mutex and then
// arrange for another goroutine to unlock it.
func (m *Mutex) Unlock() {
   if race.Enabled {
   	_ = m.state
   	race.Release(unsafe.Pointer(m))
   }

   // Fast path: drop lock bit.
   //释放锁,
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
   	// Outlined slow path to allow inlining the fast path.
   	// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
   	m.unlockSlow(new)
   }
}

func (m *Mutex) unlockSlow(new int32) {
   //在这里判断当前写是否获取到了锁
   if (new+mutexLocked)&mutexLocked == 0 {
   	throw("sync: unlock of unlocked mutex")
   }
   if new&mutexStarving == 0 {
   	old := new
   	for {
   		// If there are no waiters or a goroutine has already
   		// been woken or grabbed the lock, no need to wake anyone.
   		// In starvation mode ownership is directly handed off from unlocking
   		// goroutine to the next waiter. We are not part of this chain,
   		// since we did not observe mutexStarving when we unlocked the mutex above.
   		// So get off the way.
   		//如果没有其它的waiter,说明对这个锁的竞争的goroutine只有一个,那就可以值接返回了
   		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
   			return
   		}
   		// Grab the right to wake someone.
   		//唤醒等待队列中的协程。
   		new = (old - 1<

使用注意事项

  1. lock()和Unlock()一定要成对出现,在一个方法内部,可以先lock(),然后defer Unlock()。
  2. 在方法参数上传递Mutex要使用地址的发送传递,直接使用(lock sync.Mutex)传递的是值,也就是副本,在并发场景下不能确定Mutex的具体状态。这种情况下可以使用go vet demo.go 进行检测
  3. Mutex是不可重入锁,无法重复获取锁。
    • 解决方法
    1. 获取协程的ID,然后在加锁和解锁的时候判断是不是同一个协程。
    2. 使用token区分不同的协程。
  • 以上内容是在极客时间Go专栏学习做的笔记。