-
Notifications
You must be signed in to change notification settings - Fork 222
feat : Implement slot-based MGET batching for cluster clients #908
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -50,12 +50,7 @@ func MGet(client Client, ctx context.Context, keys []string) (ret map[string]Red | |||||||
return clientMGet(client, ctx, client.B().Mget().Key(keys...).Build(), keys) | ||||||||
} | ||||||||
|
||||||||
cmds := mgetcmdsp.Get(len(keys), len(keys)) | ||||||||
defer mgetcmdsp.Put(cmds) | ||||||||
for i := range cmds.s { | ||||||||
cmds.s[i] = client.B().Get().Key(keys[i]).Build() | ||||||||
} | ||||||||
return doMultiGet(client, ctx, cmds.s, keys) | ||||||||
return clusterMGet(client, ctx, keys) | ||||||||
} | ||||||||
|
||||||||
// MSet is a helper that consults the redis directly with multiple keys by grouping keys within the same slot into MSETs or multiple SETs | ||||||||
|
@@ -139,12 +134,7 @@ func JsonMGet(client Client, ctx context.Context, keys []string, path string) (r | |||||||
return clientMGet(client, ctx, client.B().JsonMget().Key(keys...).Path(path).Build(), keys) | ||||||||
} | ||||||||
|
||||||||
cmds := mgetcmdsp.Get(len(keys), len(keys)) | ||||||||
defer mgetcmdsp.Put(cmds) | ||||||||
for i := range cmds.s { | ||||||||
cmds.s[i] = client.B().JsonGet().Key(keys[i]).Path(path).Build() | ||||||||
} | ||||||||
return doMultiGet(client, ctx, cmds.s, keys) | ||||||||
return clusterJsonMGet(client, ctx, keys, path) | ||||||||
} | ||||||||
|
||||||||
// JsonMSet is a helper that consults redis directly with multiple keys by grouping keys within the same slot into JSON.MSETs or multiple JSON.SETs | ||||||||
|
@@ -277,6 +267,72 @@ func arrayToKV(m map[string]RedisMessage, arr []RedisMessage, keys []string) map | |||||||
return m | ||||||||
} | ||||||||
|
||||||||
func clusterMGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) { | ||||||||
ret = make(map[string]RedisMessage, len(keys)) | ||||||||
slotCmds := intl.MGets(keys) | ||||||||
if len(slotCmds) == 0 { | ||||||||
return ret, nil | ||||||||
} | ||||||||
cmds := mgetcmdsp.Get(len(slotCmds), len(slotCmds)) | ||||||||
defer mgetcmdsp.Put(cmds) | ||||||||
i := 0 | ||||||||
for _, cmd := range slotCmds { | ||||||||
cmds.s[i] = cmd.Pin() | ||||||||
i++ | ||||||||
} | ||||||||
Comment on lines
+278
to
+282
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we put i into the loop? |
||||||||
resps := client.DoMulti(ctx, cmds.s...) | ||||||||
defer resultsp.Put(&redisresults{s: resps}) | ||||||||
for i, resp := range resps { | ||||||||
if err := resp.NonRedisError(); err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
Comment on lines
+286
to
+288
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
The below resp.ToArray should cover this already. |
||||||||
arr, err := resp.ToArray() | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
commands := cmds.s[i].Commands() | ||||||||
cmdKeys := commands[1:] | ||||||||
ret = arrayToKV(ret, arr, cmdKeys) | ||||||||
} | ||||||||
for i := range cmds.s { | ||||||||
intl.PutCompletedForce(cmds.s[i]) | ||||||||
} | ||||||||
return ret, nil | ||||||||
} | ||||||||
|
||||||||
func clusterJsonMGet(client Client, ctx context.Context, keys []string, path string) (ret map[string]RedisMessage, err error) { | ||||||||
ret = make(map[string]RedisMessage, len(keys)) | ||||||||
slotCmds := intl.JsonMGets(keys, path) | ||||||||
if len(slotCmds) == 0 { | ||||||||
return ret, nil | ||||||||
} | ||||||||
cmds := mgetcmdsp.Get(len(slotCmds), len(slotCmds)) | ||||||||
defer mgetcmdsp.Put(cmds) | ||||||||
i := 0 | ||||||||
for _, cmd := range slotCmds { | ||||||||
cmds.s[i] = cmd.Pin() | ||||||||
i++ | ||||||||
} | ||||||||
Comment on lines
+311
to
+315
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we put i into the loop? |
||||||||
resps := client.DoMulti(ctx, cmds.s...) | ||||||||
defer resultsp.Put(&redisresults{s: resps}) | ||||||||
for i, resp := range resps { | ||||||||
if err := resp.NonRedisError(); err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
Comment on lines
+319
to
+321
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
The below resp.ToArray() should cover this already. |
||||||||
arr, err := resp.ToArray() | ||||||||
if err != nil { | ||||||||
return nil, err | ||||||||
} | ||||||||
commands := cmds.s[i].Commands() | ||||||||
cmdKeys := commands[1 : len(commands)-1] | ||||||||
ret = arrayToKV(ret, arr, cmdKeys) | ||||||||
} | ||||||||
for i := range cmds.s { | ||||||||
intl.PutCompletedForce(cmds.s[i]) | ||||||||
} | ||||||||
return ret, nil | ||||||||
} | ||||||||
|
||||||||
// ErrMSetNXNotSet is used in the MSetNX helper when the underlying MSETNX response is 0. | ||||||||
// Ref: https://redis.io/commands/msetnx/ | ||||||||
var ErrMSetNXNotSet = errors.New("MSETNX: no key was set") | ||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When could this happen?