diff --git a/client/v3/concurrency/mutex.go b/client/v3/concurrency/mutex.go index a29c024d038..6cc5a9242fa 100644 --- a/client/v3/concurrency/mutex.go +++ b/client/v3/concurrency/mutex.go @@ -152,6 +152,7 @@ func (m *Mutex) IsOwner() v3.Cmp { } func (m *Mutex) Key() string { return m.myKey } +func (m *Mutex) Rev() int64 { return m.myRev } // Header is the response header received from etcd on acquiring the lock. func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr } @@ -171,7 +172,17 @@ func (lm *lockerMutex) Unlock() { } } -// NewLocker creates a sync.Locker backed by an etcd mutex. -func NewLocker(s *Session, pfx string) sync.Locker { +// NewLocker creates a DLocker backed by an etcd mutex. +func NewLocker(s *Session, pfx string) DLocker { return &lockerMutex{NewMutex(s, pfx)} } + +// A DLocker represents an object that can be locked and unlocked +// in distributed environment. +type DLocker interface { + sync.Locker + // Rev returns a revision which is monotonically incremental. It can + // be used as a fencing token to prevent expired locker from operating + // the shared resource. + Rev() int64 +} diff --git a/contrib/lock/README.md b/contrib/lock/README.md index d33630e25fa..e52e3feedef 100644 --- a/contrib/lock/README.md +++ b/contrib/lock/README.md @@ -64,7 +64,7 @@ If things go well the second client process invoked as `./client 2` finishes soo After checking this, please hit any key for `./client 1` and resume the process. It will show an output like below: ``` resuming client 1 -expected fail to write to storage with old lease version: error: given version (694d82254d5fa305) is different from the existing version (694d82254e18770a) +expected fail to write to storage with old lease version: error: given version (8) is smaller than the existing version (10) ``` [fencing]: https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html diff --git a/contrib/lock/client/client.go b/contrib/lock/client/client.go index ddb5498e55b..bd5e4073d59 100644 --- a/contrib/lock/client/client.go +++ b/contrib/lock/client/client.go @@ -121,11 +121,12 @@ func main() { locker := concurrency.NewLocker(session, "/lock") locker.Lock() defer locker.Unlock() - version := session.Lease() + leaseID := session.Lease() + version := locker.Rev() log.Printf("acquired lock, version: %x", version) if mode == 1 { - log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", version) + log.Printf("please manually revoke the lease using 'etcdctl lease revoke %x' or wait for it to expire, then start executing client 2 and hit any key...", leaseID) reader := bufio.NewReader(os.Stdin) _, _ = reader.ReadByte() log.Print("resuming client 1") @@ -133,7 +134,7 @@ func main() { log.Print("this is client 2, continuing\n") } - err = write("key0", fmt.Sprintf("value from client %x", mode), int64(version)) + err = write("key0", fmt.Sprintf("value from client %x", mode), version) if err != nil { if mode == 1 { log.Printf("expected fail to write to storage with old lease version: %s\n", err) // client 1 should show this message @@ -141,6 +142,6 @@ func main() { log.Fatalf("unexpected fail to write to storage: %s\n", err) } } else { - log.Printf("successfully write a key to storage using lease %x\n", int64(version)) + log.Printf("successfully write a key to storage with version %x\n", version) } } diff --git a/contrib/lock/storage/storage.go b/contrib/lock/storage/storage.go index 7e39e38f62d..54ffb227b00 100644 --- a/contrib/lock/storage/storage.go +++ b/contrib/lock/storage/storage.go @@ -78,8 +78,8 @@ func handler(w http.ResponseWriter, r *http.Request) { } } else if strings.Compare(req.Op, "write") == 0 { if val, ok := data[req.Key]; ok { - if req.Version != val.version { - writeResponse(response{"", -1, fmt.Sprintf("given version (%x) is different from the existing version (%x)", req.Version, val.version)}, w) + if req.Version < val.version { + writeResponse(response{"", -1, fmt.Sprintf("given version (%d) is smaller than the existing version (%d)", req.Version, val.version)}, w) } else { data[req.Key].val = req.Val data[req.Key].version = req.Version