看看 Singleflight


源码地址:https://cs.opensource.google/go/x/sync/+/036812b2:singleflight/singleflight.go
singleflight

type call struct {
   wg sync.WaitGroup
   val interface{}
   err error
   forgotten bool
   // 统计被拦截调用fn的请求数,可以不关心
   dups  int
   // DoChan 中用的,先不关心
   chans []chan<- Result
}

type Group struct {
   mu sync.Mutex       // protects m
   m  map[string]*call // lazily initialized
}

// 个人理解:有请求进来时,在map里赋值,之后的请求只要key 相同,就都等待第一个进去的执行fn()
// 等第一个执行完后,等待的都使用第一个请求的结果
// 配合Forget 可以放更多的请求执行fn()
func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
   g.mu.Lock()
   if g.m == nil {
      g.m = make(map[string]*call)
   }
  // 个人理解:如果ok=true,说明已经有fn()已经执行或执行完毕
   if c, ok := g.m[key]; ok {
      c.dups++
      g.mu.Unlock()
      // fn()还没执行完,在这儿等着吧
      c.wg.Wait()

      if e, ok := c.err.(*panicError); ok {
         panic(e)
      } else if c.err == errGoexit {
         runtime.Goexit()
      }
      return c.val, c.err, true
   }
   c := new(call)
   // 个人理解:我要开始了,你们都别动
   c.wg.Add(1)
   g.m[key] = c
   g.mu.Unlock()

   g.doCall(c, key, fn)
   return c.val, c.err, c.dups > 0
}

// 调用fn()
func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   normalReturn := false
   recovered := false

   defer func() {
 
      if !normalReturn && !recovered {
         c.err = errGoexit
      }

      c.wg.Done()
      g.mu.Lock()
      defer g.mu.Unlock()
      if !c.forgotten {
         delete(g.m, key)
      }

      if e, ok := c.err.(*panicError); ok {
         if len(c.chans) > 0 {
            go panic(e)
            select {} 
         } else {
            panic(e)
         }
      } else if c.err == errGoexit {
      } else {
         for _, ch := range c.chans {
            ch <- Result{c.val, c.err, c.dups > 0}
         }
      }
   }()

   func() {
      defer func() {
         // 个人理解:当fn() panic的时候在这里recover,预防死锁
         if !normalReturn {
            if r := recover(); r != nil {
               c.err = newPanicError(r)
            }
         }
      }()

      c.val, c.err = fn()
      normalReturn = true
   }()

   if !normalReturn {
      recovered = true
   }
}

// 个人理解:释放这个key,使更多的请求调用请求到下游服务。当调用下游服务需要消耗大量时间时,
// 并发环境下,在调用下游服务时,只有一个请求会调用到下游服务,其他请求会被阻塞,当这个请求失败时,会严重影响效率。
// 如果启动Goroutine 定时Forget,则会释放更多的请求到下游服务,提高并发和容错。
func (g *Group) Forget(key string) {
   g.mu.Lock()
   if c, ok := g.m[key]; ok {
      c.forgotten = true
   }
   delete(g.m, key)
   g.mu.Unlock()
}