From 553fb2b4cba50c8f2840671db375f8522b506419 Mon Sep 17 00:00:00 2001 From: Amir Malekpour Date: Thu, 23 Feb 2017 10:54:10 -0800 Subject: [PATCH] Zookeeper: Make Create+Put on a new znode atomic Writes to a new Zookeeper znode should take advantage of Zookeeper's atomic create + write primitive. If not, it is possible that a read that was triggered by a watch will return an empty string. The previous workaround for this does not always work (e.g., when an empty string is returned due to a race) and can potentially cause call-stack overflow. This change-set fixes this race and removes the workaround. It also adds a call to Zookeeper's Sync() on a Get operation, only when an empty string (or SOH) is returned to guard against an older version of libkv doing create+write in a non-atomic fashion. This change-set addresses github.com/docker/swarm/issues/1915 Signed-off-by: Amir Malekpour --- store/zookeeper/zookeeper.go | 58 ++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ff8d4ebe..ec256aea 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -13,7 +13,9 @@ const ( // SOH control character SOH = "\x01" - defaultTimeout = 10 * time.Second + defaultTimeout = 10 * time.Second + + syncRetryLimit = 5 ) // Zookeeper is the receiver type for @@ -66,19 +68,29 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(s.normalize(key)) - if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound + var resp []byte + var meta *zk.Stat + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i <= 
syncRetryLimit; i++ { + resp, meta, err = s.client.Get(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err } - return nil, err - } - // FIXME handle very rare cases where Get returns the - // SOH control character instead of the actual value - if string(resp) == SOH { - return s.Get(store.Normalize(key)) + if (string(resp) == SOH || string(resp) == "") && i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, err + } + } } pair = &store.KVPair{ @@ -91,14 +103,21 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { } // createFullPath creates the entire path for a directory -// that does not exist -func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { +// that does not exist and sets the value of the last +// znode to data +func (s *Zookeeper) createFullPath(path []string, data []byte, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") - if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + + if i == len(path) { + flag := 0 + if ephemeral { + flag = zk.FlagEphemeral + } + _, err := s.client.Create(newpath, data, int32(flag), zk.WorldACL(zk.PermAll)) return err } + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists @@ -121,13 +140,14 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro if !exists { if opts != nil && opts.TTL > 0 { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, true) } else { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, false) } + } else { + _, err = s.client.Set(fkey, value, -1) } - _, err = 
s.client.Set(fkey, value, -1) return err } @@ -313,7 +333,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Create the directory parts := store.SplitKey(strings.TrimSuffix(key, "/")) parts = parts[:len(parts)-1] - if err = s.createFullPath(parts, false); err != nil { + if err = s.createFullPath(parts, []byte{}, false); err != nil { // Failed to create the directory. return false, nil, err }