Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 68 additions & 12 deletions helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,7 @@ func MGet(client Client, ctx context.Context, keys []string) (ret map[string]Red
return clientMGet(client, ctx, client.B().Mget().Key(keys...).Build(), keys)
}

cmds := mgetcmdsp.Get(len(keys), len(keys))
defer mgetcmdsp.Put(cmds)
for i := range cmds.s {
cmds.s[i] = client.B().Get().Key(keys[i]).Build()
}
return doMultiGet(client, ctx, cmds.s, keys)
return clusterMGet(client, ctx, keys)
}

// MSet is a helper that consults the redis directly with multiple keys by grouping keys within the same slot into MSETs or multiple SETs
Expand Down Expand Up @@ -139,12 +134,7 @@ func JsonMGet(client Client, ctx context.Context, keys []string, path string) (r
return clientMGet(client, ctx, client.B().JsonMget().Key(keys...).Path(path).Build(), keys)
}

cmds := mgetcmdsp.Get(len(keys), len(keys))
defer mgetcmdsp.Put(cmds)
for i := range cmds.s {
cmds.s[i] = client.B().JsonGet().Key(keys[i]).Path(path).Build()
}
return doMultiGet(client, ctx, cmds.s, keys)
return clusterJsonMGet(client, ctx, keys, path)
}

// JsonMSet is a helper that consults redis directly with multiple keys by grouping keys within the same slot into JSON.MSETs or multiple JSON.SETs
Expand Down Expand Up @@ -277,6 +267,72 @@ func arrayToKV(m map[string]RedisMessage, arr []RedisMessage, keys []string) map
return m
}

func clusterMGet(client Client, ctx context.Context, keys []string) (ret map[string]RedisMessage, err error) {
ret = make(map[string]RedisMessage, len(keys))
slotCmds := intl.MGets(keys)
if len(slotCmds) == 0 {
return ret, nil
}
Comment on lines +273 to +275
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could this happen?

cmds := mgetcmdsp.Get(len(slotCmds), len(slotCmds))
defer mgetcmdsp.Put(cmds)
i := 0
for _, cmd := range slotCmds {
cmds.s[i] = cmd.Pin()
i++
}
Comment on lines +278 to +282
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put i into the loop?

resps := client.DoMulti(ctx, cmds.s...)
defer resultsp.Put(&redisresults{s: resps})
for i, resp := range resps {
if err := resp.NonRedisError(); err != nil {
return nil, err
}
Comment on lines +286 to +288
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := resp.NonRedisError(); err != nil {
return nil, err
}

The below resp.ToArray should cover this already.

arr, err := resp.ToArray()
if err != nil {
return nil, err
}
commands := cmds.s[i].Commands()
cmdKeys := commands[1:]
ret = arrayToKV(ret, arr, cmdKeys)
}
for i := range cmds.s {
intl.PutCompletedForce(cmds.s[i])
}
return ret, nil
}

func clusterJsonMGet(client Client, ctx context.Context, keys []string, path string) (ret map[string]RedisMessage, err error) {
ret = make(map[string]RedisMessage, len(keys))
slotCmds := intl.JsonMGets(keys, path)
if len(slotCmds) == 0 {
return ret, nil
}
cmds := mgetcmdsp.Get(len(slotCmds), len(slotCmds))
defer mgetcmdsp.Put(cmds)
i := 0
for _, cmd := range slotCmds {
cmds.s[i] = cmd.Pin()
i++
}
Comment on lines +311 to +315
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put i into the loop?

resps := client.DoMulti(ctx, cmds.s...)
defer resultsp.Put(&redisresults{s: resps})
for i, resp := range resps {
if err := resp.NonRedisError(); err != nil {
return nil, err
}
Comment on lines +319 to +321
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if err := resp.NonRedisError(); err != nil {
return nil, err
}

The below resp.ToArray() should cover this already.

arr, err := resp.ToArray()
if err != nil {
return nil, err
}
commands := cmds.s[i].Commands()
cmdKeys := commands[1 : len(commands)-1]
ret = arrayToKV(ret, arr, cmdKeys)
}
for i := range cmds.s {
intl.PutCompletedForce(cmds.s[i])
}
return ret, nil
}

