此文的目的是我在使用etcd分布式锁的一些记录。

简单锁

此前了解到使用etcd分布式锁,是通过一个课程,这里是代码:JobLock.go

大致流程是这样的(如何自己来创建一把锁的逻辑也就是这样了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 尝试上锁
func (jobLock *JobLock) TryLock() (err error) {
// 1, 创建租约(5秒)

// 2, 自动续租

// 3, 处理续租应答的协程
go func() {
for {
// 自动续租的应答
}
}()

// 4, 创建事务txn
// 锁路径

// 5, 事务抢锁
// 提交事务

// 6, 成功返回, 失败释放租约

// 抢锁成功

FAIL:
// 取消自动续租
// 释放租约
}

ETCD Concurrency

但我到现在才知道,原来etcd golang库里头已经把这部分给实现了,Concurrency ,什么TryLock \ Lock都给实现了。

创建租约,自动续约的操作,则是函数concurrency.NewSession直接给封装好了。

不过这里锁的还有点不同,课程中的锁,是将一个key当作一把锁,大家去抢这把锁。

而库中实现的是将key prefix作为一把锁。

所以,在调用lock时,租约session需要为每把锁都创建一个,否则会报错:

1
2
3
4
5
6
7
8
9
10
11
12
13
s1, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
if err != nil {
log.Fatal(err)
}
defer s1.Close()
m1 := concurrency.NewMutex(s1, "/my-lock/")

s2, err := concurrency.NewSession(cli, concurrency.WithTTL(5))
if err != nil {
log.Fatal(err)
}
defer s2.Close()
m1 := concurrency.NewMutex(s2, "/my-lock/")

TryLock\Lock

可以看到这里有两个上锁方式:TryLock \ Lock

TryLockLock,多调用了一个waitDeletes 函数,这个函数模拟了一种公平的先来后到的排队逻辑,等待所有当前比当前 key 的 revision 小的 key 被删除后,锁释放后才返回。

另外此包还封装了一个sync.Locker的接口函数,提供我们使用:clientv3\concurrency

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type lockerMutex struct{ *Mutex }

func (lm *lockerMutex) Lock() {
client := lm.s.Client()
if err := lm.Mutex.Lock(client.Ctx()); err != nil {
panic(err)
}
}
func (lm *lockerMutex) Unlock() {
client := lm.s.Client()
if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
panic(err)
}
}

// NewLocker creates a sync.Locker backed by an etcd mutex.
func NewLocker(s *Session, pfx string) sync.Locker {
return &lockerMutex{NewMutex(s, pfx)}
}

本文并未对源码,以及相关联知识进行延申的讲解,这里贴上我认为对ETCD分布式锁讲解的比较好的博客:Etcd 应用开发之分布式锁。(博客中对revisionCreateRevision 似乎有些搅浑了,我后面查看代码时已改正)

群体效应问题 2020年9月21日

在学习论文ZooKeeper: Wait-free coordination for Internet-scale systems 时,看到了群体效应问题 ,简单理解就是:

1
如果很多客户端等待锁,对这个锁的竞争就会很激烈,当锁释放时,仅仅有一个等待的客户端获得锁。

例如,在博客开头,我们实现的伪代码就是一种有群体效应问题的简单锁结构。而etcd中实现的锁则规避了这一问题。移除一个仅仅会唤起一个客户端抢锁,通过先来先得的概念,避免了群体效应。

整体代码

这里结合着来看一下ETCD的代码:

concurrency.NewSession创建一个ETCD的租约,包含自动续租的操作;注意此处的租约(lease)每一次创建均不相同,所以即使是相同的程序,不同时间,每次使用的锁,也不是一样的。

1
2
3
4
5
s, err := concurrency.NewSession(client, concurrency.WithTTL(5))

m := concurrency.NewMutex(s, "/test/testTime")
ctx, _ = context.WithCancel(context.TODO())
err = m.Lock(ctx)

大体明白了锁的操作,接下来深入源码来看看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session

pfx string // 锁的共同前缀 pfx,如 "/service/lock/"
myKey string // 当前持有锁的客户端的 leaseid 值(完整 Key 的组成为 pfx+"/"+leaseid)
myRev int64 // revision,理解为当前持有锁的 Revision(修改数) 编号 或者是 CreateRevision
hdr *pb.ResponseHeader
}

// 例如:/test/lock + "/"
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()

// 下面的 m.pfx 就是 prefix,是传进来的前缀,后面的 s.Lease() 会返回一个租约,是一个 int64 的整数,和 session 有关系
//mykex 先理解为是 / prefix/leaseid 这样的结构
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())

// 这里定义一个 cmp 方法,比较上面 m.myKey 的 CreateRevision 是否为 0,等于 0 表示目前不存在该 key,需要执行 Put 操作,不等于 0 表示对应的 key 已经创建了,需要执行 Get 操作
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)

// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))

// reuse key in case this session already holds the lock
// 获取key是否已设置成锁
get := v3.OpGet(m.myKey)

// fetch current holder to complete uncontended path with only one RPC
// 获取当前锁的真正持有者
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)

// Txn 事务,判断 cmp 的条件是否成立,成立执行 Then,不成立执行 Else,最终执行 Commit()
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}

// if true; (put, getOwner)
m.myRev = resp.Header.Revision
// else if (get, getOwner)
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}

// if no key on prefix / the minimum rev is key, already hold the lock
// 获取当前实际拿到锁的KEY
ownerKey := resp.Responses[1].GetResponseRange().Kvs
// 比较如果当前没有人获得锁(第一次场景)
// 或者锁的 owner 的 CreateRevision 等于当前的 key 的 CreateRevision,则表示m.myKey即为拿到锁的key,不用新建,直接使用即可
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}

// wait for deletion revisions prior to myKey
// 走到这里代表没有获得锁,需要等待之前的锁被释放,即 CreateRevision 小于当前 CreateRevision 的 kv 被删除
// 阻塞等待 Owner 释放锁
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}

WithFirstCreate

此代码中getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// WithFirstCreate gets the key with the oldest creation revision in the request range.
func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }

// withTop gets the first key over the get's prefix given a sort order
func withTop(target SortTarget, order SortOrder) []OpOption {
return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
}

// WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
// on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
// can return 'foo1', 'foo2', and so on.
func WithPrefix() OpOption {
// 返回所有满足 prefix 匹配的 key-value,和 etcdctl get key --prefix 功能一样
return func(op *Op) {
if len(op.key) == 0 {
op.key, op.end = []byte{0}, []byte{0}
return
}
op.end = getPrefix(op.key)
}
}

看到上面的代码,WithPrefix/WithSort,所以 getOwner 的具体执行效果是会把虽有以 lockkey 开头的 key-value 都拿到,且按照 CreateRevision 升序排列,取第一个值,这个意思就很明白了,就是要拿到当前以 lockkey 为 prefix 的且 CreatereVision 最小的那个 key,就是目前已经拿到锁的 key;

waitDeletes

再看看 waitDeletes 函数的行为, waitDeletes 模拟了一种公平的先来后到的排队逻辑,等待所有当前比当前 key 的 revision 小的 key 被删除后,锁释放后才返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
// 注意 WithLastCreate 和 WithMaxCreateRev 这两个特性
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}

func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
cctx, cancel := context.WithCancel(ctx)
defer cancel()

var wr v3.WatchResponse
// wch 是个 channel,key 被删除后会往这个 chan 发数据
wch := client.Watch(cctx, key, v3.WithRev(rev))
for wr = range wch {
for _, ev := range wr.Events {
if ev.Type == mvccpb.DELETE {
return nil
}
}
}
if err := wr.Err(); err != nil {
return err
}
if err := ctx.Err(); err != nil {
return err
}
return fmt.Errorf("lost watcher waiting for delete")
}

Etcd 分布式锁的步骤

分析完 concurrency 的主要代码,不难总结出用 Etcd 构造(公平式长期)分布式锁的一般流程如下:

  1. 假设锁的 name 为 /root/lockname,用来控制某个共享资源,concurrency 会自动将其转换为目录形式:/root/lockname/

  2. 客户端 A 连接 Etcd,创建一个租约 Leaseid_A,并设置 TTL(以业务逻辑来定), 以 /root/lockname 为前缀创建全局唯一的 key,该 key 的组织形式为 / root/lockname/{leaseid_A};设置TXN条件事务

    判断条件:比较 /root/lockname/{leaseid_A}的 CreateRevision 是否为 0:

    等于 0 表示目前不存在该 key,Then(put, getOwner) :客户端 A 将此 Key 绑定租约写入 Etcd,同时调用 TXN 事务查询写入的情况和具有相同前缀 / root/lockname / 的 CreateRevision的排序情况;后续直接返回此key相关信息;

    不等于 0 表示对应的 key 已经创建了,Else(get, getOwner) :以前缀 /root/lockname/ 读取 keyValue 列表(keyValue 中带有 key 对应的 CreateRevision),判断自己 key 的 CreateRevision是否为当前列表中最小的,如果是则认为获得锁;否则阻塞监听列表中前一个 CreateRevision比自己小的 key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

  3. 执行业务逻辑,操作共享资源

  4. 释放分布式锁,现网的程序逻辑需要实现在正常和异常条件下的释放锁的策略,如捕获 SIGTERM 后执行 Unlock,或者异常退出时,有完善的监控和及时删除 Etcd 中的 Key 的异步机制,避免出现 “死锁” 现象

  5. 当客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务进行续约续期。如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