Golang sync.Pool 源码详解【待完善】
结构
type Pool struct {
noCopy noCopy // noCopy表明该结构类型不可复制
local unsafe.Pointer // per-P数组的地址,实际类型为[P]poolLocal,用来存储每个P的poolLocal,pid为下标
localSize uintptr // local数组的大小,等于runtime.GOMAXPROCS
victim unsafe.Pointer // 上个垃圾回收周期的local
victimSize uintptr // victim的大小
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
}
type poolLocalInternal struct {
private interface{} // 表示只有当前P能读写该字段,潜台词没有并发读写问题
shared poolChain // 双向链表,表示任意P都可访问,只是当前的P可以pushHead/popHead,而其它P可以popTail
}
// 每个P都拥有自己的poolLocal,P先从private上取对象执行,没有就去share上取,也没有就去偷
type poolLocal struct {
poolLocalInternal // 该结构继承自poolLocalInternal
// 防止在多核cpu上发生fase sharing
// 128 mod (cache line size) = 0 .
pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
注意:全文所有注释里的p都是指GMP模型中p,而类名p用pool指代。
全局变量
var poolRaceHash [128]uint64
var (
allPoolsMu Mutex // 全局allPools锁
// allPools是持有非空主缓存的pool的集合,有两种方式可以保护它的读写
// 1) allPoolsMu and pinning or
// 2) STW
allPools []*Pool
// oldPools 是持有非空victim缓存的pool集合
// oldPools的读写仅可以被STW保护
oldPools []*Pool
)
Put
// 将对象x放到pool中
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
// 竞争检测代码,不看
if race.Enabled {
if fastrandn(4) == 0 {
// Randomly drop x on floor.
return
}
race.ReleaseMerge(poolRaceAddr(x))
race.Disable()
}
// 获取poolLocal和p的id,简称pid
l, _ := p.pin()
// 把对象x放到localpool中,如果private不空则将x放到shared中
if l.private == nil {
l.private = x
x = nil
}
if x != nil {
l.shared.pushHead(x)
}
// 解除p的 禁止抢占 状态
runtime_procUnpin()
if race.Enabled {
race.Enable()
}
}
Get
Get
func (p *Pool) Get() interface{} {
if race.Enabled {
race.Disable()
}
// 获取当前poolLocal 和 p的id
l, pid := p.pin()
x := l.private
l.private = nil
// 如果private没有对象,那么从share的头部取,
// 如果share也是空的,那就去偷getSlow()
if x == nil {
x, _ = l.shared.popHead()
if x == nil {
x = p.getSlow(pid)
}
}
// 解除p的 禁止抢占 模式,以后p就可以被抢占了
runtime_procUnpin()
if race.Enabled {
race.Enable()
if x != nil {
race.Acquire(poolRaceAddr(x))
}
}
// 如果偷也没偷到,就返回一个新对象
if x == nil && p.New != nil {
x = p.New()
}
return x
}
getSlow
func (p *Pool) getSlow(pid int) interface{} {
// See the comment in pin regarding ordering of the loads.
size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
locals := p.local // load-consume
// 尝试从其它P的poolLocal的share中头一个对象
for i := 0; i < int(size); i++ {
// (pid+i+1)%int(size) 保证返回的poolLocal绝对不是自己的
l := indexLocal(locals, (pid+i+1)%int(size))
// 拿到其它p的poolLocal只有popTail()操作被允许,可获得一个对象
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 如果上面没有偷到,还可以从上一轮gc遗留下来的poolLocal中取一个,
// victim就是上一轮gc遗留的poolLocal
size = atomic.LoadUintptr(&p.victimSize)
// pid不能比victim的size大,否则越界,同时也说明P的数量发生了变化
if uintptr(pid) >= size {
return nil
}
locals = p.victim
l := indexLocal(locals, pid)
// 同样的先从自己一线的poolLocal中取,没有再去其它p以前的poolLocal中偷
if x := l.private; x != nil {
l.private = nil
return x
}
for i := 0; i < int(size); i++ {
l := indexLocal(locals, (pid+i)%int(size))
if x, _ := l.shared.popTail(); x != nil {
return x
}
}
// 把victim的sieze置0,以后不再用了
// 一次性的victim
atomic.StoreUintptr(&p.victimSize, 0)
return nil
}
-
runtime_LoadAcquintptr(&ptr) 原子操作把值加载到内存,在多核cpu系统中保证该变量在使用期间不会被其它线程修改
//go:nosplit //go:noinline func LoadAcq(ptr *uint32) uint32 { return *ptr }
pin
pin
// pin()将当前g和p绑定在一起,不允许被抢占,
// 并且返回poolLocal 和 pid
// 注意:调用者必须在后调用runtime_procUnpin()来允许p被抢占!
func (p *Pool) pin() (*poolLocal, int) {
// 获取当前p的pid
pid := runtime_procPin()
// 在pinSlow中先存储localSize然后存储local, 因此反过来获取
// 因为已经禁止了抢占,此时不会发生gc
// 因此,我们要先观察local,确认其大小至少为localSize
// 如果local是全新的或者很大,都正常 (我们必须观察它的 zero-initialized-ness).
s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
l := p.local // load-consume
// 此时如果pid大于s,除了会发生越界,还暗示了这期间p的数量发生了改变
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 如果pid大于localSize说明当前p还没有poolLocal,需要创建一个
return p.pinSlow()
}
pinSlow
func (p *Pool) pinSlow() (*poolLocal, int) {
// 取消p的禁止抢占,后面加全局锁时p必须是可抢占的
runtime_procUnpin()
// 加全局锁,
allPoolsMu.Lock()
// 别忘了解锁
defer allPoolsMu.Unlock()
//再让p禁止抢占,即被pin住,并取其pid
pid := runtime_procPin()
// p被pin住后,poolCleanup不会被调用
s := p.localSize
l := p.local
if uintptr(pid) < s {
return indexLocal(l, pid), pid
}
// 如果这个p的local为空,就新建一个,并将其添加到allPools中
if p.local == nil {
allPools = append(allPools, p)
}
// 如果再GC期间GOMAXPROCS的值被改变了, 需要重新分配local数组,旧的local会被丢弃
size := runtime.GOMAXPROCS(0)
local := make([]poolLocal, size)
atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
runtime_StoreReluintptr(&p.localSize, uintptr(size)) // store-release
return &local[pid], pid
}
- 此处要加全局锁,可见程序运行时改变p的数量会付出多大代价。
init
init
func init() {
runtime_registerPoolCleanup(poolCleanup)
}
poolCleanup
func poolCleanup() {
// 这个方法实在垃圾回收开始时调用STW
// STW期间将不会分配任何空间且不调用任何运行时函数
// 因为在STW时,所以pool的所有者无法读取poollocal,实际上此时所有的p都被禁止抢占
// 删除所有pools中的victim 缓存
for _, p := range oldPools {
p.victim = nil
p.victimSize = 0
}
// 将主缓存移动到victim缓存中去
for _, p := range allPools {
p.victim = p.local
p.victimSize = p.localSize
p.local = nil
p.localSize = 0
}
// 具有非空主缓存的pools现在具有了非空victim缓存,同时所有的pool都不具有主缓存
oldPools, allPools = allPools, nil
}
其他方法
indexLocal
func indexLocal(l unsafe.Pointer, i int) *poolLocal {
lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
return (*poolLocal)(lp)
}