// ErrMSetNXNotSet is used in the MSetNX helper when the underlying MSETNX response is 0.
// Ref: https://redis.io/commands/msetnx/
var ErrMSetNXNotSet = errors.New("MSETNX: no key was set")
Expand Down
70 changes: 43 additions & 27 deletions helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,22 @@ func TestMGetCache(t *testing.T) {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate DisabledCache DoCache", func(t *testing.T) {
keys := make([]string, 100)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"}
m.DoMultiFn = func(cmd ...Completed) *redisresults {
result := make([]RedisResult, len(cmd))
for i, key := range keys {
if !reflect.DeepEqual(cmd[i].Commands(), []string{"GET", key}) {
t.Fatalf("unexpected command %v", cmd)
for i, c := range cmd {
// Each command should be MGET with keys from the same slot
commands := c.Commands()
if commands[0] != "MGET" {
t.Fatalf("expected MGET command, got %v", commands)
return nil
}
result[i] = newResult(strmsg('+', key), nil)
// Build response array with values matching the keys
values := make([]RedisMessage, len(commands)-1)
for j := 1; j < len(commands); j++ {
values[j-1] = strmsg('+', commands[j])
}
result[i] = newResult(slicemsg('*', values), nil)
}
return &redisresults{s: result}
}
Expand All @@ -200,7 +204,7 @@ func TestMGetCache(t *testing.T) {
}
for _, key := range keys {
if vKey, ok := v[key]; !ok || vKey.string() != key {
t.Fatalf("unexpected response %v", v)
t.Fatalf("unexpected response for key %s: %v", key, v)
}
}
})
Expand Down Expand Up @@ -358,18 +362,22 @@ func TestMGet(t *testing.T) {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate Do", func(t *testing.T) {
keys := make([]string, 100)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"}
m.DoMultiFn = func(cmd ...Completed) *redisresults {
result := make([]RedisResult, len(cmd))
for i, key := range keys {
if !reflect.DeepEqual(cmd[i].Commands(), []string{"GET", key}) {
t.Fatalf("unexpected command %v", cmd)
for i, c := range cmd {
// Each command should be MGET with keys from the same slot
commands := c.Commands()
if commands[0] != "MGET" {
t.Fatalf("expected MGET command, got %v", commands)
return nil
}
result[i] = newResult(strmsg('+', key), nil)
// Build response array with values matching the keys
values := make([]RedisMessage, len(commands)-1)
for j := 1; j < len(commands); j++ {
values[j-1] = strmsg('+', commands[j])
}
result[i] = newResult(slicemsg('*', values), nil)
}
return &redisresults{s: result}
}
Expand All @@ -379,7 +387,7 @@ func TestMGet(t *testing.T) {
}
for _, key := range keys {
if vKey, ok := v[key]; !ok || vKey.string() != key {
t.Fatalf("unexpected response %v", v)
t.Fatalf("unexpected response for key %s: %v", key, v)
}
}
})
Expand Down Expand Up @@ -1162,18 +1170,26 @@ func TestJsonMGet(t *testing.T) {
t.Fatalf("unexpected err %v", err)
}
t.Run("Delegate Do", func(t *testing.T) {
keys := make([]string, 100)
for i := range keys {
keys[i] = strconv.Itoa(i)
}
keys := []string{"{slot1}a", "{slot1}b", "{slot2}a", "{slot2}b"}
m.DoMultiFn = func(cmd ...Completed) *redisresults {
result := make([]RedisResult, len(cmd))
for i, key := range keys {
if !reflect.DeepEqual(cmd[i].Commands(), []string{"JSON.GET", key, "$"}) {
t.Fatalf("unexpected command %v", cmd)
for i, c := range cmd {
// Each command should be JSON.MGET with keys from the same slot and path at the end
commands := c.Commands()
if commands[0] != "JSON.MGET" {
t.Fatalf("expected JSON.MGET command, got %v", commands)
return nil
}
result[i] = newResult(strmsg('+', key), nil)
if commands[len(commands)-1] != "$" {
t.Fatalf("expected $ as last parameter, got %v", commands)
return nil
}
// Build response array with values matching the keys (exclude the path)
values := make([]RedisMessage, len(commands)-2)
for j := 1; j < len(commands)-1; j++ {
values[j-1] = strmsg('+', commands[j])
}
result[i] = newResult(slicemsg('*', values), nil)
}
return &redisresults{s: result}
}
Expand All @@ -1183,7 +1199,7 @@ func TestJsonMGet(t *testing.T) {
}
for _, key := range keys {
if vKey, ok := v[key]; !ok || vKey.string() != key {
t.Fatalf("unexpected response %v", v)
t.Fatalf("unexpected response for key %s: %v", key, v)
}
}
})
Expand Down
Loading