From 553fb2b4cba50c8f2840671db375f8522b506419 Mon Sep 17 00:00:00 2001 From: Amir Malekpour Date: Thu, 23 Feb 2017 10:54:10 -0800 Subject: [PATCH 1/2] Zookeeper: Make Create+Put on a new znode atomic Writes to a new Zookeeper znode should take advantage of Zookeeper's atomic create + write primitive. If not, it is possible that a read that was triggered by a watch will return an empty string. The previous workaround for this does not always work (e.g., when an empty string is returned due to a race) and can potentially cause call-stack overflow. This change-set fixes this race and removes the workaround. It also adds a call to Zookeeper's Sync() on a Get operation, only when an empty string (or SOH) is returned to guard against an older version of libkv doing create+write in a non-atomic fashion. This change-set addresses github.com/docker/swarm/issues/1915 Signed-off-by: Amir Malekpour --- store/zookeeper/zookeeper.go | 58 ++++++++++++++++++++++++------------ 1 file changed, 39 insertions(+), 19 deletions(-) diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ff8d4ebe..ec256aea 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -13,7 +13,9 @@ const ( // SOH control character SOH = "\x01" - defaultTimeout = 10 * time.Second + defaultTimeout = 10 * time.Second + + syncRetryLimit = 5 ) // Zookeeper is the receiver type for @@ -66,19 +68,29 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(s.normalize(key)) - if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound + var resp []byte + var meta *zk.Stat + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i 
<= syncRetryLimit; i++ { + resp, meta, err = s.client.Get(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err } - return nil, err - } - // FIXME handle very rare cases where Get returns the - // SOH control character instead of the actual value - if string(resp) == SOH { - return s.Get(store.Normalize(key)) + if (string(resp) == SOH || string(resp) == "") && i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, err + } + } } pair = &store.KVPair{ @@ -91,14 +103,21 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { } // createFullPath creates the entire path for a directory -// that does not exist -func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { +// that does not exist and sets the value of the last +// znode to data +func (s *Zookeeper) createFullPath(path []string, data []byte, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") - if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + + if i == len(path) { + flag := 0 + if ephemeral { + flag = zk.FlagEphemeral + } + _, err := s.client.Create(newpath, data, int32(flag), zk.WorldACL(zk.PermAll)) return err } + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists @@ -121,13 +140,14 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro if !exists { if opts != nil && opts.TTL > 0 { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, true) } else { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, false) } + } else { + _, err = s.client.Set(fkey, value, -1) } - _, err 
= s.client.Set(fkey, value, -1) return err } @@ -313,7 +333,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Create the directory parts := store.SplitKey(strings.TrimSuffix(key, "/")) parts = parts[:len(parts)-1] - if err = s.createFullPath(parts, false); err != nil { + if err = s.createFullPath(parts, []byte{}, false); err != nil { // Failed to create the directory. return false, nil, err } From a9894d135ee6319a13e5388c5ef4acac56138175 Mon Sep 17 00:00:00 2001 From: Daniel Ferstay Date: Thu, 23 Mar 2017 10:43:39 -0700 Subject: [PATCH 2/2] Zookeeper: Ensure that Watch and WatchTree do not miss events refs CIO-39409 In the implementation of Store.Watch(), which watches for changes in a specific key, clients could miss changes in the value of interest due to the following pattern in the implementation: 1. get value of key 2. send value of key to client on channel 3. get value of key and set watch, but ignore value of key 4. when watch fires, get value of key and send value of key to client on channel The above has been reduced to: 1. get value of key and set watch, send value of key to client on channel In the implementation of Store.WatchTree(), which watches for changes in the children of specific key, clients could miss events due to the following pattern in the implementation: 1. get children of the key 2. send child list to the client over the channel 3. get children and set watch, but ignore the set of children 4. when watch fires, get children of key and send child list to the client over the channel Step 4 was problematic because any failure to get the children after the event was fired would result in clients missing the change to the set of children until the watch fired again due to a subsequent change. The implementation has been reduced to: 1. 
get children and set watch, send children to client on channel and retry immediately if values of child nodes could not be read Signed-off-by: Daniel Ferstay --- store/zookeeper/zookeeper.go | 201 ++++++++++++++++++++++------------- 1 file changed, 126 insertions(+), 75 deletions(-) diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ec256aea..a7bf008c 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -13,7 +13,7 @@ const ( // SOH control character SOH = "\x01" - defaultTimeout = 10 * time.Second + defaultTimeout = 10 * time.Second syncRetryLimit = 5 ) @@ -69,28 +69,9 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - var resp []byte - var meta *zk.Stat - - // To guard against older versions of libkv - // creating and writing to znodes non-atomically, - // We try to resync few times if we read SOH or - // an empty string - for i := 0; i <= syncRetryLimit; i++ { - resp, meta, err = s.client.Get(s.normalize(key)) - - if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound - } - return nil, err - } - - if (string(resp) == SOH || string(resp) == "") && i < syncRetryLimit { - if _, err = s.client.Sync(s.normalize(key)); err != nil { - return nil, err - } - } + resp, meta, err := s.get(key) + if err != nil { + return nil, err } pair = &store.KVPair{ @@ -175,32 +156,30 @@ func (s *Zookeeper) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the key first - pair, err := s.Get(key) - if err != nil { - return nil, err - } - // Catch zk notifications and fire changes into the channel. 
watchCh := make(chan *store.KVPair) go func() { defer close(watchCh) - // Get returns the current value to the channel prior - // to listening to any event that may occur on that key - watchCh <- pair + var fireEvt = true for { - _, _, eventCh, err := s.client.GetW(s.normalize(key)) + resp, meta, eventCh, err := s.getW(key) if err != nil { return } + if fireEvt { + watchCh <- &store.KVPair{ + Key: key, + Value: resp, + LastIndex: uint64(meta.Version), + } + } select { case e := <-eventCh: - if e.Type == zk.EventNodeDataChanged { - if entry, err := s.Get(key); err == nil { - watchCh <- entry - } - } + // Only fire an event if the data in the node changed. + // Simply reset the watch if this is any other event + // (e.g. a session event). + fireEvt = e.Type == zk.EventNodeDataChanged case <-stopCh: // There is no way to stop GetW so just quit return @@ -217,36 +196,35 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP // will be sent to the channel .Providing a non-nil stopCh can // be used to stop watching. func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // List the childrens first - entries, err := s.List(directory) - if err != nil { - return nil, err - } - // Catch zk notifications and fire changes into the channel. watchCh := make(chan []*store.KVPair) go func() { defer close(watchCh) - // List returns the children values to the channel - // prior to listening to any events that may occur - // on those keys - watchCh <- entries - + var fireEvt = true for { - _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) + WATCH: + keys, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) if err != nil { return } + if fireEvt { + kvs, err := s.getKVPairs(directory, keys) + if err != nil { + // Failed to get values for one or more of the keys, + // the list may be out of date so try again. 
+ goto WATCH + } + watchCh <- kvs + } select { case e := <-eventCh: - if e.Type == zk.EventNodeChildrenChanged { - if kv, err := s.List(directory); err == nil { - watchCh <- kv - } - } + // Only fire an event if the children have changed. + // Simply reset the watch if this is any other event + // (e.g. a session event). + fireEvt = e.Type == zk.EventNodeChildrenChanged case <-stopCh: - // There is no way to stop GetW so just quit + // There is no way to stop ChildrenW so just quit return } } @@ -257,7 +235,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan // List child nodes of a given directory func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(s.normalize(directory)) + keys, _, err := s.client.Children(s.normalize(directory)) if err != nil { if err == zk.ErrNoNode { return nil, store.ErrKeyNotFound @@ -265,27 +243,16 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { return nil, err } - kv := []*store.KVPair{} - - // FIXME Costly Get request for each child key.. 
- for _, key := range keys { - pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) - if err != nil { - // If node is not found: List is out of date, retry - if err == store.ErrKeyNotFound { - return s.List(directory) - } - return nil, err + kvs, err := s.getKVPairs(directory, keys) + if err != nil { + // If node is not found: List is out of date, retry + if err == store.ErrKeyNotFound { + return s.List(directory) } - - kv = append(kv, &store.KVPair{ - Key: key, - Value: []byte(pair.Value), - LastIndex: uint64(stat.Version), - }) + return nil, err } - return kv, nil + return kvs, nil } // DeleteTree deletes a range of keys under a given directory @@ -447,3 +414,87 @@ func (s *Zookeeper) normalize(key string) string { key = store.Normalize(key) return strings.TrimSuffix(key, "/") } + +func (s *Zookeeper) get(key string) ([]byte, *zk.Stat, error) { + var resp []byte + var meta *zk.Stat + var err error + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i <= syncRetryLimit; i++ { + resp, meta, err = s.client.Get(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, nil, store.ErrKeyNotFound + } + return nil, nil, err + } + + if string(resp) != SOH && string(resp) != "" { + return resp, meta, nil + } + + if i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, nil, err + } + } + } + return resp, meta, nil +} + +func (s *Zookeeper) getW(key string) ([]byte, *zk.Stat, <-chan zk.Event, error) { + var resp []byte + var meta *zk.Stat + var ech <-chan zk.Event + var err error + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i <= syncRetryLimit; i++ { + resp, meta, ech, err = s.client.GetW(s.normalize(key)) + + if err != nil { 
+ if err == zk.ErrNoNode { + return nil, nil, nil, store.ErrKeyNotFound + } + return nil, nil, nil, err + } + + if string(resp) != SOH && string(resp) != "" { + return resp, meta, ech, nil + } + + if i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, nil, nil, err + } + } + } + return resp, meta, ech, nil +} + +func (s *Zookeeper) getKVPairs(directory string, keys []string) ([]*store.KVPair, error) { + kvs := []*store.KVPair{} + + for _, key := range keys { + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) + if err != nil { + return nil, err + } + + kvs = append(kvs, &store.KVPair{ + Key: key, + Value: pair.Value, + LastIndex: pair.LastIndex, + }) + } + + return kvs, nil +}