diff --git a/store/zookeeper/zookeeper.go b/store/zookeeper/zookeeper.go index ff8d4ebe..a7bf008c 100644 --- a/store/zookeeper/zookeeper.go +++ b/store/zookeeper/zookeeper.go @@ -14,6 +14,8 @@ const ( SOH = "\x01" defaultTimeout = 10 * time.Second + + syncRetryLimit = 5 ) // Zookeeper is the receiver type for @@ -66,21 +68,12 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(s.normalize(key)) + resp, meta, err := s.get(key) if err != nil { - if err == zk.ErrNoNode { - return nil, store.ErrKeyNotFound - } return nil, err } - // FIXME handle very rare cases where Get returns the - // SOH control character instead of the actual value - if string(resp) == SOH { - return s.Get(store.Normalize(key)) - } - pair = &store.KVPair{ Key: key, Value: resp, @@ -91,14 +84,21 @@ func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { } // createFullPath creates the entire path for a directory -// that does not exist -func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { +// that does not exist and sets the value of the last +// znode to data +func (s *Zookeeper) createFullPath(path []string, data []byte, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") - if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + + if i == len(path) { + flag := 0 + if ephemeral { + flag = zk.FlagEphemeral + } + _, err := s.client.Create(newpath, data, int32(flag), zk.WorldACL(zk.PermAll)) return err } + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists @@ -121,13 +121,14 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro if !exists { if opts != nil && opts.TTL > 0 { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, true) } else { - s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), value, false) } + } else { + _, err = s.client.Set(fkey, value, -1) } - _, err = s.client.Set(fkey, value, -1) return err } @@ -155,32 +156,30 @@ func (s *Zookeeper) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the key first - pair, err := s.Get(key) - if err != nil { - return nil, err - } - // Catch zk notifications and fire changes into the channel. watchCh := make(chan *store.KVPair) go func() { defer close(watchCh) - // Get returns the current value to the channel prior - // to listening to any event that may occur on that key - watchCh <- pair + var fireEvt = true for { - _, _, eventCh, err := s.client.GetW(s.normalize(key)) + resp, meta, eventCh, err := s.getW(key) if err != nil { return } + if fireEvt { + watchCh <- &store.KVPair{ + Key: key, + Value: resp, + LastIndex: uint64(meta.Version), + } + } select { case e := <-eventCh: - if e.Type == zk.EventNodeDataChanged { - if entry, err := s.Get(key); err == nil { - watchCh <- entry - } - } + // Only fire an event if the data in the node changed. + // Simply reset the watch if this is any other event + // (e.g. a session event). + fireEvt = e.Type == zk.EventNodeDataChanged case <-stopCh: // There is no way to stop GetW so just quit return @@ -197,36 +196,35 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP // will be sent to the channel .Providing a non-nil stopCh can // be used to stop watching. func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // List the childrens first - entries, err := s.List(directory) - if err != nil { - return nil, err - } - // Catch zk notifications and fire changes into the channel. watchCh := make(chan []*store.KVPair) go func() { defer close(watchCh) - // List returns the children values to the channel - // prior to listening to any events that may occur - // on those keys - watchCh <- entries - + var fireEvt = true for { - _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) + WATCH: + keys, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) if err != nil { return } + if fireEvt { + kvs, err := s.getKVPairs(directory, keys) + if err != nil { + // Failed to get values for one or more of the keys, + // the list may be out of date so try again. + goto WATCH + } + watchCh <- kvs + } select { case e := <-eventCh: - if e.Type == zk.EventNodeChildrenChanged { - if kv, err := s.List(directory); err == nil { - watchCh <- kv - } - } + // Only fire an event if the children have changed. + // Simply reset the watch if this is any other event + // (e.g. a session event). + fireEvt = e.Type == zk.EventNodeChildrenChanged case <-stopCh: - // There is no way to stop GetW so just quit + // There is no way to stop ChildrenW so just quit return } } @@ -237,7 +235,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan // List child nodes of a given directory func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(s.normalize(directory)) + keys, _, err := s.client.Children(s.normalize(directory)) if err != nil { if err == zk.ErrNoNode { return nil, store.ErrKeyNotFound @@ -245,27 +243,16 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { return nil, err } - kv := []*store.KVPair{} - - // FIXME Costly Get request for each child key.. - for _, key := range keys { - pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) - if err != nil { - // If node is not found: List is out of date, retry - if err == store.ErrKeyNotFound { - return s.List(directory) - } - return nil, err + kvs, err := s.getKVPairs(directory, keys) + if err != nil { + // If node is not found: List is out of date, retry + if err == store.ErrKeyNotFound { + return s.List(directory) } - - kv = append(kv, &store.KVPair{ - Key: key, - Value: []byte(pair.Value), - LastIndex: uint64(stat.Version), - }) + return nil, err } - return kv, nil + return kvs, nil } // DeleteTree deletes a range of keys under a given directory @@ -313,7 +300,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Create the directory parts := store.SplitKey(strings.TrimSuffix(key, "/")) parts = parts[:len(parts)-1] - if err = s.createFullPath(parts, false); err != nil { + if err = s.createFullPath(parts, []byte{}, false); err != nil { // Failed to create the directory. return false, nil, err } @@ -427,3 +414,87 @@ func (s *Zookeeper) normalize(key string) string { key = store.Normalize(key) return strings.TrimSuffix(key, "/") } + +func (s *Zookeeper) get(key string) ([]byte, *zk.Stat, error) { + var resp []byte + var meta *zk.Stat + var err error + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i <= syncRetryLimit; i++ { + resp, meta, err = s.client.Get(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, nil, store.ErrKeyNotFound + } + return nil, nil, err + } + + if string(resp) != SOH && string(resp) != "" { + return resp, meta, nil + } + + if i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, nil, err + } + } + } + return resp, meta, nil +} + +func (s *Zookeeper) getW(key string) ([]byte, *zk.Stat, <-chan zk.Event, error) { + var resp []byte + var meta *zk.Stat + var ech <-chan zk.Event + var err error + + // To guard against older versions of libkv + // creating and writing to znodes non-atomically, + // We try to resync few times if we read SOH or + // an empty string + for i := 0; i <= syncRetryLimit; i++ { + resp, meta, ech, err = s.client.GetW(s.normalize(key)) + + if err != nil { + if err == zk.ErrNoNode { + return nil, nil, nil, store.ErrKeyNotFound + } + return nil, nil, nil, err + } + + if string(resp) != SOH && string(resp) != "" { + return resp, meta, ech, nil + } + + if i < syncRetryLimit { + if _, err = s.client.Sync(s.normalize(key)); err != nil { + return nil, nil, nil, err + } + } + } + return resp, meta, ech, nil +} + +func (s *Zookeeper) getKVPairs(directory string, keys []string) ([]*store.KVPair, error) { + kvs := []*store.KVPair{} + + for _, key := range keys { + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) + if err != nil { + return nil, err + } + + kvs = append(kvs, &store.KVPair{ + Key: key, + Value: pair.Value, + LastIndex: pair.LastIndex, + }) + } + + return kvs, nil +}