From 2e7f23311abb018386a92a29f0253485607f763b Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Thu, 27 Nov 2025 14:10:15 +0200 Subject: [PATCH 1/2] feat(cluster): refactor sharded pub/sub --- lib/cluster/ClusterSubscriberGroup.ts | 475 ++++++++++++----------- lib/cluster/ShardedSubscriber.ts | 112 ++++++ lib/cluster/index.ts | 110 +++++- test/cluster/cluster_subscriber_group.ts | 2 +- test/functional/cluster/spub_ssub.ts | 3 +- test/scenario/sharded-pub-sub.test.ts | 105 +++-- 6 files changed, 523 insertions(+), 284 deletions(-) create mode 100644 lib/cluster/ShardedSubscriber.ts diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index be85d05f..476d6078 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -1,272 +1,289 @@ -import {Debug} from "../utils"; -import ClusterSubscriber from "./ClusterSubscriber"; -import Cluster from "./index"; -import ConnectionPool from "./ConnectionPool"; -import {getNodeKey} from "./util"; +import { Debug } from "../utils"; +import { getNodeKey } from "./util"; import * as calculateSlot from "cluster-key-slot"; +import * as EventEmitter from "events"; +import ShardedSubscriber from "./ShardedSubscriber"; const debug = Debug("cluster:subscriberGroup"); - /** - * Redis differs between "normal" and sharded PubSub. If using the "normal" PubSub feature, exactly one - * ClusterSubscriber exists per cluster instance. This works because the Redis cluster bus forwards m - * messages between shards. However, this has scalability limitations, which is the reason why the sharded - * PubSub feature was added to Redis. With sharded PubSub, each shard is responsible for its own messages. - * Given that, we need at least one ClusterSubscriber per master endpoint/node. + * Redis distinguishes between "normal" and sharded PubSub. When using the normal PubSub feature, + * exactly one subscriber exists per cluster instance because the Redis cluster bus forwards + * messages between shards. Sharded PubSub removes this limitation by making each shard + * responsible for its own messages. * - * This class leverages the previously exising ClusterSubscriber by adding support for multiple such subscribers - * in alignment to the master nodes of the cluster. The ClusterSubscriber class was extended in a non-breaking way - * to support this feature. + * This class coordinates one ShardedSubscriber per master node in the cluster, providing + * sharded PubSub support while keeping the public API backward compatible. 
*/ export default class ClusterSubscriberGroup { - - private shardedSubscribers: Map = new Map(); - private clusterSlots: string[][] = []; - //Simple [min, max] slot ranges aren't enough because you can migrate single slots - private subscriberToSlotsIndex: Map = new Map(); - private channels: Map> = new Map(); - - /** - * Register callbacks - * - * @param cluster - */ - constructor(private cluster: Cluster, refreshSlotsCacheCallback: () => void) { - - cluster.on("+node", (redis) => { - this._addSubscriber(redis); - }); - - cluster.on("-node", (redis) => { - this._removeSubscriber(redis); - }); - - cluster.on("refresh", () => { - this._refreshSlots(cluster); - }); - - cluster.on("forceRefresh", () => { - refreshSlotsCacheCallback(); - }); + private shardedSubscribers: Map = new Map(); + private clusterSlots: string[][] = []; + // Simple [min, max] slot ranges aren't enough because you can migrate single slots + private subscriberToSlotsIndex: Map = new Map(); + private channels: Map> = new Map(); + + /** + * Register callbacks + * + * @param cluster + */ + constructor(private readonly subscriberGroupEmitter: EventEmitter) {} + + /** + * Get the responsible subscriber. + * + * @param slot + */ + getResponsibleSubscriber(slot: number): ShardedSubscriber | undefined { + const nodeKey = this.clusterSlots[slot][0]; + return this.shardedSubscribers.get(nodeKey); + } + + /** + * Adds a channel for which this subscriber group is responsible + * + * @param channels + */ + addChannels(channels: (string | Buffer)[]): number { + const slot = calculateSlot(channels[0]); + + // Check if the all channels belong to the same slot and otherwise reject the operation + for (const c of channels) { + if (calculateSlot(c) !== slot) { + return -1; + } } + const currChannels = this.channels.get(slot); - /** - * Get the responsible subscriber. 
- * - * Returns null if no subscriber was found - * - * @param slot - */ - getResponsibleSubscriber(slot: number) : ClusterSubscriber { - const nodeKey = this.clusterSlots[slot][0] - return this.shardedSubscribers.get(nodeKey); + if (!currChannels) { + this.channels.set(slot, channels); + } else { + this.channels.set(slot, currChannels.concat(channels)); } - /** - * Adds a channel for which this subscriber group is responsible - * - * @param channels - */ - addChannels(channels: (string | Buffer)[]): number { - const slot = calculateSlot(channels[0]); - - //Check if the all channels belong to the same slot and otherwise reject the operation - channels.forEach((c: string) => { - if (calculateSlot(c) != slot) - return -1 - }); - - const currChannels = this.channels.get(slot); - - if (!currChannels) { - this.channels.set(slot, channels); - } else { - this.channels.set(slot, currChannels.concat(channels)) - } - - return [...this.channels.values()].flatMap(v => v).length; + return Array.from(this.channels.values()).reduce( + (sum, array) => sum + array.length, + 0 + ); + } + + /** + * Removes channels for which the subscriber group is responsible by optionally unsubscribing + * @param channels + */ + removeChannels(channels: (string | Buffer)[]): number { + const slot = calculateSlot(channels[0]); + + // Check if the all channels belong to the same slot and otherwise reject the operation + for (const c of channels) { + if (calculateSlot(c) !== slot) { + return -1; + } } - /** - * Removes channels for which the subscriber group is responsible by optionally unsubscribing - * @param channels - */ - removeChannels(channels: (string | Buffer)[]): number { - - const slot = calculateSlot(channels[0]); - - //Check if the all channels belong to the same slot and otherwise reject the operation - channels.forEach((c: string) => { - if (calculateSlot(c) != slot) - return -1; - }); + const slotChannels = this.channels.get(slot); - const slotChannels = this.channels.get(slot); - - if (slotChannels) { - const updatedChannels = slotChannels.filter(c => !channels.includes(c)); - this.channels.set(slot, updatedChannels); - } - - return [...this.channels.values()].flatMap(v => v).length; + if (slotChannels) { + const updatedChannels = slotChannels.filter((c) => !channels.includes(c)); + this.channels.set(slot, updatedChannels); } - /** - * Disconnect all subscribers - */ - stop() { - for (const s of this.shardedSubscribers.values()) { - s.stop(); - } + return Array.from(this.channels.values()).reduce( + (sum, array) => sum + array.length, + 0 + ); + } + + /** + * Disconnect all subscribers + */ + stop() { + for (const s of this.shardedSubscribers.values()) { + s.stop(); + } + } + + /** + * Start all not yet started subscribers + */ + start() { + const startPromises = []; + for (const s of this.shardedSubscribers.values()) { + if (!s.isStarted()) { + startPromises.push(s.start()); + } + } + return Promise.all(startPromises); + } + + /** + * Resets the subscriber group by disconnecting all subscribers that are no longer needed and connecting new ones. 
+ */ + public async reset( + clusterSlots: string[][], + clusterNodes: any[] + ): Promise { + // Update the slots cache and continue if there was a change + if (!this._refreshSlots(clusterSlots)) { + return; } - /** - * Start all not yet started subscribers - */ - start() { - for (const s of this.shardedSubscribers.values()) { - if (!s.isStarted()) { - s.start(); - } - } + // For each of the sharded subscribers + for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { + if ( + // If the subscriber is still responsible for a slot range and is running then keep it + this.subscriberToSlotsIndex.has(nodeKey) && + shardedSubscriber.isStarted() + ) { + continue; + } + + // Otherwise stop the subscriber and remove it + shardedSubscriber.stop(); + this.shardedSubscribers.delete(nodeKey); + + this.subscriberGroupEmitter.emit("-subscriber"); } - /** - * Add a subscriber to the group of subscribers - * - * @param redis - */ - private _addSubscriber(redis: any): ClusterSubscriber { - const pool: ConnectionPool= new ConnectionPool(redis.options); - - if (pool.addMasterNode(redis)) { - const sub = new ClusterSubscriber(pool, this.cluster, true); - const nodeKey = getNodeKey(redis.options); - this.shardedSubscribers.set(nodeKey, sub); - sub.start(); - - // We need to attempt to resubscribe them in case the new node serves their slot - this._resubscribe(); - this.cluster.emit("+subscriber"); - return sub; - } + const startPromises = []; + // For each node in slots cache + for (const [nodeKey, _] of this.subscriberToSlotsIndex) { + // If we already have a subscriber for this node then keep it + if (this.shardedSubscribers.has(nodeKey)) { + continue; + } - return null; - } + // Otherwise create a new subscriber + const redis = clusterNodes.find((node) => { + return getNodeKey(node.options) === nodeKey; + }); - /** - * Removes a subscriber from the group - * @param redis - */ - private _removeSubscriber(redis: any): Map { + if (!redis) { + debug("Failed to find node for key %s", nodeKey); + continue; + } - const nodeKey = getNodeKey(redis.options); - const sub = this.shardedSubscribers.get(nodeKey); + const sub = new ShardedSubscriber( + this.subscriberGroupEmitter, + redis.options + ); - if (sub) { - sub.stop(); - this.shardedSubscribers.delete(nodeKey); + this.shardedSubscribers.set(nodeKey, sub); - // Even though the subscriber to this node is going down, we might have another subscriber - // handling the same slots, so we need to attempt to subscribe the orphaned channels - this._resubscribe(); - this.cluster.emit("-subscriber"); - } + startPromises.push(sub.start()); - return this.shardedSubscribers; + this.subscriberGroupEmitter.emit("+subscriber"); } + // It's vital to await the start promises before resubscribing + // Otherwise we might try to resubscribe to a subscriber that is not yet connected + // This can cause a race condition + try { + await Promise.all(startPromises); + } catch (err) { + debug("Error while starting subscribers: %s", err); + this.subscriberGroupEmitter.emit("error", err); + } - /** - * Refreshes the subscriber-related slot ranges - * - * Returns false if no refresh was needed - * - * @param cluster - */ - private _refreshSlots(cluster: Cluster) : boolean { - //If there was an actual change, then reassign the slot ranges - if (this._slotsAreEqual(cluster.slots)) { - debug("Nothing to refresh because the new cluster map is equal to the previous one.") - } else { - debug("Refreshing the slots of the subscriber group."); - - //Rebuild the slots index - 
this.subscriberToSlotsIndex = new Map(); - - for (let slot = 0; slot < cluster.slots.length; slot++) { - const node: string = cluster.slots[slot][0]; - - if (!this.subscriberToSlotsIndex.has(node)) { - this.subscriberToSlotsIndex.set(node, []); - } - this.subscriberToSlotsIndex.get(node).push(Number(slot)) - } + this._resubscribe(); + this.subscriberGroupEmitter.emit("subscribersReady"); + } + + /** + * Refreshes the subscriber-related slot ranges + * + * Returns false if no refresh was needed + * + * @param targetSlots + */ + private _refreshSlots(targetSlots: string[][]): boolean { + //If there was an actual change, then reassign the slot ranges + if (this._slotsAreEqual(targetSlots)) { + debug( + "Nothing to refresh because the new cluster map is equal to the previous one." + ); + + return false; + } - //Update the subscribers from the index - this._resubscribe() + debug("Refreshing the slots of the subscriber group."); - //Update the cached slots map - this.clusterSlots = JSON.parse(JSON.stringify(cluster.slots)); + //Rebuild the slots index + this.subscriberToSlotsIndex = new Map(); - this.cluster.emit("subscribersReady") - return true; - } + for (let slot = 0; slot < targetSlots.length; slot++) { + const node: string = targetSlots[slot][0]; - return false; + if (!this.subscriberToSlotsIndex.has(node)) { + this.subscriberToSlotsIndex.set(node, []); + } + this.subscriberToSlotsIndex.get(node).push(Number(slot)); } + //Update the cached slots map + this.clusterSlots = JSON.parse(JSON.stringify(targetSlots)); + + return true; + } + + /** + * Resubscribes to the previous channels + * + * @private + */ + private _resubscribe() { + if (this.shardedSubscribers) { + this.shardedSubscribers.forEach( + (s: ShardedSubscriber, nodeKey: string) => { + const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey); + if (subscriberSlots) { + //Resubscribe on the underlying connection + subscriberSlots.forEach((ss) => { + //Might return null if being disconnected + const redis = s.getInstance(); + const channels = this.channels.get(ss); + + if (channels && channels.length > 0) { + if (redis.status === "end") { + return; + } - /** - * Resubscribes to the previous channels - * - * @private - */ - private _resubscribe() { - if (this.shardedSubscribers) { - this.shardedSubscribers.forEach((s: ClusterSubscriber, nodeKey: string) => { - const subscriberSlots = this.subscriberToSlotsIndex.get(nodeKey); - if (subscriberSlots) { - //More for debugging purposes - s.associateSlotRange(subscriberSlots); - - //Resubscribe on the underlying connection - subscriberSlots.forEach((ss) => { - - //Might return null if being disconnected - const redis = s.getInstance(); - const channels = this.channels.get(ss); - - if (channels && channels.length > 0) { - //Try to subscribe now - if (redis) { - redis.ssubscribe(channels); - - //If the instance isn't ready yet, then register the re-subscription for later - redis.on("ready", () => { - redis.ssubscribe(channels); - }); - } - } + if (redis.status === "ready") { + redis.ssubscribe(...channels).catch((err) => { + // TODO: Should we emit an error event here? + debug("Failed to ssubscribe on node %s: %s", nodeKey, err); + }); + } else { + redis.once("ready", () => { + redis.ssubscribe(...channels).catch((err) => { + // TODO: Should we emit an error event here? 
+ debug( + "Failed to ssubscribe on node %s: %s", + nodeKey, + err + ); }); + }); } + } }); + } } + ); } - - /** - * Deep equality of the cluster slots objects - * - * @param other - * @private - */ - private _slotsAreEqual(other: string[][]) { - if ( this.clusterSlots === undefined ) - return false; - else - return JSON.stringify(this.clusterSlots) === JSON.stringify(other) + } + + /** + * Deep equality of the cluster slots objects + * + * @param other + * @private + */ + private _slotsAreEqual(other: string[][]) { + if (this.clusterSlots === undefined) { + return false; + } else { + return JSON.stringify(this.clusterSlots) === JSON.stringify(other); } - - -} \ No newline at end of file + } +} diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts new file mode 100644 index 00000000..9e52dbe6 --- /dev/null +++ b/lib/cluster/ShardedSubscriber.ts @@ -0,0 +1,112 @@ +import EventEmitter = require("events"); +import { getConnectionName, getNodeKey, RedisOptions } from "./util"; +import { Debug } from "../utils"; +import Redis from "../Redis"; +const debug = Debug("cluster:subscriberGroup:shardedSubscriber"); + +export default class ShardedSubscriber { + private readonly nodeKey: string; + private started = false; + private instance: Redis | null = null; + + // Store listener references for cleanup + private readonly onEnd: () => void; + private readonly onError: (error: Error) => void; + private readonly onMoved: () => void; + private readonly messageListeners: Map void> = + new Map(); + + constructor(private readonly emitter: EventEmitter, options: RedisOptions) { + this.instance = new Redis({ + port: options.port, + host: options.host, + username: options.username, + password: options.password, + enableReadyCheck: false, + offlineQueue: true, + connectionName: getConnectionName("ssubscriber", options.connectionName), + lazyConnect: true, + tls: options.tls, + /** + * Disable auto reconnection for subscribers. + * The ClusterSubscriberGroup will handle the reconnection. 
+ */ + retryStrategy: null, + }); + + this.nodeKey = getNodeKey(options); + + // Define listeners as instance methods so we can remove them later + this.onEnd = () => { + this.started = false; + this.emitter.emit("-node", this.instance, this.nodeKey); + }; + + this.onError = (error: Error) => { + this.emitter.emit("nodeError", error, this.nodeKey); + }; + + this.onMoved = () => { + this.emitter.emit("moved"); + }; + + // Register listeners + this.instance.once("end", this.onEnd); + this.instance.on("error", this.onError); + this.instance.on("moved", this.onMoved); + + for (const event of ["smessage", "smessageBuffer"]) { + const listener = (...args: any[]) => { + this.emitter.emit(event, ...args); + }; + this.messageListeners.set(event, listener); + this.instance.on(event, listener); + } + } + + async start(): Promise { + if (this.started) { + debug("already started %s", this.nodeKey); + return; + } + + try { + await this.instance.connect(); + debug("started %s", this.nodeKey); + this.started = true; + } catch (err) { + debug("failed to start %s: %s", this.nodeKey, err); + this.started = false; + throw err; // Re-throw so caller knows it failed + } + } + + stop(): void { + this.started = false; + + if (this.instance) { + // Remove all listeners before disconnecting + this.instance.off("end", this.onEnd); + this.instance.off("error", this.onError); + this.instance.off("moved", this.onMoved); + + for (const [event, listener] of this.messageListeners) { + this.instance.off(event, listener); + } + this.messageListeners.clear(); + + this.instance.disconnect(); + this.instance = null; + } + + debug("stopped %s", this.nodeKey); + } + + isStarted(): boolean { + return this.started; + } + + getInstance(): Redis | null { + return this.instance; + } +} diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index b598a161..7fa951ad 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -26,7 +26,7 @@ import ClusterSubscriber from "./ClusterSubscriber"; import ConnectionPool from "./ConnectionPool"; import DelayQueue from "./DelayQueue"; import { - getConnectionName, getNodeKey, + getConnectionName, getUniqueHostnamesFromOptions, groupSrvRecords, NodeKey, @@ -105,6 +105,7 @@ class Cluster extends Commander { private _autoPipelines: Map = new Map(); private _runningAutoPipelines: Set = new Set(); private _readyDelayedCallbacks: Callback[] = []; + private subscriberGroupEmitter: EventEmitter | null; /** * Every time Cluster#connect() is called, this value will be @@ -125,8 +126,9 @@ class Cluster extends Commander { this.startupNodes = startupNodes; this.options = defaults({}, options, DEFAULT_CLUSTER_OPTIONS, this.options); - if (this.options.shardedSubscribers == true) - this.shardedSubscribers = new ClusterSubscriberGroup(this, this.refreshSlotsCache.bind(this)); + if (this.options.shardedSubscribers) { + this.createShardedSubscriberGroup(); + } if ( this.options.redisOptions && @@ -222,6 +224,15 @@ class Cluster extends Commander { } this.connectionPool.reset(nodes); + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? 
+ debug("Error while starting subscribers: %s", err); + }); + } + const readyHandler = () => { this.setStatus("ready"); this.retryAttempts = 0; @@ -276,7 +287,10 @@ class Cluster extends Commander { this.subscriber.start(); if (this.options.shardedSubscribers) { - this.shardedSubscribers.start(); + this.shardedSubscribers.start().catch((err) => { + // TODO should we emit an error event here? + debug("Error while starting subscribers: %s", err); + }); } }) .catch((err) => { @@ -567,25 +581,42 @@ class Cluster extends Commander { Command.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) || Command.checkFlag("EXIT_SUBSCRIBER_MODE", command.name) ) { - if (_this.options.shardedSubscribers == true && - (command.name == "ssubscribe" || command.name == "sunsubscribe")) { + if ( + _this.options.shardedSubscribers && + (command.name == "ssubscribe" || command.name == "sunsubscribe") + ) { + const sub = + _this.shardedSubscribers.getResponsibleSubscriber(targetSlot); + + if (!sub) { + command.reject( + new AbortError(`No sharded subscriber for slot: ${targetSlot}`) + ); + return; + } - const sub: ClusterSubscriber = _this.shardedSubscribers.getResponsibleSubscriber(targetSlot); let status = -1; - if (command.name == "ssubscribe") + if (command.name == "ssubscribe") { status = _this.shardedSubscribers.addChannels(command.getKeys()); + } - if ( command.name == "sunsubscribe") - status = _this.shardedSubscribers.removeChannels(command.getKeys()); + if (command.name == "sunsubscribe") { + status = _this.shardedSubscribers.removeChannels( + command.getKeys() + ); + } if (status !== -1) { redis = sub.getInstance(); } else { - command.reject(new AbortError("Can't add or remove the given channels. Are they in the same slot?")); + command.reject( + new AbortError( + "Possible CROSSSLOT error: All channels must hash to the same slot" + ) + ); } - } - else { + } else { redis = _this.subscriber.getInstance(); } @@ -804,6 +835,10 @@ class Cluster extends Commander { }); }, retryDelay); } else { + if (this.options.shardedSubscribers) { + this.subscriberGroupEmitter?.removeAllListeners(); + } + this.setStatus("end"); this.flushQueue(new Error("None of startup nodes is available")); } @@ -952,6 +987,15 @@ class Cluster extends Commander { } this.connectionPool.reset(nodes); + + if (this.options.shardedSubscribers) { + this.shardedSubscribers + .reset(this.slots, this.connectionPool.getNodes("all")) + .catch((err) => { + // TODO should we emit an error event here? 
+ debug("Error while starting subscribers: %s", err); + }); + } callback(); }, this.options.slotsRefreshTimeout) ); @@ -1105,6 +1149,46 @@ class Cluster extends Commander { ...options, }); } + + private createShardedSubscriberGroup() { + this.subscriberGroupEmitter = new EventEmitter(); + + this.shardedSubscribers = new ClusterSubscriberGroup( + this.subscriberGroupEmitter + ); + + this.subscriberGroupEmitter.on("-node", (redis, nodeKey) => { + this.emit("-node", redis, nodeKey); + + this.refreshSlotsCache(); + }); + + this.subscriberGroupEmitter.on("moved", () => { + this.refreshSlotsCache(); + }); + + this.subscriberGroupEmitter.on("-subscriber", () => { + this.emit("-subscriber"); + }); + + this.subscriberGroupEmitter.on("+subscriber", () => { + this.emit("+subscriber"); + }); + + this.subscriberGroupEmitter.on("nodeError", (error, nodeKey) => { + this.emit("nodeError", error, nodeKey); + }); + + this.subscriberGroupEmitter.on("subscribersReady", () => { + this.emit("subscribersReady"); + }); + + for (const event of ["smessage", "smessageBuffer"]) { + this.subscriberGroupEmitter.on(event, (arg1, arg2, arg3) => { + this.emit(event, arg1, arg2, arg3); + }); + } + } } interface Cluster extends EventEmitter {} diff --git a/test/cluster/cluster_subscriber_group.ts b/test/cluster/cluster_subscriber_group.ts index e1d822ef..9a87ba48 100644 --- a/test/cluster/cluster_subscriber_group.ts +++ b/test/cluster/cluster_subscriber_group.ts @@ -79,7 +79,7 @@ describe("cluster:ClusterSubscriberGroup", () => { //Should not be called expect(true).to.equal(false); }).catch( (err) => { - expect(err.toString().conaints("CROSSSLOT Keys in request don't hash to the same slot")).to.be.true; + expect(err.toString().conaints("CROSSSLOT")).to.be.true; }); //Subscribe to the channels on the same slot diff --git a/test/functional/cluster/spub_ssub.ts b/test/functional/cluster/spub_ssub.ts index 92fb8d6f..e5fcd7ba 100644 --- a/test/functional/cluster/spub_ssub.ts +++ b/test/functional/cluster/spub_ssub.ts @@ -81,7 +81,8 @@ describe("cluster:spub/ssub", function () { }); }); - it("should re-ssubscribe after reconnection", (done) => { + // This is no longer true, since we do NOT reconnect but recreate the subscriber + it.skip("should re-ssubscribe after reconnection", (done) => { new MockServer(30001, function (argv) { if (argv[0] === "cluster" && argv[1] === "SLOTS") { return [[0, 16383, ["127.0.0.1", 30001]]]; diff --git a/test/scenario/sharded-pub-sub.test.ts b/test/scenario/sharded-pub-sub.test.ts index b9db49aa..44dd6726 100644 --- a/test/scenario/sharded-pub-sub.test.ts +++ b/test/scenario/sharded-pub-sub.test.ts @@ -24,29 +24,42 @@ describe("Sharded Pub/Sub E2E", () => { }); describe("Single Subscriber", () => { - let subscriber: Cluster; - let publisher: Cluster; - let messageTracker: MessageTracker; + let cleanup: (() => Promise) | null = null; - beforeEach(async () => { - messageTracker = new MessageTracker(CHANNELS); - subscriber = createClusterTestClient(config.clientConfig, { + const setup = async (subscriberOverrides = {}, publisherOverrides = {}) => { + const messageTracker = new MessageTracker(CHANNELS); + const subscriber = createClusterTestClient(config.clientConfig, { shardedSubscribers: true, + ...subscriberOverrides, }); - publisher = createClusterTestClient(config.clientConfig, { - shardedSubscribers: true, - }); - await Promise.all([ - waitClientReady(subscriber), - waitClientReady(publisher), - ]); - }); + const publisher = createClusterTestClient( + config.clientConfig, + publisherOverrides + 
); + + // Return cleanup function along with the resources + cleanup = async () => { + await Promise.all([subscriber.quit(), publisher.quit()]); + }; + + return { subscriber, publisher, messageTracker }; + }; afterEach(async () => { - await Promise.all([subscriber.quit(), publisher.quit()]); + if (cleanup) { + try { + await cleanup(); + } catch { + // Ignore errors during cleanup + } finally { + cleanup = null; + } + } }); it("should receive messages published to multiple channels", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -76,6 +89,8 @@ describe("Sharded Pub/Sub E2E", () => { }); it("should resume publishing and receiving after failover", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -110,15 +125,21 @@ describe("Sharded Pub/Sub E2E", () => { publishAbort.abort(); await publishResult; - for (const channel of CHANNELS) { - const sent = messageTracker.getChannelStats(channel)!.sent; - const received = messageTracker.getChannelStats(channel)!.received; + const totalReceived = CHANNELS.reduce((acc, channel) => { + return acc + messageTracker.getChannelStats(channel)!.received; + }, 0); + const totalSent = CHANNELS.reduce((acc, channel) => { + return acc + messageTracker.getChannelStats(channel)!.sent; + }, 0); - assert.ok( - received <= sent, - `Channel ${channel}: received (${received}) should be <= sent (${sent})` - ); - } + assert.ok( + totalReceived <= totalSent, + `Total received (${totalReceived}) should be <= total sent (${totalSent})` + ); + assert.ok( + totalReceived > 0, + `Total received (${totalReceived}) should be > 0` + ); // Wait for 2 seconds before resuming publishing await wait(2_000); @@ -156,6 +177,8 @@ describe("Sharded Pub/Sub E2E", () => { }); it("should NOT receive messages after sunsubscribe", async () => { + const { subscriber, publisher, messageTracker } = await setup(); + for (const channel of CHANNELS) { await subscriber.ssubscribe(channel); } @@ -250,9 +273,7 @@ describe("Sharded Pub/Sub E2E", () => { subscriber2 = createClusterTestClient(config.clientConfig, { shardedSubscribers: true, }); - publisher = createClusterTestClient(config.clientConfig, { - shardedSubscribers: true, - }); + publisher = createClusterTestClient(config.clientConfig); await Promise.all([ waitClientReady(subscriber1), waitClientReady(subscriber2), @@ -344,21 +365,25 @@ describe("Sharded Pub/Sub E2E", () => { publishAbort.abort(); await publishResult; - for (const channel of CHANNELS) { - const sent = messageTracker1.getChannelStats(channel)!.sent; - const received1 = messageTracker1.getChannelStats(channel)!.received; - - const received2 = messageTracker2.getChannelStats(channel)!.received; - - assert.ok( - received1 <= sent, - `Channel ${channel}: received (${received1}) should be <= sent (${sent})` + const totalReceived = CHANNELS.reduce((acc, channel) => { + return ( + acc + + messageTracker1.getChannelStats(channel)!.received + + messageTracker2.getChannelStats(channel)!.received ); - assert.ok( - received2 <= sent, - `Channel ${channel}: received2 (${received2}) should be <= sent (${sent})` - ); - } + }, 0); + const totalSent = CHANNELS.reduce((acc, channel) => { + return acc + messageTracker1.getChannelStats(channel)!.sent; + }, 0); + + assert.ok( + totalReceived <= (totalSent * 2), + `Total received (${totalReceived}) should be <= total sent 
(${totalSent})` + ); + assert.ok( + totalReceived > 0, + `Total received (${totalReceived}) should be > 0` + ); // Wait for 2 seconds before resuming publishing await wait(2_000); From ed1c8ee2db2519e75a52caacdfdfed8a2b48c822 Mon Sep 17 00:00:00 2001 From: Pavel Pashov Date: Mon, 1 Dec 2025 17:48:41 +0200 Subject: [PATCH 2/2] fix: add exponential backoff for sharded subscriber reconnection --- lib/cluster/ClusterSubscriberGroup.ts | 213 ++++++++++++++++++++------ lib/cluster/ShardedSubscriber.ts | 46 +++--- lib/cluster/index.ts | 11 ++ 3 files changed, 193 insertions(+), 77 deletions(-) diff --git a/lib/cluster/ClusterSubscriberGroup.ts b/lib/cluster/ClusterSubscriberGroup.ts index 476d6078..8f034c25 100644 --- a/lib/cluster/ClusterSubscriberGroup.ts +++ b/lib/cluster/ClusterSubscriberGroup.ts @@ -20,6 +20,16 @@ export default class ClusterSubscriberGroup { // Simple [min, max] slot ranges aren't enough because you can migrate single slots private subscriberToSlotsIndex: Map = new Map(); private channels: Map> = new Map(); + private failedAttemptsByNode: Map = new Map(); + + // Only latest pending reset kept; throttled by refreshSlotsCache's isRefreshing + backoff delay + private isResetting = false; + private pendingReset: { slots: string[][]; nodes: any[] } | null = null; + + // Retry strategy + private static readonly MAX_RETRY_ATTEMPTS = 10; + private static readonly MAX_BACKOFF_MS = 2000; + private static readonly BASE_BACKOFF_MS = 100; /** * Register callbacks @@ -110,7 +120,16 @@ export default class ClusterSubscriberGroup { const startPromises = []; for (const s of this.shardedSubscribers.values()) { if (!s.isStarted()) { - startPromises.push(s.start()); + startPromises.push( + s + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(s.getNodeKey()); + }) + .catch((err) => { + this.handleSubscriberConnectFailed(err, s.getNodeKey()); + }) + ); } } return Promise.all(startPromises); @@ -123,70 +142,99 @@ export default class ClusterSubscriberGroup { clusterSlots: string[][], clusterNodes: any[] ): Promise { - // Update the slots cache and continue if there was a change - if (!this._refreshSlots(clusterSlots)) { + if (this.isResetting) { + this.pendingReset = { slots: clusterSlots, nodes: clusterNodes }; return; } - // For each of the sharded subscribers - for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { - if ( - // If the subscriber is still responsible for a slot range and is running then keep it - this.subscriberToSlotsIndex.has(nodeKey) && - shardedSubscriber.isStarted() - ) { - continue; - } - - // Otherwise stop the subscriber and remove it - shardedSubscriber.stop(); - this.shardedSubscribers.delete(nodeKey); - - this.subscriberGroupEmitter.emit("-subscriber"); - } + this.isResetting = true; - const startPromises = []; - // For each node in slots cache - for (const [nodeKey, _] of this.subscriberToSlotsIndex) { - // If we already have a subscriber for this node then keep it - if (this.shardedSubscribers.has(nodeKey)) { - continue; + try { + const hasTopologyChanged = this._refreshSlots(clusterSlots); + const hasFailedSubscribers = this.hasUnhealthySubscribers(); + + if (!hasTopologyChanged && !hasFailedSubscribers) { + debug( + "No topology change detected or failed subscribers. Skipping reset." 
+ ); + return; } - // Otherwise create a new subscriber - const redis = clusterNodes.find((node) => { - return getNodeKey(node.options) === nodeKey; - }); + // For each of the sharded subscribers + for (const [nodeKey, shardedSubscriber] of this.shardedSubscribers) { + if ( + // If the subscriber is still responsible for a slot range and is running then keep it + this.subscriberToSlotsIndex.has(nodeKey) && + shardedSubscriber.isStarted() + ) { + debug("Skipping deleting subscriber for %s", nodeKey); + continue; + } + + debug("Removing subscriber for %s", nodeKey); + // Otherwise stop the subscriber and remove it + shardedSubscriber.stop(); + this.shardedSubscribers.delete(nodeKey); - if (!redis) { - debug("Failed to find node for key %s", nodeKey); - continue; + this.subscriberGroupEmitter.emit("-subscriber"); } - const sub = new ShardedSubscriber( - this.subscriberGroupEmitter, - redis.options - ); + const startPromises = []; + // For each node in slots cache + for (const [nodeKey, _] of this.subscriberToSlotsIndex) { + // If we already have a subscriber for this node then keep it + if (this.shardedSubscribers.has(nodeKey)) { + debug("Skipping creating new subscriber for %s", nodeKey); + continue; + } - this.shardedSubscribers.set(nodeKey, sub); + debug("Creating new subscriber for %s", nodeKey); + // Otherwise create a new subscriber + const redis = clusterNodes.find((node) => { + return getNodeKey(node.options) === nodeKey; + }); - startPromises.push(sub.start()); + if (!redis) { + debug("Failed to find node for key %s", nodeKey); + continue; + } - this.subscriberGroupEmitter.emit("+subscriber"); - } + const sub = new ShardedSubscriber( + this.subscriberGroupEmitter, + redis.options + ); + + this.shardedSubscribers.set(nodeKey, sub); + + startPromises.push( + sub + .start() + .then(() => { + this.handleSubscriberConnectSucceeded(nodeKey); + }) + .catch((error) => { + this.handleSubscriberConnectFailed(error, nodeKey); + }) + ); + + this.subscriberGroupEmitter.emit("+subscriber"); + } - // It's vital to await the start promises before resubscribing - // Otherwise we might try to resubscribe to a subscriber that is not yet connected - // This can cause a race condition - try { + // It's vital to await the start promises before resubscribing + // Otherwise we might try to resubscribe to a subscriber that is not yet connected + // This can cause a race condition await Promise.all(startPromises); - } catch (err) { - debug("Error while starting subscribers: %s", err); - this.subscriberGroupEmitter.emit("error", err); - } - this._resubscribe(); - this.subscriberGroupEmitter.emit("subscribersReady"); + this._resubscribe(); + this.subscriberGroupEmitter.emit("subscribersReady"); + } finally { + this.isResetting = false; + if (this.pendingReset) { + const { slots, nodes } = this.pendingReset; + this.pendingReset = null; + await this.reset(slots, nodes); + } + } } /** @@ -286,4 +334,69 @@ export default class ClusterSubscriberGroup { return JSON.stringify(this.clusterSlots) === JSON.stringify(other); } } + + /** + * Checks if any subscribers are in an unhealthy state. 
+ * + * A subscriber is considered unhealthy if: + * - It exists but is not started (failed/disconnected) + * - It's missing entirely for a node that should have one + * + * @returns true if any subscribers need to be recreated + */ + private hasUnhealthySubscribers(): boolean { + const hasFailedSubscribers = Array.from( + this.shardedSubscribers.values() + ).some((sub) => !sub.isStarted()); + + const hasMissingSubscribers = Array.from( + this.subscriberToSlotsIndex.keys() + ).some((nodeKey) => !this.shardedSubscribers.has(nodeKey)); + + return hasFailedSubscribers || hasMissingSubscribers; + } + + /** + * Handles failed subscriber connections by emitting an event to refresh the slots cache + * after a backoff period. + * + * @param error + * @param nodeKey + */ + private handleSubscriberConnectFailed = (error: Error, nodeKey: string) => { + const currentAttempts = this.failedAttemptsByNode.get(nodeKey) || 0; + const failedAttempts = currentAttempts + 1; + this.failedAttemptsByNode.set(nodeKey, failedAttempts); + + const attempts = Math.min( + failedAttempts, + ClusterSubscriberGroup.MAX_RETRY_ATTEMPTS + ); + const backoff = Math.min( + ClusterSubscriberGroup.BASE_BACKOFF_MS * 2 ** attempts, + ClusterSubscriberGroup.MAX_BACKOFF_MS + ); + const jitter = Math.floor((Math.random() - 0.5) * (backoff * 0.5)); + const delay = Math.max(0, backoff + jitter); + + debug( + "Failed to connect subscriber for %s. Refreshing slots in %dms", + nodeKey, + delay + ); + + this.subscriberGroupEmitter.emit("subscriberConnectFailed", { + delay, + error, + }); + }; + + /** + * Handles successful subscriber connections by resetting the failed attempts counter. + * + * @param nodeKey + */ + private handleSubscriberConnectSucceeded = (nodeKey: string) => { + this.failedAttemptsByNode.delete(nodeKey); + }; } diff --git a/lib/cluster/ShardedSubscriber.ts b/lib/cluster/ShardedSubscriber.ts index 9e52dbe6..36b07d10 100644 --- a/lib/cluster/ShardedSubscriber.ts +++ b/lib/cluster/ShardedSubscriber.ts @@ -10,9 +10,6 @@ export default class ShardedSubscriber { private instance: Redis | null = null; // Store listener references for cleanup - private readonly onEnd: () => void; - private readonly onError: (error: Error) => void; - private readonly onMoved: () => void; private readonly messageListeners: Map void> = new Map(); @@ -36,20 +33,6 @@ export default class ShardedSubscriber { this.nodeKey = getNodeKey(options); - // Define listeners as instance methods so we can remove them later - this.onEnd = () => { - this.started = false; - this.emitter.emit("-node", this.instance, this.nodeKey); - }; - - this.onError = (error: Error) => { - this.emitter.emit("nodeError", error, this.nodeKey); - }; - - this.onMoved = () => { - this.emitter.emit("moved"); - }; - // Register listeners this.instance.once("end", this.onEnd); this.instance.on("error", this.onError); @@ -64,6 +47,19 @@ export default class ShardedSubscriber { } } + private onEnd = () => { + this.started = false; + this.emitter.emit("-node", this.instance, this.nodeKey); + }; + + private onError = (error: Error) => { + this.emitter.emit("nodeError", error, this.nodeKey); + }; + + private onMoved = () => { + this.emitter.emit("moved"); + }; + async start(): Promise { if (this.started) { debug("already started %s", this.nodeKey); @@ -85,17 +81,9 @@ export default class ShardedSubscriber { this.started = false; if (this.instance) { - // Remove all listeners before disconnecting - this.instance.off("end", this.onEnd); - this.instance.off("error", this.onError); - 
this.instance.off("moved", this.onMoved); - - for (const [event, listener] of this.messageListeners) { - this.instance.off(event, listener); - } - this.messageListeners.clear(); - this.instance.disconnect(); + this.instance.removeAllListeners(); + this.messageListeners.clear(); this.instance = null; } @@ -109,4 +97,8 @@ export default class ShardedSubscriber { getInstance(): Redis | null { return this.instance; } + + getNodeKey(): string { + return this.nodeKey; + } } diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 7fa951ad..1902c72b 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -1163,6 +1163,17 @@ class Cluster extends Commander { this.refreshSlotsCache(); }); + this.subscriberGroupEmitter.on( + "subscriberConnectFailed", + ({ delay, error }) => { + this.emit("error", error); + + setTimeout(() => { + this.refreshSlotsCache(); + }, delay); + } + ); + this.subscriberGroupEmitter.on("moved", () => { this.refreshSlotsCache(); });