Skip to content

Conversation

@ofekshenawa
Copy link
Collaborator

@ofekshenawa ofekshenawa commented Jun 30, 2025

This PR introduces support for Redis COMMAND-based request_policy and response_policy routing for Redis commands when used in OSS Cluster client.

Key Additions:

Command Policy Loader: Parses and caches COMMAND metadata with routing/aggregation tips on first use.
Routing Engine Enhancements:
Implements support for all request policies: default(keyless), default(hashslot), all_shards, all_nodes, multi_shard, and special.
Response Aggregator: Combines multi-shard replies based on response_policy:
all_succeeded, one_succeeded, agg_sum, special, etc.
Includes custom handling for special commands like FT.CURSOR.
Raw Command Support: Policies are enforced on Client.Do(ctx, args...).

@ofekshenawa ofekshenawa changed the title Load balance search commands to shards Implement Request and Response Policy Based Routing in Cluster Mode Jun 30, 2025
@ofekshenawa ofekshenawa marked this pull request as ready for review July 6, 2025 12:54
ofekshenawa and others added 8 commits July 16, 2025 23:06
* Add search module builders and tests

* Add tests
Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
}
if result.cmd != nil && result.err == nil {
// For MGET, extract individual values from the array result
if strings.ToLower(cmd.Name()) == "mget" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we actually need this special case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@htemelski-redis htemelski-redis force-pushed the load-balance-search-commands-to-shards branch from 6e3b627 to 1b2eaa6 Compare October 8, 2025 08:05
Copy link
Member

@ndyakov ndyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitting partial review for the aggregators.

Comment on lines +446 to +461
// For MGET without policy, use keyed aggregator
if cmdName == "mget" {
return routing.NewDefaultAggregator(true)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are passing the cmd.Name() in routing.NewResponseAggregator this can be handler by it. If policy is nil for mget, maybe the NewResponseAggregator can accept a policy and check the nil as well`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

@ndyakov ndyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitting another partial review.

Copy link
Member

@ndyakov ndyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Final part of initial review

Overview:

  • Let's use atomics when possible.
  • Left questions related to the node selection and setting of values.

Overall the design of the solution looks good, would have to do an additional pass over the test files once this review is addressed.

Thank you both @ofekshenawa and @htemelski-redis!

Comment on lines 50 to 51
if c.hasKeys(cmd) {
// execute on key based shard
return node.Client.Process(ctx, cmd)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know that this node servers the slot for the key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the node should've been selected based on the slot osscluster.go:L1906

func (c *ClusterClient) cmdNode(

// execute on key based shard
return node.Client.Process(ctx, cmd)
}
return c.executeOnArbitraryShard(ctx, cmd)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since it doesn't matter and there is already some node selected, why not use it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have two different ways of picking an arbitrary shard, either round robin or a random one

Copy link
Member

@ndyakov ndyakov Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I understand that, but for some reason there is already a node selected here that may have been selected because MOVED OR normal key based selection. Why do we have to reselect the node? Shouldn't this selection of arbitrary node be done outside, so we do the node selection only one time and the node on line #52 is the one that should be used for this command?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The re-selection is required so keyless commands respect the configured ShardPicker strategy.
cmdNode() picks nodes by hash slot, which is correct for keyed commands but ignores the picker for keyless ones. Calling executeOnArbitraryShard() ensures keyless commands use the user’s picker

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But if the command is keyless, what will be the node here? Won't it be picked based on the ShardPolicy? Will it be just a random node? Can't we pick the correct node there?

@htemelski-redis htemelski-redis force-pushed the load-balance-search-commands-to-shards branch from 727a799 to 14bd6e1 Compare October 14, 2025 07:42
Copy link
Member

@ndyakov ndyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

left some comments related to aggregators

@htemelski-redis htemelski-redis self-requested a review October 28, 2025 09:32
Copy link
Member

@ndyakov ndyakov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The aggregators look good, there are some prints left in the code as bunch of unanswered questions. Let's resolve them before merging this. cc @ofekshenawa , @htemelski-redis

Comment on lines +376 to +380
// AggMaxAggregator returns the maximum numeric value from all shards.
type AggMaxAggregator struct {
err atomic.Value
res *util.AtomicMax
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general question, are those min,max aggregators only for ints?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good question, the initial implementation was for ints only, but looking at the docs, we should support any numerical types

Comment on lines +446 to +461
// For MGET without policy, use keyed aggregator
if cmdName == "mget" {
return routing.NewDefaultAggregator(true)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


defer func() {
if r := recover(); r != nil {
cmd.SetErr(fmt.Errorf("redis: failed to set command value: %v", r))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why don't we return the error as well? it will return nil, but the err will be set on the cmd.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure, how good of a practice is to modify the return value from within recover

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is possible exactly for scenarios like this one.

@htemelski-redis htemelski-redis force-pushed the load-balance-search-commands-to-shards branch from a4ac8df to 7181bcc Compare October 30, 2025 08:44
jit-ci[bot]

This comment was marked as resolved.

@htemelski-redis htemelski-redis force-pushed the load-balance-search-commands-to-shards branch from 07963c2 to cd74db0 Compare October 30, 2025 09:27
@htemelski-redis htemelski-redis changed the base branch from load-balance-search-commands-to-shards to master October 30, 2025 09:28
@ndyakov ndyakov requested review from Copilot and ndyakov November 9, 2025 11:49
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds support for explicit routing policies in Redis Cluster operations by introducing a command policy resolver system. The changes enable commands to be routed to specific shards based on their requirements (all shards, multiple shards, single shard) and aggregate responses accordingly.

  • Implements a comprehensive routing and aggregation framework for cluster commands
  • Adds Clone() methods to various command types to support concurrent execution
  • Introduces configurable shard picker strategies (round-robin, random, static)
  • Adds extensive test coverage for routing policies and command aggregation

Reviewed Changes

Copilot reviewed 30 out of 46 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
osscluster_router.go New file implementing command routing logic and response aggregation
osscluster.go Integration of command policy resolver and routing infrastructure
command_policy_resolver.go New resolver for determining command routing policies
internal/routing/*.go Core routing policy types and aggregation implementations
internal/util/atomic_*.go Atomic min/max utilities for response aggregation
*_commands.go Added cmdType fields and Clone() methods to command structs
osscluster_test.go Extensive test coverage for routing policies and aggregation

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

}

func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
func (c *ClusterClient) cmdSlot(cmd Cmder, prefferedSlot int) int {
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'preffered' to 'preferred'.

Copilot uses AI. Check for mistakes.
}

return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
return cmdSlot(cmd, cmdFirstKeyPos(cmd), prefferedSlot)
Copy link

Copilot AI Nov 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'preffered' to 'preferred'.

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants