Golang sync.Pool 源码详解【待完善】


结构

type Pool struct {
	noCopy noCopy				// noCopy表明该结构类型不可复制

	local     unsafe.Pointer // per-P数组的地址,实际类型为[P]poolLocal,用来存储每个P的poolLocal,pid为下标
	localSize uintptr        // local数组的大小,等于runtime.GOMAXPROCS

	victim     unsafe.Pointer // 上个垃圾回收周期的local
	victimSize uintptr        // victim的大小

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() interface{}
}

type poolLocalInternal struct {
	private interface{} // 表示只有当前P能读写该字段,潜台词没有并发读写问题
	shared  poolChain   // 双向链表,表示任意P都可访问,只是当前的P可以pushHead/popHead,而其它P可以popTail
}

// 每个P都拥有自己的poolLocal,P先从private上取对象执行,没有就去share上取,也没有就去偷
type poolLocal struct {
	poolLocalInternal	// 该结构继承自poolLocalInternal

	// 防止在多核cpu上发生fase sharing
	// 128 mod (cache line size) = 0 .
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

注意:全文所有注释里的p都是指GMP模型中p,而类名p用pool指代。

全局变量

var poolRaceHash [128]uint64

var (
	allPoolsMu Mutex // 全局allPools锁

	// allPools是持有非空主缓存的pool的集合,有两种方式可以保护它的读写
    // 1) allPoolsMu and pinning or 
    // 2) STW
	allPools []*Pool

	// oldPools 是持有非空victim缓存的pool集合
	// oldPools的读写仅可以被STW保护
	oldPools []*Pool
)

Put

// 将对象x放到pool中
func (p *Pool) Put(x interface{}) {
	if x == nil {
		return
	}
    // 竞争检测代码,不看
	if race.Enabled {
		if fastrandn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
    // 获取poolLocal和p的id,简称pid
	l, _ := p.pin()
    // 把对象x放到localpool中,如果private不空则将x放到shared中
	if l.private == nil {
		l.private = x
		x = nil
	}
	if x != nil {
		l.shared.pushHead(x)
	}
    // 解除p的 禁止抢占 状态
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}

Get

Get

func (p *Pool) Get() interface{} {
	if race.Enabled {
		race.Disable()
	}
    // 获取当前poolLocal 和 p的id
	l, pid := p.pin()
	x := l.private
	l.private = nil
    // 如果private没有对象,那么从share的头部取,
    // 如果share也是空的,那就去偷getSlow()
	if x == nil {
		x, _ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
    // 解除p的 禁止抢占 模式,以后p就可以被抢占了
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
    // 如果偷也没偷到,就返回一个新对象
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

getSlow

func (p *Pool) getSlow(pid int) interface{} {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// 尝试从其它P的poolLocal的share中头一个对象
	for i := 0; i < int(size); i++ {
        // (pid+i+1)%int(size) 保证返回的poolLocal绝对不是自己的
		l := indexLocal(locals, (pid+i+1)%int(size))
        // 拿到其它p的poolLocal只有popTail()操作被允许,可获得一个对象
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 如果上面没有偷到,还可以从上一轮gc遗留下来的poolLocal中取一个,
    // victim就是上一轮gc遗留的poolLocal
	size = atomic.LoadUintptr(&p.victimSize)
    // pid不能比victim的size大,否则越界,同时也说明P的数量发生了变化
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
    // 同样的先从自己一线的poolLocal中取,没有再去其它p以前的poolLocal中偷
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// 把victim的sieze置0,以后不再用了
    // 一次性的victim
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
  • runtime_LoadAcquintptr(&ptr) 原子操作把值加载到内存,在多核cpu系统中保证该变量在使用期间不会被其它线程修改

    //go:nosplit
    //go:noinline
    func LoadAcq(ptr *uint32) uint32 {
    	return *ptr
    }
    

pin

pin

// pin()将当前g和p绑定在一起,不允许被抢占,
// 并且返回poolLocal 和 pid
// 注意:调用者必须在后调用runtime_procUnpin()来允许p被抢占!
func (p *Pool) pin() (*poolLocal, int) {
    // 获取当前p的pid
	pid := runtime_procPin()
	// 在pinSlow中先存储localSize然后存储local, 因此反过来获取
	// 因为已经禁止了抢占,此时不会发生gc
	// 因此,我们要先观察local,确认其大小至少为localSize
	// 如果local是全新的或者很大,都正常 (我们必须观察它的 zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
    // 此时如果pid大于s,除了会发生越界,还暗示了这期间p的数量发生了改变
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
    // 如果pid大于localSize说明当前p还没有poolLocal,需要创建一个
	return p.pinSlow()
}

pinSlow

func (p *Pool) pinSlow() (*poolLocal, int) {
	// 取消p的禁止抢占,后面加全局锁时p必须是可抢占的
	runtime_procUnpin()
    // 加全局锁,
	allPoolsMu.Lock()
    // 别忘了解锁
	defer allPoolsMu.Unlock()
    //再让p禁止抢占,即被pin住,并取其pid
	pid := runtime_procPin()
	// p被pin住后,poolCleanup不会被调用
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
    // 如果这个p的local为空,就新建一个,并将其添加到allPools中
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// 如果再GC期间GOMAXPROCS的值被改变了, 需要重新分配local数组,旧的local会被丢弃
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}
  • 此处要加全局锁,可见程序运行时改变p的数量会付出多大代价。

init

init

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

poolCleanup

func poolCleanup() {
	// 这个方法实在垃圾回收开始时调用STW
	// STW期间将不会分配任何空间且不调用任何运行时函数

	// 因为在STW时,所以pool的所有者无法读取poollocal,实际上此时所有的p都被禁止抢占

	// 删除所有pools中的victim 缓存
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 将主缓存移动到victim缓存中去
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// 具有非空主缓存的pools现在具有了非空victim缓存,同时所有的pool都不具有主缓存
	oldPools, allPools = allPools, nil
}

其他方法

indexLocal

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}