Skip to content

Commit 18cc4a6

Browse files
authored
Latest changes from PostgresNIO connection pool (#256)
Signed-off-by: Adam Fowler <adamfowler71@gmail.com>
1 parent 16ce7dd commit 18cc4a6

11 files changed

+71
-38
lines changed

Sources/Valkey/Node/ValkeyNodeClient.swift

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -147,15 +147,10 @@ extension ValkeyNodeClient {
147147
isolation: isolated (any Actor)? = #isolation,
148148
operation: (ValkeyConnection) async throws -> sending Value
149149
) async throws -> Value {
150-
let connection = try await self.leaseConnection()
150+
let lease = try await self.connectionPool.leaseConnection()
151+
defer { lease.release() }
151152

152-
defer { self.connectionPool.releaseConnection(connection) }
153-
154-
return try await operation(connection)
155-
}
156-
157-
private func leaseConnection() async throws -> ValkeyConnection {
158-
try await self.connectionPool.leaseConnection()
153+
return try await operation(lease.connection)
159154
}
160155
}
161156

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
public struct ConnectionLease<Connection: PooledConnection>: Sendable {
2+
public var connection: Connection
3+
4+
@usableFromInline
5+
let _release: @Sendable (Connection) -> Void
6+
7+
@inlinable
8+
public init(connection: Connection, release: @escaping @Sendable (Connection) -> Void) {
9+
self.connection = connection
10+
self._release = release
11+
}
12+
13+
@inlinable
14+
public func release() {
15+
self._release(self.connection)
16+
}
17+
}

Sources/ValkeyConnectionPool/ConnectionPool.swift

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public protocol ConnectionRequestProtocol: Sendable {
8787

8888
/// A function that is called with a connection or a
8989
/// `PoolError`.
90-
func complete(with: Result<Connection, ConnectionPoolError>)
90+
func complete(with: Result<ConnectionLease<Connection>, ConnectionPoolError>)
9191
}
9292

9393
@available(macOS 13.0, iOS 16.0, tvOS 16.0, watchOS 9.0, *)
@@ -275,6 +275,7 @@ where
275275
}
276276
}
277277

