From cb5bd379564e53abdd1f8c4a94c02dd3d4ffca5a Mon Sep 17 00:00:00 2001 From: Daniel Ferstay Date: Fri, 17 Mar 2017 14:12:30 -0700 Subject: [PATCH] Zookeeper: Relinquish locks on session close The zookeeper store implements locking by using ephermeral nodes. On session close, all ephemeral nodes created by a client are removed, which means that other clients can acquire the lock. Previously, the zookeeper store failed to notify clients that the lock had been relinquished due to the session being closed (due to expiration or otherwise). Which means that the clients would continue execution while believing (wrongfully) that they still hold the lock. Now, the zookeeper store monitors the state of the session and notifies clients that the lock has been lost when the session is closed. Signed-off-by: Daniel Ferstay --- store/zookeeper/zookeeper.go | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ff8d4ebe..4c29334f 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -401,14 +401,16 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. func (l *zookeeperLock) Lock(stopChan chan struct{}) (<-chan struct{}, error) { err := l.lock.Lock() + lostCh := make(chan struct{}) if err == nil { // We hold the lock, we can set our value - // FIXME: The value is left behind - // (problematic for leader election) _, err = l.client.Set(l.key, l.value, -1) + if err == nil { + go l.monitorLock(stopChan, lostCh) + } } - return make(chan struct{}), err + return lostCh, err } // Unlock the "key". Calling unlock while @@ -427,3 +429,31 @@ func (s *Zookeeper) normalize(key string) string { key = store.Normalize(key) return strings.TrimSuffix(key, "/") } + +func (l *zookeeperLock) monitorLock(stopCh <-chan struct{}, lostCh chan struct{}) { + defer close(lostCh) + + for { + _, _, eventCh, err := l.client.GetW(l.key) + if err != nil { + // We failed to set watch, relinquish the lock + return + } + select { + case e := <-eventCh: + if e.Type == zk.EventNotWatching || + (e.Type == zk.EventSession && e.State == zk.StateExpired) { + // Either the session has been closed and our watch has been + // invalidated or the session has expired. + return + } else if e.Type == zk.EventNodeDataChanged { + // Somemone else has written to the lock node and believes + // that they have the lock. + return + } + case <-stopCh: + // The caller has requested that we relinquish our lock + return + } + } +}