一、概述
在etcd的clientv3包中,实现了分布式锁。使用起来和mutex
是类似的,为了了解其中的工作机制,这里简要的做一下总结。
二、使用方式
etcd分布式锁的实现在go.etcd.io/etcd/clientv3/concurrency
包中,主要提供了以下几个方法:
- func NewMutex(s *Session, pfx string) *Mutex, 用来新建一个mutex
- func (m *Mutex) Lock(ctx context.Context) error,它会阻塞直到拿到了锁,并且支持通过context来取消获取锁。
- func (m *Mutex) Unlock(ctx context.Context) error,解锁
因此在使用etcd提供的分布式锁式非常简单,通常就是实例化一个mutex,然后尝试抢占锁,之后进行业务处理,最后解锁即可。
一个简单的例子如下:
package main
import (
"context"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
"log"
"sync"
"time"
)
var n = 0
// 使用worker模拟锁的抢占
func worker(key string) error {
endpoints := []string{"127.0.0.1:2379"}
cfg := clientv3.Config{
Endpoints: endpoints,
DialTimeout: 3 * time.Second,
}
cli, err := clientv3.New(cfg)
if err != nil {
log.Println("new cli error:", err)
return err
}
sess, err := concurrency.NewSession(cli)
if err != nil {
return err
}
m := concurrency.NewMutex(sess, "/"+key)
err = m.Lock(context.TODO())
if err != nil {
log.Println("lock error:", err)
return err
}
defer func() {
err = m.Unlock(context.TODO())
if err != nil {
log.Println("unlock error:", err)
}
}()
log.Println("get lock: ", n)
n++
time.Sleep(time.Second) // 模拟执行代码
return nil
}
func main() {
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
err := worker("lockname")
if err != nil {
log.Println(err)
}
}()
go func() {
defer wg.Done()
err := worker("lockname")
if err != nil {
log.Println(err)
}
}()
go func() {
defer wg.Done()
err := worker("lockname")
if err != nil {
log.Println(err)
}
}()
wg.Wait()
}
三、实现机制
Lock()
函数的实现很简单。这里可以贴出来看一下:
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
首先通过一个事务来尝试加锁,这个事务主要包含了4个操作: cmp
、put
、get
、getOwner
。需要注意的是,key是由pfx
和Lease()
组成的。
- cmp: 比较加锁的key的修订版本是否是0。如果是0就代表这个锁不存在。
- put: 向加锁的key中存储一个空值,这个操作就是一个加锁的操作,但是这把锁是有超时时间的,超时的时间是session的默认时长。超时是为了防止锁没有被正常释放导致死锁。
- get: get就是通过key来查询
- getOwner: 注意这里是用
m.pfx
来查询的,并且带了查询参数WithFirstCreate()
。使用pfx
来查询是因为其他的session也会用同样的pfx
来尝试加锁,并且因为每个LeaseID都不同,所以第一次肯定会put
成功。但是只有最早使用这个pfx
的session
才是持有锁的,所以这个getOwner的含义就是这样的。
接下来才是通过判断来检查是否持有锁
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
m.myRev
是当前的版本号,resp.Succeeded
是cmp
为true时值为true,否则是false。这里的判断表明当同一个session非第一次尝试加锁,当前的版本号应该取这个key的最新的版本号。
下面是取得锁的持有者的key。如果当前没有人持有这把锁,那么默认当前会话获得了锁。或者锁持有者的版本号和当前的版本号一致, 那么当前的会话就是锁的持有者。
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
上面这段代码就很好理解了,因为走到这里说明没有获取到锁,那么这里等待锁的删除。
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
for {
resp, err := client.Get(ctx, pfx, getOpts...)
if err != nil {
return nil, err
}
if len(resp.Kvs) == 0 {
return resp.Header, nil
}
lastKey := string(resp.Kvs[0].Key)
if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
return nil, err
}
}
}
waitDeletes
方法的实现也很简单,但是需要注意的是,这里的getOpts
只会获取比当前会话版本号更低的key,然后去监控最新的key的删除。等这个key删除了,自己也就拿到锁了。
这种分布式锁的实现和我一开始的预想是不同的。它不存在锁的竞争,不存在重复的尝试加锁的操作。而是通过使用统一的前缀pfx
来put,然后根据各自的版本号来排队获取锁。效率非常的高。
如图所示,共有4个session来加锁,那么根据revision来排队,获取锁的顺序为session2 -> session3 -> session1 -> session4。
当然,这里为什么可以通过revision来判定获取锁的顺序,就需要更深入的了解etcd的内部机制以及raft协议了。