278+
@inlinable
278279
public func run() async {
279280
await withTaskCancellationHandler {
280281
if #available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *) {
@@ -319,13 +320,15 @@ where
319320
}
320321

321322
@available(macOS 14.0, iOS 17.0, tvOS 17.0, watchOS 10.0, *)
322-
private func run(in taskGroup: inout DiscardingTaskGroup) async {
323+
@inlinable
324+
/* private */ func run(in taskGroup: inout DiscardingTaskGroup) async {
323325
for await event in self.eventStream {
324326
self.runEvent(event, in: &taskGroup)
325327
}
326328
}
327329

328-
private func run(in taskGroup: inout TaskGroup<Void>) async {
330+
@inlinable
331+
/* private */ func run(in taskGroup: inout TaskGroup<Void>) async {
329332
var running = 0
330333
for await event in self.eventStream {
331334
running += 1
@@ -338,7 +341,8 @@ where
338341
}
339342
}
340343

341-
private func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) {
344+
@inlinable
345+
/* private */ func runEvent(_ event: NewPoolActions, in taskGroup: inout some TaskGroupProtocol) {
342346
switch event {
343347
case .makeConnection(let request):
344348
self.makeConnection(for: request, in: &taskGroup)
@@ -405,8 +409,11 @@ where
405409
/*private*/ func runRequestAction(_ action: StateMachine.RequestAction) {
406410
switch action {
407411
case .leaseConnection(let requests, let connection):
412+
let lease = ConnectionLease(connection: connection) { connection in
413+
self.releaseConnection(connection)
414+
}
408415
for request in requests {
409-
request.complete(with: .success(connection))
416+
request.complete(with: .success(lease))
410417
}
411418

412419
case .failRequest(let request, let error):
@@ -430,7 +437,7 @@ where
430437
self.connectionEstablished(bundle)
431438

432439
// after the connection has been established, we keep the task open. This ensures
433-
// that the pools run method cannot be exited before all connections have been
440+
// that the pools run method can not be exited before all connections have been
434441
// closed.
435442
await withCheckedContinuation { (continuation: CheckedContinuation<Void, Never>) in
436443
bundle.connection.onClose {
@@ -458,7 +465,7 @@ where
458465
}
459466

460467
@inlinable
461-
/*private*/ func connectionEstablishFailed(_ error: any Error, for request: StateMachine.ConnectionRequest) {
468+
/*private*/ func connectionEstablishFailed(_ error: Error, for request: StateMachine.ConnectionRequest) {
462469
self.observabilityDelegate.connectFailed(id: request.connectionID, error: error)
463470

464471
self.modifyStateAndRunActions { state in
@@ -586,7 +593,6 @@ extension DiscardingTaskGroup: TaskGroupProtocol {
586593
}
587594
}
588595

589-
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
590596
extension TaskGroup<Void>: TaskGroupProtocol {
591597
@inlinable
592598
mutating func addTask_(operation: @escaping @Sendable () async -> Void) {
Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
public struct ConnectionPoolError: Error, Hashable {
2-
enum Base: Error, Hashable {
2+
@usableFromInline
3+
enum Base: Error, Hashable, Sendable {
34
case requestCancelled
45
case poolShutdown
56
}
67

7-
private let base: Base
8+
@usableFromInline
9+
let base: Base
810

11+
@inlinable
912
init(_ base: Base) { self.base = base }
1013

1114
/// The connection requests got cancelled
12-
public static let requestCancelled = ConnectionPoolError(.requestCancelled)
15+
@inlinable
16+
public static var requestCancelled: Self {
17+
ConnectionPoolError(.requestCancelled)
18+
}
1319
/// The connection requests can't be fulfilled as the pool has already been shutdown
14-
public static let poolShutdown = ConnectionPoolError(.poolShutdown)
20+
@inlinable
21+
public static var poolShutdown: Self {
22+
ConnectionPoolError(.poolShutdown)
23+
}
1524
}

Sources/ValkeyConnectionPool/ConnectionPoolObservabilityDelegate.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public protocol ConnectionPoolObservabilityDelegate: Sendable {
1515
/// time and is reported via ````. The
1616
func connectSucceeded(id: ConnectionID, streamCapacity: UInt16)
1717

18-
/// The utilization of the connection changed; a stream may have been used, returned or the
18+
/// The utlization of the connection changed; a stream may have been used, returned or the
1919
/// maximum number of concurrent streams available on the connection changed.
2020
func connectionUtilizationChanged(id: ConnectionID, streamsUsed: UInt16, streamCapacity: UInt16)
2121

Sources/ValkeyConnectionPool/ConnectionRequest.swift

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,21 @@
1-
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
21
public struct ConnectionRequest<Connection: PooledConnection>: ConnectionRequestProtocol {
32
public typealias ID = Int
43

54
public var id: ID
65

76
@usableFromInline
8-
private(set) var continuation: CheckedContinuation<Connection, any Error>
7+
private(set) var continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
98

109
@inlinable
1110
init(
1211
id: Int,
13-
continuation: CheckedContinuation<Connection, any Error>
12+
continuation: CheckedContinuation<ConnectionLease<Connection>, any Error>
1413
) {
1514
self.id = id
1615
self.continuation = continuation
1716
}
1817

19-
public func complete(with result: Result<Connection, ConnectionPoolError>) {
18+
public func complete(with result: Result<ConnectionLease<Connection>, ConnectionPoolError>) {
2019
self.continuation.resume(with: result)
2120
}
2221
}
@@ -46,15 +45,15 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
4645
}
4746

4847
@inlinable
49-
public func leaseConnection() async throws -> Connection {
48+
public func leaseConnection() async throws -> ConnectionLease<Connection> {
5049
let requestID = requestIDGenerator.next()
5150

5251
let connection = try await withTaskCancellationHandler {
5352
if Task.isCancelled {
5453
throw CancellationError()
5554
}
5655

57-
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Connection, Error>) in
56+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ConnectionLease<Connection>, Error>) in
5857
let request = Request(
5958
id: requestID,
6059
continuation: continuation
@@ -71,8 +70,8 @@ extension ConnectionPool where Request == ConnectionRequest<Connection> {
7170

7271
@inlinable
7372
public func withConnection<Result>(_ closure: (Connection) async throws -> Result) async throws -> Result {
74-
let connection = try await self.leaseConnection()
75-
defer { self.releaseConnection(connection) }
76-
return try await closure(connection)
73+
let lease = try await self.leaseConnection()
74+
defer { lease.release() }
75+
return try await closure(lease.connection)
7776
}
7877
}

Sources/ValkeyConnectionPool/NIOLock.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//
1313
// This source file is part of the SwiftNIO open source project
1414
//
15-
// Copyright (c) 2017-2022 the SwiftNIO project authors
15+
// Copyright (c) 2017-2022 Apple Inc. and the SwiftNIO project authors
1616
// Licensed under Apache License v2.0
1717
//
1818
// See LICENSE.txt for license information
@@ -145,7 +145,7 @@ final class LockStorage<Value>: ManagedBuffer<Value, LockPrimitive> {
145145
let buffer = Self.create(minimumCapacity: 1) { _ in
146146
value
147147
}
148-
// Intentionally using a force cast here to avoid a miss compilation in 5.10.
148+
// Intentionally using a force cast here to avoid a miss compiliation in 5.10.
149149
// This is as fast as an unsafeDownCast since ManagedBuffer is inlined and the optimizer
150150
// can eliminate the upcast/downcast pair
151151
let storage = buffer as! Self

Sources/ValkeyConnectionPool/NIOLockedValueBox.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
//
1313
// This source file is part of the SwiftNIO open source project
1414
//
15-
// Copyright (c) 2022 the SwiftNIO project authors
15+
// Copyright (c) 2022 Apple Inc. and the SwiftNIO project authors
1616
// Licensed under Apache License v2.0
1717
//
1818
// See LICENSE.txt for license information
@@ -51,7 +51,7 @@ struct NIOLockedValueBox<Value> {
5151

5252
/// Provides an unsafe view over the lock and its value.
5353
///
54-
/// This can be beneficial when you require fine-grained control over the lock in some
54+
/// This can be beneficial when you require fine grained control over the lock in some
5555
/// situations but don't want lose the benefits of ``withLockedValue(_:)`` in others by
5656
/// switching to ``NIOLock``.
5757
var unsafe: Unsafe {

Sources/ValkeyConnectionPool/PoolStateMachine+ConnectionGroup.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -517,13 +517,13 @@ extension PoolStateMachine {
517517
@inlinable
518518
mutating func closeConnectionIfIdle(_ connectionID: Connection.ID) -> CloseAction? {
519519
guard let index = self.connections.firstIndex(where: { $0.id == connectionID }) else {
520-
// because of a race, this connection (connection close runs against trigger of timeout)
520+
// because of a race this connection (connection close runs against trigger of timeout)
521521
// was already removed from the state machine.
522522
return nil
523523
}
524524

525525
if index < self.minimumConcurrentConnections {
526-
// because of a race, a connection might receive an idle timeout after it was moved into
526+
// because of a race a connection might receive a idle timeout after it was moved into
527527
// the persisted connections. If a connection is now persisted, we now need to ignore
528528
// the trigger
529529
return nil

Sources/ValkeyConnectionPool/PoolStateMachine.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ struct PoolConfiguration: Sendable {
1717
@usableFromInline
1818
var minimumConnectionCount: Int = 0
1919

20-
/// The maximum number of connections for this pool, to be preserved.
20+
/// The maximum number of connections to for this pool, to be preserved.
2121
@usableFromInline
2222
var maximumConnectionSoftLimit: Int = 10
2323

@@ -434,6 +434,7 @@ struct PoolStateMachine<
434434
fatalError("Unimplemented")
435435
}
436436

437+
@usableFromInline
437438
mutating func triggerForceShutdown() -> Action {
438439
switch self.poolState {
439440
case .running:

0 commit comments

Comments
 (0)