etcd实现分布式锁


前言

分布式锁要解决两个问题:

1、锁竞争

2、死锁

以redis为例,redis提供了setnx来保证原子写入,只有一个客户端能写入成功,也就能成功获得锁。同时为了防止客户端异常导致锁没有及时释放,可以对这个锁设置过期s时间,命令如下:

SET lock_name my_random_value NX PX 30000

除了锁自动过期以外,还需要能手动释放锁,命令如下:

del lock_name

etcd的实现方式

etcd提供了以下几种特性来实现分布式锁:

  • Lease机制

    租约机制(TTL,Time To Live),etcd 可以为存储的 key-value 对设置租约,当租约到期,key-value 将失效删除;

    同时也支持续约,通过客户端可以在租约到期之前续约,以避免 key-value 对过期失效。

    Lease机制可以保证分布式锁的安全性,为锁对应的 key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。

  • Revision机制

    每个 key 带有一个 Revision 号,每进行一次事务便+1,它是全局唯一的,通过 Revision 的大小就可以知道进行写操作的顺序。

    在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” ,实现公平锁。这和zookeeper的临时顺序节点+监听机制可避免羊群效应的原理是一致的。

  • Prefix机制

    即前缀机制。例如,一个名为 /etcd/lock 的锁,两个争抢它的客户端进行写操作,实际写入的 key 分别为:key1="/etcd/lock/UUID1",key2="/etcd/lock/UUID2"。其中,UUID 表示全局唯一的 ID,确保两个 key 的唯一性。

    写操作都会成功,但返回的 Revision 不一样,那么,如何判断谁获得了锁呢?通过前缀 /etcd/lock 查询,返回包含两个 key-value 对的的 KeyValue 列表,同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁 。

  • Watch机制

    即监听机制。Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个范围(前缀机制)。当被 Watch 的 key 或范围发生变化,客户端将收到通知;在实现分布式锁时,如果抢锁失败,可通过 Prefix 机制返回的 Key-Value 列表获得 Revision 比自己小且相差最小的 key(称为 pre-key),对 pre-key 进行监听,因为只有它释放锁,自己才能获得锁,如果 Watch 到 pre-key 的 DELETE 事件,则说明 pre-key 已经释放,自己将持有锁。

实现流程如下:

  1. 建立连接

    客户端连接 etcd,以 /etcd/lock 为前缀创建全局唯一的 key,假设第一个客户端对应的 key="/etcd/lock/UUID1",第二个为 key="/etcd/lock/UUID2";客户端分别为自己的 key 创建租约 - Lease,租约的长度根据业务耗时确定。

  2. 创建定时任务作为租约的“心跳”
    当一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,key 将因租约到期而被删除,从而锁释放,避免死锁。

  3. 客户端将自己全局唯一的 key 写入 etcd
    执行 put 操作,将步骤 1 中创建的 key 绑定租约写入 Etcd,根据 Etcd 的 Revision 机制,
    假设两个客户端 put 操作返回的 Revision 分别为 1、2,客户端需记录 Revision 用来判断自己是否获得锁。

  4. 客户端判断是否获得锁
    客户端以前缀 /etcd/lock/ 读取 key-Value 列表,判断自己 key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;否则监听列表中前一个 Revision 比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

  5. 执行业务

    获得锁后,操作共享资源,执行业务代码。

Demo

官方包里(github.com/coreos/etcd/clientv3/concurrency)已经实现了上述流程,我们只需要做下简单的调用就可以实现分布式锁了。

package main

import (
    "context"
    "fmt"
    "github.com/coreos/etcd/clientv3"
    "github.com/coreos/etcd/clientv3/concurrency"
    "log"
    "os"
    "os/signal"
    "time"
)

func main() {
    c := make(chan os.Signal)
    signal.Notify(c)

    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    prefix := "/lock"

    go func () {
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, prefix)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go1 get mutex failed " + err.Error())
        }
        fmt.Printf("go1 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(10) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go1 release lock\n")
    }()

    go func() {
        time.Sleep(time.Duration(2) * time.Second)
        session, err := concurrency.NewSession(cli)
        if err != nil {
            log.Fatal(err)
        }
        m := concurrency.NewMutex(session, prefix)
        if err := m.Lock(context.TODO()); err != nil {
            log.Fatal("go2 get mutex failed " + err.Error())
        }
        fmt.Printf("go2 get mutex sucess\n")
        fmt.Println(m)
        time.Sleep(time.Duration(2) * time.Second)
        m.Unlock(context.TODO())
        fmt.Printf("go2 release lock\n")
    }()

    <-c
}

上述代码里起了两个协程,分别实例化两个session及Mutex对象,并执行加锁和释放锁的操作,下面看看源码包里是怎么进行加锁的。

func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }

    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmp、put、get、getOwner。需要注意的是,key是由pfx和Lease()组成的。

  • cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
  • put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
  • get: get就是通过key来查询
  • getOwner: 注意这里是用m.pfx来查询的,并且带了查询参数WithFirstCreate(),它以m.pfx为前缀去查询所有key,根据创建version正排序,取最前面的一个值,即最早的那个key ,即Revsion最小的那个。

接下来才是通过判断来检查是否持有锁

m.myRev = resp.Header.Revision
if !resp.Succeeded {
    m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    m.hdr = resp.Header
    return nil
}

m.myRev是当前的版本号,resp.Succeeded是cmp为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。

然后判断getOwner返回的最小key,如如果没有这个key或者,或者这个key版本号和当前的版本号一致,则获取到这个锁。

继续往下走:

// wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr

走到这里说明没有获取到锁,那么这里等待锁的删除。

waitDeletes方法会监听比当前会话版本号更低的key的删除事件。一旦这个key删除了,自己也就拿到锁了。

总结

redis的实现方式会竞争同一个锁,但在etcd里客户端是拿自己key(锁)的版本号和前缀里最小的版本号的key比较,如果相同则获得锁。