Skip to content

Commit 16ce7dd

Browse files
authored
Add support for calling READONLY on replicas (#255)
* Add support for calling READONLY on replicas Signed-off-by: Adam Fowler <adamfowler71@gmail.com> * Add documentation fix Signed-off-by: Adam Fowler <adamfowler71@gmail.com> --------- Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent ee9ed4d commit 16ce7dd

12 files changed

+63
-17
lines changed

Sources/Valkey/Cluster/ValkeyClusterClientStateMachine.swift

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ where
283283
newShards.lazy.flatMap { $0.nodes.lazy.map { ValkeyNodeDescription(description: $0) } },
284284
removeUnmentionedPools: true
285285
)
286-
assert(poolUpdate.poolsToRun.isEmpty)
287286

288287
var result = ClusterDiscoverySucceededAction(
289288
createTimer: .init(

Sources/Valkey/Cluster/ValkeyClusterNodeClientFactory.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ package struct ValkeyClusterNodeClientFactory: ValkeyNodeConnectionPoolFactory {
5050
address,
5151
connectionIDGenerator: self.connectionIDGenerator,
5252
connectionFactory: self.connectionFactory,
53+
readOnly: nodeDescription.readOnly,
5354
eventLoopGroup: self.eventLoopGroup,
5455
logger: self.logger
5556
)

Sources/Valkey/Cluster/ValkeyNodeDescription.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ package struct ValkeyNodeDescription: Identifiable, Hashable, Sendable {
4141
/// This property is required and is used as part of the node's unique identifier.
4242
package var port: Int
4343

44+
/// Is node a readonly replica
45+
package var readOnly: Bool
46+
4447
/// Creates a node description from any type conforming to the `ValkeyNodeDescriptionProtocol`.
4548
///
4649
/// This initializer allows for easy conversion from various node description types
@@ -51,6 +54,7 @@ package struct ValkeyNodeDescription: Identifiable, Hashable, Sendable {
5154
package init(description: any ValkeyNodeDescriptionProtocol) {
5255
self.endpoint = description.endpoint
5356
self.port = description.port
57+
self.readOnly = description.readOnly
5458
}
5559

5660
/// Creates a node description from a cluster node description.
@@ -64,6 +68,7 @@ package struct ValkeyNodeDescription: Identifiable, Hashable, Sendable {
6468
package init(description: ValkeyClusterDescription.Node) {
6569
self.endpoint = description.endpoint
6670
self.port = description.tlsPort ?? description.port ?? 6379
71+
self.readOnly = description.role == .replica
6772
}
6873

6974
/// Creates a node description from a redirection error.
@@ -74,6 +79,7 @@ package struct ValkeyNodeDescription: Identifiable, Hashable, Sendable {
7479
package init(redirectionError: ValkeyClusterRedirectionError) {
7580
self.endpoint = redirectionError.endpoint
7681
self.port = redirectionError.port
82+
self.readOnly = false
7783
}
7884

7985
/// Determines whether this node description matches a given cluster node description.

Sources/Valkey/Cluster/ValkeyNodeDescriptionProtocol.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,4 +30,7 @@ public protocol ValkeyNodeDescriptionProtocol: Sendable, Equatable {
3030

3131
/// The port number on which the Valkey service is listening.
3232
var port: Int { get }
33+
34+
/// Is node a readonly replica.
35+
var readOnly: Bool { get }
3336
}

Sources/Valkey/Cluster/ValkeyNodeDiscovery.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public struct ValkeyStaticNodeDiscovery: ValkeyNodeDiscovery {
4545
public struct NodeDescription: ValkeyNodeDescriptionProtocol {
4646
public var endpoint: String
4747
public var port: Int
48+
public var readOnly: Bool { false }
4849

4950
/// Initializes a `NodeDescription` with a host and optional IP.
5051
///

Sources/Valkey/Connection/ValkeyChannelHandler.swift

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
5757
@usableFromInline
5858
let blockingCommandTimeout: TimeAmount
5959
let clientName: String?
60+
let readOnly: Bool
6061
}
6162
@usableFromInline
6263
struct PendingCommand {
@@ -286,10 +287,15 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
286287
let clientInfoLibName = CLIENT.SETINFO(attr: .libname(valkeySwiftLibraryName))
287288
let clientInfoLibVersion = CLIENT.SETINFO(attr: .libver(valkeySwiftLibraryVersion))
288289

290+
var numberOfPendingCommands = 2
289291
self.encoder.reset()
290292
helloCommand.encode(into: &self.encoder)
291293
clientInfoLibName.encode(into: &self.encoder)
292294
clientInfoLibVersion.encode(into: &self.encoder)
295+
if self.configuration.readOnly {
296+
numberOfPendingCommands += 1
297+
READONLY().encode(into: &self.encoder)
298+
}
293299

294300
let promise = eventLoop.makePromise(of: RESPToken.self)
295301

@@ -300,10 +306,7 @@ final class ValkeyChannelHandler: ChannelInboundHandler {
300306
self.stateMachine.setConnected(
301307
context: context,
302308
pendingHelloCommand: .init(promise: .nio(promise), requestID: 0, deadline: deadline),
303-
pendingCommands: [
304-
.init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libname
305-
.init(promise: .forget, requestID: 0, deadline: deadline), // CLIENT.SETINFO libver
306-
]
309+
pendingCommands: .init(repeating: .init(promise: .forget, requestID: 0, deadline: deadline), count: numberOfPendingCommands)
307310
)
308311
}
309312

@@ -535,7 +538,8 @@ extension ValkeyChannelHandler.Configuration {
535538
authentication: other.authentication,
536539
commandTimeout: .init(other.commandTimeout),
537540
blockingCommandTimeout: .init(other.blockingCommandTimeout),
538-
clientName: other.clientName
541+
clientName: other.clientName,
542+
readOnly: other.readOnly
539543
)
540544
}
541545
}

Sources/Valkey/Connection/ValkeyConnectionConfiguration.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ public struct ValkeyConnectionConfiguration: Sendable {
116116
/// Default value is `nil` (no client name is set).
117117
public var clientName: String?
118118

119+
/// Is connection to be flagged as readonly.
120+
///
121+
/// Readonly connections can run readonly commands on replica nodes
122+
public var readOnly: Bool
123+
119124
#if DistributedTracingSupport
120125
/// The distributed tracing configuration to use for this connection.
121126
/// Defaults to using the globally bootstrapped tracer with OpenTelemetry semantic conventions.
@@ -133,18 +138,21 @@ public struct ValkeyConnectionConfiguration: Sendable {
133138
/// - blockingCommandTimeout: Maximum time to wait for a response to blocking commands. Defaults to 120 seconds.
134139
/// - tls: TLS configuration for secure connections. Defaults to `.disable` for unencrypted connections.
135140
/// - clientName: Optional name to identify this client connection on the server. Defaults to `nil`.
141+
/// - readOnly: Is the connection a readonly connection
136142
public init(
137143
authentication: Authentication? = nil,
138144
commandTimeout: Duration = .seconds(30),
139145
blockingCommandTimeout: Duration = .seconds(120),
140146
tls: TLS = .disable,
141-
clientName: String? = nil
147+
clientName: String? = nil,
148+
readOnly: Bool = false
142149
) {
143150
self.authentication = authentication
144151
self.commandTimeout = commandTimeout
145152
self.blockingCommandTimeout = blockingCommandTimeout
146153
self.tls = tls
147154
self.clientName = clientName
155+
self.readOnly = readOnly
148156
}
149157
}
150158

Sources/Valkey/Connection/ValkeyConnectionFactory.swift

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,14 @@ package final class ValkeyConnectionFactory: Sendable {
5454

5555
package func makeConnection(
5656
address: ValkeyServerAddress,
57+
readOnly: Bool,
5758
connectionID: Int,
5859
eventLoop: any EventLoop,
5960
logger: Logger
6061
) async throws -> ValkeyConnection {
6162
switch self.mode {
6263
case .default:
63-
let connectionConfig = try await self.makeConnectionConfiguration()
64+
let connectionConfig = try await self.makeConnectionConfiguration(readOnly: readOnly)
6465
return try await ValkeyConnection.connect(
6566
address: address,
6667
connectionID: connectionID,
@@ -70,7 +71,7 @@ package final class ValkeyConnectionFactory: Sendable {
7071
)
7172

7273
case .custom(let customHandler):
73-
async let connectionConfigPromise = self.makeConnectionConfiguration()
74+
async let connectionConfigPromise = self.makeConnectionConfiguration(readOnly: readOnly)
7475
let channel = try await customHandler(address, eventLoop)
7576
let connectionConfig = try await connectionConfigPromise
7677

@@ -94,7 +95,7 @@ package final class ValkeyConnectionFactory: Sendable {
9495
}
9596
}
9697

97-
func makeConnectionConfiguration() async throws -> ValkeyConnectionConfiguration {
98+
func makeConnectionConfiguration(readOnly: Bool) async throws -> ValkeyConnectionConfiguration {
9899
let tls: ValkeyConnectionConfiguration.TLS =
99100
switch self.configuration.tls.base {
100101
case .disable:
@@ -110,7 +111,8 @@ package final class ValkeyConnectionFactory: Sendable {
110111
commandTimeout: self.configuration.commandTimeout,
111112
blockingCommandTimeout: self.configuration.blockingCommandTimeout,
112113
tls: tls,
113-
clientName: nil
114+
clientName: nil,
115+
readOnly: readOnly
114116
)
115117

116118
#if DistributedTracingSupport

Sources/Valkey/Node/ValkeyNodeClient.swift

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,10 @@ package final class ValkeyNodeClient: Sendable {
4646
public let serverAddress: ValkeyServerAddress
4747
/// Connection pool
4848
let connectionPool: Pool
49-
49+
/// Connection factory
5050
let connectionFactory: ValkeyConnectionFactory
51+
/// Should we create readonly connections
52+
let readOnly: Bool
5153
/// configuration
5254
public var configuration: ValkeyClientConfiguration { self.connectionFactory.configuration }
5355
/// EventLoopGroup to use
@@ -70,6 +72,7 @@ package final class ValkeyNodeClient: Sendable {
7072
_ address: ValkeyServerAddress,
7173
connectionIDGenerator: ConnectionIDGenerator,
7274
connectionFactory: ValkeyConnectionFactory,
75+
readOnly: Bool,
7376
eventLoopGroup: any EventLoopGroup,
7477
logger: Logger
7578
) {
@@ -80,6 +83,7 @@ package final class ValkeyNodeClient: Sendable {
8083
poolConfiguration.maximumConnectionSoftLimit = connectionFactory.configuration.connectionPool.maximumConnectionCount
8184
poolConfiguration.maximumConnectionHardLimit = connectionFactory.configuration.connectionPool.maximumConnectionCount
8285

86+
self.readOnly = readOnly
8387
self.connectionPool = .init(
8488
configuration: poolConfiguration,
8589
idGenerator: connectionIDGenerator,
@@ -93,6 +97,7 @@ package final class ValkeyNodeClient: Sendable {
9397

9498
let connection = try await connectionFactory.makeConnection(
9599
address: address,
100+
readOnly: readOnly,
96101
connectionID: connectionID,
97102
eventLoop: eventLoopGroup.any(),
98103
logger: logger

Sources/Valkey/Node/ValkeyNodeClientFactory.swift

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
2121
let connectionIDGenerator = ConnectionIDGenerator()
2222
let connectionFactory: ValkeyConnectionFactory
2323

24+
@usableFromInline
25+
package struct NodeDescription {
26+
let address: ValkeyServerAddress
27+
let readOnly: Bool
28+
}
29+
2430
/// Creates a new `ValkeyClientFactory` instance.
2531
///
2632
/// - Parameters:
@@ -44,11 +50,12 @@ package struct ValkeyNodeClientFactory: ValkeyNodeConnectionPoolFactory {
4450
/// - Parameter nodeDescription: Description of the node to connect to.
4551
/// - Returns: A configured `ValkeyNode` instance ready to connect to the specified node.
4652
@usableFromInline
47-
package func makeConnectionPool(nodeDescription: ValkeyServerAddress) -> ValkeyNodeClient {
53+
package func makeConnectionPool(nodeDescription: NodeDescription) -> ValkeyNodeClient {
4854
ValkeyNodeClient(
49-
nodeDescription,
55+
nodeDescription.address,
5056
connectionIDGenerator: self.connectionIDGenerator,
5157
connectionFactory: self.connectionFactory,
58+
readOnly: nodeDescription.readOnly,
5259
eventLoopGroup: self.eventLoopGroup,
5360
logger: self.logger
5461
)

0 commit comments

Comments
 (0)