diff --git a/Sources/Valkey/Commands/ClusterCommands.swift b/Sources/Valkey/Commands/ClusterCommands.swift index f380250d..b163a3c9 100644 --- a/Sources/Valkey/Commands/ClusterCommands.swift +++ b/Sources/Valkey/Commands/ClusterCommands.swift @@ -365,8 +365,6 @@ public enum CLUSTER { /// Returns a list of all TCP links to and from peer nodes. @_documentation(visibility: internal) public struct LINKS: ValkeyCommand { - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER LINKS" } @inlinable public init() { @@ -789,8 +787,6 @@ public enum CLUSTER { } } } - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER SLOT-STATS" } public var filter: Filter @@ -807,8 +803,6 @@ public enum CLUSTER { /// Returns the mapping of cluster slots to nodes. @_documentation(visibility: internal) public struct SLOTS: ValkeyCommand { - public typealias Response = RESPToken.Array - @inlinable public static var name: String { "CLUSTER SLOTS" } @inlinable public init() { @@ -1068,7 +1062,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: An array of cluster links and their attributes. @inlinable @discardableResult - public func clusterLinks() async throws -> RESPToken.Array { + public func clusterLinks() async throws -> CLUSTER.LINKS.Response { try await execute(CLUSTER.LINKS()) } @@ -1229,7 +1223,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: Array of nested arrays, where the inner array element represents a slot and its respective usage statistics. @inlinable @discardableResult - public func clusterSlotStats(filter: CLUSTER.SLOTSTATS.Filter) async throws -> RESPToken.Array { + public func clusterSlotStats(filter: CLUSTER.SLOTSTATS.Filter) async throws -> CLUSTER.SLOTSTATS.Response { try await execute(CLUSTER.SLOTSTATS(filter: filter)) } @@ -1244,7 +1238,7 @@ extension ValkeyClientProtocol { /// - Response: [Array]: Nested list of slot ranges with networking information. @inlinable @discardableResult - public func clusterSlots() async throws -> RESPToken.Array { + public func clusterSlots() async throws -> CLUSTER.SLOTS.Response { try await execute(CLUSTER.SLOTS()) } diff --git a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift index ee79bafb..71f8ac6e 100644 --- a/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift +++ b/Sources/Valkey/Commands/Custom/ClusterCustomCommands.swift @@ -19,10 +19,22 @@ extension CLUSTER.MYSHARDID { public typealias Response = String } +extension CLUSTER.LINKS { + public typealias Response = [ValkeyClusterLink] +} + extension CLUSTER.SHARDS { public typealias Response = ValkeyClusterDescription } +extension CLUSTER.SLOTSTATS { + public typealias Response = [ValkeyClusterSlotStats] +} + +extension CLUSTER.SLOTS { + public typealias Response = [ValkeyClusterSlotRange] +} + package struct ValkeyClusterParseError: Error, Equatable { package enum Reason: Error { case clusterDescriptionTokenIsNotAnArray @@ -35,6 +47,19 @@ package struct ValkeyClusterParseError: Error, Equatable { case missingRequiredValueForNode case shardIsMissingHashSlots case shardIsMissingNode + case clusterLinksTokenIsNotAnArray + case clusterLinkTokenIsNotAnArrayOrMap + case missingRequiredValueForLink + case invalidLinkDirection + case clusterSlotStatsTokenIsNotAnArray + case clusterSlotStatsTokenIsNotAnArrayOrMap + case missingRequiredValueForSlotStats + case clusterSlotsTokenIsNotAnArray + case clusterSlotRangeTokenIsNotAnArray + case clusterSlotNodeTokenIsNotAnArray + case missingRequiredValueForSlotRange + case missingRequiredValueForSlotNode + case clusterSlotNodeMetadataIsNotAnArrayOrMap } package var reason: Reason @@ -206,6 +231,198 @@ public struct ValkeyClusterDescription: Hashable, Sendable, RESPTokenDecodable { } } +/// A cluster link between nodes in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterLinks()``. +public struct ValkeyClusterLink: Hashable, Sendable, RESPTokenDecodable { + /// Direction of the cluster link. + public struct Direction: Sendable, Hashable, RawRepresentable { + /// The link is established by the local node to the peer. + public static let to = Direction(base: .to) + /// The link is accepted by the local node from the peer. + public static let from = Direction(base: .from) + + public init?(rawValue: String) { + guard let base = Base(rawValue: rawValue) else { + return nil + } + self.base = base + } + + public var rawValue: String { + self.base.rawValue + } + + enum Base: String { + case to + case from + } + + private(set) var base: Base + + init(base: Base) { + self.base = base + } + } + + /// The direction of the link (to or from) + public var direction: Direction? + /// The node ID of the peer + public var node: String? + /// Creation time of the link + public var createTime: Int? + /// Events currently registered for the link (e.g., "r", "w", "rw") + public var events: String? + /// Allocated size of the link's send buffer + public var sendBufferAllocated: Int? + /// Size of the portion of the link's send buffer currently holding data + public var sendBufferUsed: Int? + + /// Creates a new cluster link + /// - Parameters: + /// - direction: The direction of the link + /// - node: The node ID of the peer + /// - createTime: Creation time of the link + /// - events: Events registered for the link + /// - sendBufferAllocated: Allocated send buffer size + /// - sendBufferUsed: Used send buffer size + public init( + direction: Direction? = nil, + node: String? = nil, + createTime: Int? = nil, + events: String? = nil, + sendBufferAllocated: Int? = nil, + sendBufferUsed: Int? = nil + ) { + self.direction = direction + self.node = node + self.createTime = createTime + self.events = events + self.sendBufferAllocated = sendBufferAllocated + self.sendBufferUsed = sendBufferUsed + } + + /// Creates a cluster link from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterLink(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// Slot usage statistics for a hash slot in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlotStats(filter:)``. +public struct ValkeyClusterSlotStats: Hashable, Sendable, RESPTokenDecodable { + /// The hash slot number + public var slot: Int + /// Number of keys in the slot + public var keyCount: Int? + /// CPU time consumed by the slot in microseconds + public var cpuUsec: Int? + /// Network bytes read for the slot + public var networkBytesIn: Int? + /// Network bytes written for the slot + public var networkBytesOut: Int? + + /// Creates a new cluster slot stats + /// - Parameters: + /// - slot: The hash slot number + /// - keyCount: Number of keys in the slot + /// - cpuUsec: CPU time consumed in microseconds + /// - networkBytesIn: Network bytes read + /// - networkBytesOut: Network bytes written + public init( + slot: Int, + keyCount: Int? = nil, + cpuUsec: Int? = nil, + networkBytesIn: Int? = nil, + networkBytesOut: Int? = nil + ) { + self.slot = slot + self.keyCount = keyCount + self.cpuUsec = cpuUsec + self.networkBytesIn = networkBytesIn + self.networkBytesOut = networkBytesOut + } + + /// Creates a cluster slot stats from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotStats(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + +/// A slot range mapping in a Valkey cluster. +/// +/// A description is returned when you call ``ValkeyClientProtocol/clusterSlots()``. +public struct ValkeyClusterSlotRange: Hashable, Sendable, RESPTokenDecodable { + /// A node serving a slot range in a Valkey cluster. + public struct Node: Hashable, Sendable { + /// The IP address of the node + public var ip: String + /// The port of the node + public var port: Int + /// The node ID + public var nodeId: String + /// Additional networking metadata + public var metadata: [String: String] + + /// Creates a new cluster slot node + /// - Parameters: + /// - ip: The IP address + /// - port: The port + /// - nodeId: The node ID + /// - metadata: Additional networking metadata + public init( + ip: String, + port: Int, + nodeId: String, + metadata: [String: String] = [:] + ) { + self.ip = ip + self.port = port + self.nodeId = nodeId + self.metadata = metadata + } + } + + /// The start slot of the range + public var startSlot: Int + /// The end slot of the range + public var endSlot: Int + /// The nodes serving this slot range + public var nodes: [Node] + + /// Creates a new cluster slot range + /// - Parameters: + /// - startSlot: The start slot + /// - endSlot: The end slot + /// - nodes: The nodes serving this range + public init(startSlot: Int, endSlot: Int, nodes: [Node]) { + self.startSlot = startSlot + self.endSlot = endSlot + self.nodes = nodes + } + + /// Creates a cluster slot range from the response token you provide. + /// - Parameter respToken: The response token. + public init(fromRESP respToken: RESPToken) throws { + do { + self = try Self.makeClusterSlotRange(respToken: respToken) + } catch { + throw ValkeyClusterParseError(reason: error, token: respToken) + } + } +} + extension ValkeyClusterDescription { fileprivate static func makeClusterDescription(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterDescription { guard case .array(let shardsToken) = respToken.value else { @@ -409,6 +626,297 @@ extension ValkeyClusterDescription.Node { } } +extension ValkeyClusterLink { + fileprivate static func makeClusterLink(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterLink { + switch respToken.value { + case .array(let array): + return try Self.makeFromTokenSequence(MapStyleArray(underlying: array)) + + case .map(let map): + let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in + if let key = try? String(fromRESP: keyNode) { + return (key, value) + } else { + return nil + } + } + return try Self.makeFromTokenSequence(mapped) + + default: + throw .clusterLinkTokenIsNotAnArrayOrMap + } + } + + fileprivate static func makeFromTokenSequence( + _ sequence: TokenSequence + ) throws(ValkeyClusterParseError.Reason) -> Self where TokenSequence.Element == (String, RESPToken) { + var direction: ValkeyClusterLink.Direction? + var node: String? + var createTime: Int64? + var events: String? + var sendBufferAllocated: Int64? + var sendBufferUsed: Int64? + + for (key, value) in sequence { + switch key { + case "direction": + guard let directionString = try? String(fromRESP: value), + let directionValue = ValkeyClusterLink.Direction(rawValue: directionString) + else { + throw .invalidLinkDirection + } + direction = directionValue + + case "node": + node = try? String(fromRESP: value) + + case "create-time": + createTime = try? Int64(fromRESP: value) + + case "events": + events = try? String(fromRESP: value) + + case "send-buffer-allocated": + sendBufferAllocated = try? Int64(fromRESP: value) + + case "send-buffer-used": + sendBufferUsed = try? Int64(fromRESP: value) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + return ValkeyClusterLink( + direction: direction, + node: node, + createTime: createTime.map { Int($0) }, + events: events, + sendBufferAllocated: sendBufferAllocated.map { Int($0) }, + sendBufferUsed: sendBufferUsed.map { Int($0) } + ) + } +} + +extension ValkeyClusterSlotStats { + fileprivate static func makeClusterSlotStats(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotStats { + guard case .array(let array) = respToken.value else { + throw .clusterSlotStatsTokenIsNotAnArray + } + + guard array.count >= 2 else { + throw .missingRequiredValueForSlotStats + } + + var iterator = array.makeIterator() + + // First element: slot number + guard let slotToken = iterator.next(), + case .number(let slotNumber) = slotToken.value + else { + throw .missingRequiredValueForSlotStats + } + + // Second element: statistics map + guard let statsToken = iterator.next() else { + throw .missingRequiredValueForSlotStats + } + + return try Self.makeFromStatsToken(slot: Int(slotNumber), statsToken: statsToken) + } + + fileprivate static func makeFromStatsToken(slot: Int, statsToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> Self { + var keyCount: Int64? + var cpuUsec: Int64? + var networkBytesIn: Int64? + var networkBytesOut: Int64? + + switch statsToken.value { + case .map(let map): + // For RESP3, handle RESPToken stats as map + let mapped = map.lazy.compactMap { (keyNode, value) -> (String, RESPToken)? in + if let key = try? String(fromRESP: keyNode) { + return (key, value) + } else { + return nil + } + } + for (key, value) in mapped { + switch key { + case "key-count": + keyCount = try? Int64(fromRESP: value) + + case "cpu-usec": + cpuUsec = try? Int64(fromRESP: value) + + case "network-bytes-in": + networkBytesIn = try? Int64(fromRESP: value) + + case "network-bytes-out": + networkBytesOut = try? Int64(fromRESP: value) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + case .array(let array): + // // For RESP2, handle RESPToken stats as key-value pairs in array format + let mapArray = MapStyleArray(underlying: array) + for (key, valueToken) in mapArray { + switch key { + case "key-count": + keyCount = try? Int64(fromRESP: valueToken) + + case "cpu-usec": + cpuUsec = try? Int64(fromRESP: valueToken) + + case "network-bytes-in": + networkBytesIn = try? Int64(fromRESP: valueToken) + + case "network-bytes-out": + networkBytesOut = try? Int64(fromRESP: valueToken) + + default: + // ignore unexpected keys for forward compatibility + continue + } + } + + default: + throw .clusterSlotStatsTokenIsNotAnArrayOrMap + } + + return ValkeyClusterSlotStats( + slot: slot, + keyCount: keyCount.map { Int($0) }, + cpuUsec: cpuUsec.map { Int($0) }, + networkBytesIn: networkBytesIn.map { Int($0) }, + networkBytesOut: networkBytesOut.map { Int($0) } + ) + } +} + +extension ValkeyClusterSlotRange { + fileprivate static func makeClusterSlotRange(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotRange { + guard case .array(let array) = respToken.value else { + throw .clusterSlotRangeTokenIsNotAnArray + } + + guard array.count >= 3 else { + throw .missingRequiredValueForSlotRange + } + + var iterator = array.makeIterator() + + // First element: start slot + guard let startSlotToken = iterator.next(), + case .number(let startSlotNumber) = startSlotToken.value + else { + throw .missingRequiredValueForSlotRange + } + + // Second element: end slot + guard let endSlotToken = iterator.next(), + case .number(let endSlotNumber) = endSlotToken.value + else { + throw .missingRequiredValueForSlotRange + } + + let startSlot = Int(startSlotNumber) + let endSlot = Int(endSlotNumber) + + // Remaining elements are nodes + var nodes: [Node] = [] + while let nodeToken = iterator.next() { + let node = try Node.makeSlotNode(respToken: nodeToken) + nodes.append(node) + } + + return ValkeyClusterSlotRange( + startSlot: startSlot, + endSlot: endSlot, + nodes: nodes + ) + } +} + +extension ValkeyClusterSlotRange.Node { + fileprivate static func makeSlotNode(respToken: RESPToken) throws(ValkeyClusterParseError.Reason) -> ValkeyClusterSlotRange.Node { + guard case .array(let array) = respToken.value else { + throw .clusterSlotNodeTokenIsNotAnArray + } + + // IP, Port and Node Id are expected, additional metadata is optional + guard array.count >= 3 else { + throw .missingRequiredValueForSlotNode + } + + var iterator = array.makeIterator() + + // First element: IP address + guard let ipToken = iterator.next(), + let ip = try? String(fromRESP: ipToken) + else { + throw .missingRequiredValueForSlotNode + } + + // Second element: port + guard let portToken = iterator.next(), + case .number(let portNumber) = portToken.value + else { + throw .missingRequiredValueForSlotNode + } + let port = Int(portNumber) + + // Third element: node ID + guard let nodeIdToken = iterator.next(), + let nodeId = try? String(fromRESP: nodeIdToken) + else { + throw .missingRequiredValueForSlotNode + } + + var metadata: [String: String] = [:] + + // Any additional elements are treated as metadata + while let metadataToken = iterator.next() { + switch metadataToken.value { + case .map(let map): + // Handle metadata as a map + for (keyToken, valueToken) in map { + if let key = try? String(fromRESP: keyToken), + let value = try? String(fromRESP: valueToken) + { + metadata[key] = value + } + } + case .array(let array): + // Skip empty arrays (indicates no additional metadata) + guard array.count > 0 else { continue } + + // Handle metadata as key-value pairs in array format (using MapStyleArray) + let mapArray = MapStyleArray(underlying: array) + for (key, valueToken) in mapArray { + if let value = try? String(fromRESP: valueToken) { + metadata[key] = value + } + } + default: + throw .clusterSlotNodeMetadataIsNotAnArrayOrMap + } + } + + return ValkeyClusterSlotRange.Node( + ip: ip, + port: port, + nodeId: nodeId, + metadata: metadata + ) + } +} + struct MapStyleArray: Sequence { var underlying: RESPToken.Array diff --git a/Sources/ValkeyConnectionPool/ConnectionPool.swift b/Sources/ValkeyConnectionPool/ConnectionPool.swift index dab41e13..d86e3972 100644 --- a/Sources/ValkeyConnectionPool/ConnectionPool.swift +++ b/Sources/ValkeyConnectionPool/ConnectionPool.swift @@ -593,6 +593,7 @@ extension DiscardingTaskGroup: TaskGroupProtocol { } } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) extension TaskGroup: TaskGroupProtocol { @inlinable mutating func addTask_(operation: @escaping @Sendable () async -> Void) { diff --git a/Sources/ValkeyConnectionPool/ConnectionRequest.swift b/Sources/ValkeyConnectionPool/ConnectionRequest.swift index 7ba53c0a..ebbf345b 100644 --- a/Sources/ValkeyConnectionPool/ConnectionRequest.swift +++ b/Sources/ValkeyConnectionPool/ConnectionRequest.swift @@ -1,3 +1,4 @@ +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct ConnectionRequest: ConnectionRequestProtocol { public typealias ID = Int diff --git a/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift b/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift index f36c1574..a2ef64d4 100644 --- a/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift +++ b/Sources/_ValkeyCommandsBuilder/ValkeyCommandsRender.swift @@ -16,6 +16,9 @@ private let disableResponseCalculationCommands: Set = [ "BZMPOP", "BZPOPMAX", "BZPOPMIN", + "CLUSTER SLOTS", + "CLUSTER SLOT-STATS", + "CLUSTER LINKS", "CLUSTER GETKEYSINSLOT", "CLUSTER MYID", "CLUSTER MYSHARDID", diff --git a/Tests/IntegrationTests/ClientIntegrationTests.swift b/Tests/IntegrationTests/ClientIntegrationTests.swift index 9d3182c5..fb821dde 100644 --- a/Tests/IntegrationTests/ClientIntegrationTests.swift +++ b/Tests/IntegrationTests/ClientIntegrationTests.swift @@ -627,4 +627,93 @@ struct ClientIntegratedTests { } } + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) + @available(valkeySwift 1.0, *) + func testClusterLinks() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in + let clusterLinks = try await client.clusterLinks() + #expect(!clusterLinks.isEmpty && clusterLinks.count > 0) + for clusterLink in clusterLinks { + if let direction = clusterLink.direction { + #expect(direction == .from || direction == .to) + } + if let node = clusterLink.node { + #expect(!node.isEmpty) + } + if let createTime = clusterLink.createTime { + #expect(createTime > 0) + } + if let events = clusterLink.events { + #expect(!events.isEmpty) + } + if let sendBufferAllocated = clusterLink.sendBufferAllocated { + #expect(sendBufferAllocated >= 0) + } + if let sendBufferUsed = clusterLink.sendBufferUsed { + #expect(sendBufferUsed >= 0) + } + } + } + } + + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) + @available(valkeySwift 1.0, *) + func testClusterSlotStats() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in + let slotStats = try await client.clusterSlotStats( + filter: .orderby( + CLUSTER.SLOTSTATS.FilterOrderby( + metric: "key-count", + limit: 10, + order: .desc + ) + ) + ) + #expect(!slotStats.isEmpty && slotStats.count == 10) + for slotStat in slotStats { + // slot is a required field, other fields are optional + #expect(slotStat.slot >= 0 && slotStat.slot <= 16383) + if let keyCount = slotStat.keyCount { + #expect(keyCount >= 0) + } + if let cpuUsec = slotStat.cpuUsec { + #expect(cpuUsec >= 0) + } + if let networkBytesIn = slotStat.networkBytesIn { + #expect(networkBytesIn >= 0) + } + if let networkBytesOut = slotStat.networkBytesOut { + #expect(networkBytesOut >= 0) + } + } + } + } + + @Test(.disabled(if: clusterFirstNodeHostname == nil, "VALKEY_NODE1_HOSTNAME environment variable is not set.")) + @available(valkeySwift 1.0, *) + func testClusterSlots() async throws { + var logger = Logger(label: "Valkey") + logger.logLevel = .debug + try await withValkeyConnection(.hostname(clusterFirstNodeHostname!, port: clusterFirstNodePort ?? 36001), logger: logger) { client in + let clusterSlots = try await client.clusterSlots() + for clusterSlot in clusterSlots { + #expect(clusterSlot.startSlot >= 0 && clusterSlot.startSlot <= 16383) + #expect(clusterSlot.endSlot >= 0 && clusterSlot.endSlot <= 16383) + for node in clusterSlot.nodes { + #expect(!node.ip.isEmpty) + #expect(node.port >= 0 && node.port <= 65535) + #expect(!node.nodeId.isEmpty) + } + } + } + } + } + +private let clusterFirstNodeHostname: String? = ProcessInfo.processInfo.environment["VALKEY_NODE1_HOSTNAME"] +private let clusterFirstNodePort: Int? = ProcessInfo.processInfo.environment["VALKEY_NODE1_PORT"].flatMap { Int($0) } diff --git a/docker-compose.cluster.yml b/docker-compose.cluster.yml index 7fbc7c72..2dd271dd 100644 --- a/docker-compose.cluster.yml +++ b/docker-compose.cluster.yml @@ -12,48 +12,53 @@ services: # for more information. valkey_1: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_1 - command: valkey-server --port 36001 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36001:36001" + command: valkey-server --bind 0.0.0.0 --port 36001 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_2: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_2 - command: valkey-server --port 36002 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36002:36002" + command: valkey-server --bind 0.0.0.0 --port 36002 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_3: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_3 - command: valkey-server --port 36003 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36003:36003" + command: valkey-server --bind 0.0.0.0 --port 36003 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_4: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_4 - command: valkey-server --port 36004 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36004:36004" + command: valkey-server --bind 0.0.0.0 --port 36004 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_5: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_5 - command: valkey-server --port 36005 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36005:36005" + command: valkey-server --bind 0.0.0.0 --port 36005 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes valkey_6: image: 'valkey/valkey:latest' - network_mode: "host" container_name: valkey_6 - command: valkey-server --port 36006 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes + ports: + - "36006:36006" + command: valkey-server --bind 0.0.0.0 --port 36006 --cluster-enabled yes --cluster-config-file nodes.conf --cluster-node-timeout 5000 --appendonly yes # Ephemeral container to create the valkey cluster connections. # Once the setup is done, this container shuts down # and the cluster can be used by the service app container cluster_initiator: image: 'valkey/valkey:latest' - network_mode: "host" container_name: cluster_initiator - command: valkey-cli --cluster create localhost:36001 localhost:36002 localhost:36003 localhost:36004 localhost:36005 localhost:36006 --cluster-replicas 1 --cluster-yes + command: valkey-cli --cluster create valkey_1:36001 valkey_2:36002 valkey_3:36003 valkey_4:36004 valkey_5:36005 valkey_6:36006 --cluster-replicas 1 --cluster-yes tty: true depends_on: - valkey_1