From 8b9544d11606b6c7487f5cfd0ea337401566d535 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 13 May 2025 09:53:49 +0100 Subject: [PATCH 1/4] End publish listener on connection close if clean session is set to true --- .../AsyncAwaitSupport/MQTTClient+async.swift | 7 + .../MQTTClientV5+async.swift | 7 + Sources/MQTTNIO/MQTTClient.swift | 8 +- Sources/MQTTNIO/MQTTConnection.swift | 8 +- Sources/MQTTNIO/MQTTListeners.swift | 4 +- Tests/MQTTNIOTests/MQTTNIOTests+async.swift | 385 +++++++++++------- 6 files changed, 266 insertions(+), 153 deletions(-) diff --git a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift index 67dd4c0..dcfdfac 100644 --- a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift +++ b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift @@ -173,6 +173,7 @@ public class MQTTPublishListener: AsyncSequence { let name = UUID().uuidString self.client = client self.name = name + let cleanSession = client.connection?.cleanSession ?? true self.stream = AsyncStream { cont in client.addPublishListener(named: name) { result in cont.yield(result) @@ -180,11 +181,17 @@ public class MQTTPublishListener: AsyncSequence { client.addShutdownListener(named: name) { _ in cont.finish() } + client.addCloseListener(named: name) { connectResult in + if cleanSession { + cont.finish() + } + } } } deinit { self.client.removePublishListener(named: self.name) + self.client.removeCloseListener(named: self.name) self.client.removeShutdownListener(named: self.name) } diff --git a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift index a9490ab..ee505df 100644 --- a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift +++ b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift @@ -166,6 +166,7 @@ public class MQTTPublishIdListener: AsyncSequence { let name = UUID().uuidString self.client = client.client self.name = name + let cleanSession = client.client.connection?.cleanSession ?? true self.stream = AsyncStream { cont in client.addPublishListener(named: name, subscriptionId: subscriptionId) { result in cont.yield(result) @@ -173,11 +174,17 @@ public class MQTTPublishIdListener: AsyncSequence { client.client.addShutdownListener(named: name) { _ in cont.finish() } + client.client.addCloseListener(named: name) { connectResult in + if cleanSession { + cont.finish() + } + } } } deinit { self.client.removePublishListener(named: self.name) + self.client.removeCloseListener(named: self.name) self.client.removeShutdownListener(named: self.name) } diff --git a/Sources/MQTTNIO/MQTTClient.swift b/Sources/MQTTNIO/MQTTClient.swift index 98a4f28..7cbf559 100644 --- a/Sources/MQTTNIO/MQTTClient.swift +++ b/Sources/MQTTNIO/MQTTClient.swift @@ -463,9 +463,9 @@ public final class MQTTClient { } var connectionParameters = ConnectionParameters() - let publishListeners = MQTTListeners() - let closeListeners = MQTTListeners() - let shutdownListeners = MQTTListeners() + let publishListeners = MQTTListeners>() + let closeListeners = MQTTListeners>() + let shutdownListeners = MQTTListeners>() private var _connection: MQTTConnection? private var lock = NIOLock() } @@ -478,7 +478,7 @@ extension MQTTClient { ) -> EventLoopFuture { let pingInterval = self.configuration.pingInterval ?? TimeAmount.seconds(max(Int64(packet.keepAliveSeconds - 5), 5)) - let connectFuture = MQTTConnection.create(client: self, pingInterval: pingInterval) + let connectFuture = MQTTConnection.create(client: self, cleanSession: packet.cleanSession, pingInterval: pingInterval) let eventLoop = connectFuture.eventLoop return connectFuture diff --git a/Sources/MQTTNIO/MQTTConnection.swift b/Sources/MQTTNIO/MQTTConnection.swift index 942d711..a718150 100644 --- a/Sources/MQTTNIO/MQTTConnection.swift +++ b/Sources/MQTTNIO/MQTTConnection.swift @@ -30,19 +30,21 @@ import NIOSSL final class MQTTConnection { let channel: Channel + let cleanSession: Bool let timeout: TimeAmount? let taskHandler: MQTTTaskHandler - private init(channel: Channel, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) { + private init(channel: Channel, cleanSession: Bool, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) { self.channel = channel + self.cleanSession = cleanSession self.timeout = timeout self.taskHandler = taskHandler } - static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture { + static func create(client: MQTTClient, cleanSession: Bool, pingInterval: TimeAmount) -> EventLoopFuture { let taskHandler = MQTTTaskHandler(client: client) return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler) - .map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) } + .map { MQTTConnection(channel: $0, cleanSession: cleanSession, timeout: client.configuration.timeout, taskHandler: taskHandler) } } static func createBootstrap(client: MQTTClient, pingInterval: TimeAmount, taskHandler: MQTTTaskHandler) -> EventLoopFuture { diff --git a/Sources/MQTTNIO/MQTTListeners.swift b/Sources/MQTTNIO/MQTTListeners.swift index d20dade..7ae1dcf 100644 --- a/Sources/MQTTNIO/MQTTListeners.swift +++ b/Sources/MQTTNIO/MQTTListeners.swift @@ -15,9 +15,9 @@ import NIO import NIOConcurrencyHelpers final class MQTTListeners { - typealias Listener = (Result) -> Void + typealias Listener = (ReturnType) -> Void - func notify(_ result: Result) { + func notify(_ result: ReturnType) { let listeners = self.lock.withLock { return self.listeners } diff --git a/Tests/MQTTNIOTests/MQTTNIOTests+async.swift b/Tests/MQTTNIOTests/MQTTNIOTests+async.swift index e732fe1..6072b01 100644 --- a/Tests/MQTTNIOTests/MQTTNIOTests+async.swift +++ b/Tests/MQTTNIOTests/MQTTNIOTests+async.swift @@ -33,156 +33,170 @@ final class AsyncMQTTNIOTests: XCTestCase { return logger }() - func createClient(identifier: String, version: MQTTClient.Version = .v3_1_1, timeout: TimeAmount? = .seconds(10)) -> MQTTClient { + func createClient(identifier: String, configuration: MQTTClient.Configuration = .init()) -> MQTTClient { MQTTClient( host: Self.hostname, port: 1883, identifier: identifier, eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup.singleton), logger: Self.logger, - configuration: .init(version: version, timeout: timeout) + configuration: configuration ) } - func testConnect() async throws { - let client = self.createClient(identifier: "testConnect+async") - try await client.connect() - try await client.disconnect() + func withMQTTClient( + identifier: String, + configuration: MQTTClient.Configuration = .init(), + operation: (MQTTClient) async throws -> Void + ) async throws { + let client = createClient(identifier: identifier, configuration: configuration) + do { + try await operation(client) + } catch { + try? await client.shutdown() + throw error + } try await client.shutdown() } + func testConnect() async throws { + try await withMQTTClient(identifier: "testConnect+async") { client in + try await client.connect() + try await client.disconnect() + } + } + func testPublishSubscribe() async throws { - let client = self.createClient(identifier: "testPublish+async") - let client2 = self.createClient(identifier: "testPublish+async2") - let payloadString = "Hello" - try await client.connect() - try await client2.connect() - _ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - let listener = client2.createPublishListener() - for try await event in listener { - switch event { - case .success(let publish): - var buffer = publish.payload - let string = buffer.readString(length: buffer.readableBytes) - XCTAssertEqual(string, payloadString) - return - case .failure(let error): - XCTFail("\(error)") + try await withMQTTClient(identifier: "testPublish+async") { client in + try await withMQTTClient(identifier: "testPublish+async2") { client2 in + let payloadString = "Hello" + try await client.connect() + try await client2.connect() + _ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let listener = client2.createPublishListener() + for try await event in listener { + switch event { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + return + case .failure(let error): + XCTFail("\(error)") + } + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") } + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + try await group.next() + group.cancelAll() } + + try await client2.disconnect() } - group.addTask { - try await Task.sleep(nanoseconds: 5_000_000_000) - XCTFail("Timeout") - } - try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) - try await group.next() - group.cancelAll() + try await client.disconnect() } - - try await client.disconnect() - try await client2.disconnect() - try await client.shutdown() - try await client2.shutdown() } func testPing() async throws { - let client = MQTTClient( - host: Self.hostname, - port: 1883, - identifier: "TestPing", - eventLoopGroupProvider: .shared(MultiThreadedEventLoopGroup.singleton), - logger: Self.logger, - configuration: .init(disablePing: true) - ) - - try await client.connect() - try await client.ping() - try await client.disconnect() - try await client.shutdown() + try await withMQTTClient(identifier: "TestPing", configuration: .init(disablePing: true)) { client in + try await client.connect() + try await client.ping() + try await client.disconnect() + } } func testAsyncSequencePublishListener() async throws { - let client = self.createClient(identifier: "testAsyncSequencePublishListener+async", version: .v5_0) - let client2 = self.createClient(identifier: "testAsyncSequencePublishListener+async2", version: .v5_0) - - try await client.connect() - try await client2.connect() - _ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - let publishListener = client2.createPublishListener() - for await result in publishListener { - switch result { - case .success(let publish): - var buffer = publish.payload - let string = buffer.readString(length: buffer.readableBytes) - print("Received: \(string ?? "nothing")") - return - - case .failure(let error): - XCTFail("\(error)") + try await withMQTTClient(identifier: "testAsyncSequencePublishListener+async", configuration: .init(version: .v5_0)) { client in + try await withMQTTClient(identifier: "testAsyncSequencePublishListener+async2", configuration: .init(version: .v5_0)) { client2 in + + try await client.connect() + try await client2.connect() + _ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let publishListener = client2.createPublishListener() + for await result in publishListener { + switch result { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + print("Received: \(string ?? "nothing")") + return + + case .failure(let error): + XCTFail("\(error)") + } + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") } + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Hello"), qos: .atLeastOnce) + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Goodbye"), qos: .atLeastOnce) + + try await group.next() + group.cancelAll() } + try await client2.disconnect() } - group.addTask { - try await Task.sleep(nanoseconds: 5_000_000_000) - XCTFail("Timeout") - } - try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Hello"), qos: .atLeastOnce) - try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: "Goodbye"), qos: .atLeastOnce) - - try await group.next() - group.cancelAll() + try await client.disconnect() } - try await client.disconnect() - try await client2.disconnect() - try await client.shutdown() - try await client2.shutdown() } func testAsyncSequencePublishSubscriptionIdListener() async throws { - let client = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async", version: .v5_0) - let client2 = self.createClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async2", version: .v5_0) - let payloadString = "Hello" - - try await client.connect() - try await client2.connect() - _ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)], properties: [.subscriptionIdentifier(1)]) - _ = try await client2.v5.subscribe(to: [.init(topicFilter: "TestSubject2", qos: .atLeastOnce)], properties: [.subscriptionIdentifier(2)]) - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - let publishListener = client2.v5.createPublishListener(subscriptionId: 1) - for await event in publishListener { - XCTAssertEqual(String(buffer: event.payload), payloadString) - return - } - } - group.addTask { - let publishListener = client2.v5.createPublishListener(subscriptionId: 2) - for await event in publishListener { - XCTAssertEqual(String(buffer: event.payload), payloadString) - return + try await withMQTTClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async", configuration: .init(version: .v5_0)) { client in + try await withMQTTClient(identifier: "testAsyncSequencePublishSubscriptionIdListener+async2", configuration: .init(version: .v5_0)) { + client2 in + let payloadString = "Hello" + + try await client.connect() + try await client2.connect() + _ = try await client2.v5.subscribe( + to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)], + properties: [.subscriptionIdentifier(1)] + ) + _ = try await client2.v5.subscribe( + to: [.init(topicFilter: "TestSubject2", qos: .atLeastOnce)], + properties: [.subscriptionIdentifier(2)] + ) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let publishListener = client2.v5.createPublishListener(subscriptionId: 1) + for await event in publishListener { + XCTAssertEqual(String(buffer: event.payload), payloadString) + return + } + } + group.addTask { + let publishListener = client2.v5.createPublishListener(subscriptionId: 2) + for await event in publishListener { + XCTAssertEqual(String(buffer: event.payload), payloadString) + return + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") + } + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + try await client.publish(to: "TestSubject2", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + + try await group.next() + try await group.next() + group.cancelAll() } - } - group.addTask { - try await Task.sleep(nanoseconds: 5_000_000_000) - XCTFail("Timeout") - } - try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) - try await client.publish(to: "TestSubject2", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) - try await group.next() - try await group.next() - group.cancelAll() + try await client2.disconnect() + } + try await client.disconnect() } - - try await client.disconnect() - try await client2.disconnect() - try client.syncShutdownGracefully() - try client2.syncShutdownGracefully() } func testMQTTPublishRetain() async throws { @@ -190,38 +204,121 @@ final class AsyncMQTTNIOTests: XCTestCase { #"{"from":1000000,"to":1234567,"type":1,"content":"I am a beginner in swift and I am studying hard!!测试\n\n test, message","timestamp":1607243024,"nonce":"pAx2EsUuXrVuiIU3GGOGHNbUjzRRdT5b","sign":"ff902e31a6a5f5343d70a3a93ac9f946adf1caccab539c6f3a6"}"# let payload = ByteBufferAllocator().buffer(string: payloadString) - let client = self.createClient(identifier: "testMQTTPublishRetain_publisher") - defer { XCTAssertNoThrow(try client.syncShutdownGracefully()) } - let client2 = self.createClient(identifier: "testMQTTPublishRetain_subscriber") - defer { XCTAssertNoThrow(try client2.syncShutdownGracefully()) } - try await client.connect() - try await client.publish(to: "testAsyncMQTTPublishRetain", payload: payload, qos: .atLeastOnce, retain: true) - try await client2.connect() - try await withThrowingTaskGroup(of: Void.self) { group in - group.addTask { - let listener = client2.createPublishListener() - for try await event in listener { - switch event { - case .success(let publish): - var buffer = publish.payload - let string = buffer.readString(length: buffer.readableBytes) - XCTAssertEqual(string, payloadString) - return - case .failure(let error): - XCTFail("\(error)") + try await withMQTTClient(identifier: "testMQTTPublishRetain_publisher+async") { client in + try await withMQTTClient(identifier: "testMQTTPublishRetain_subscriber+async") { client2 in + try await client.connect() + try await client.publish(to: "testAsyncMQTTPublishRetain", payload: payload, qos: .atLeastOnce, retain: true) + try await client2.connect() + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let listener = client2.createPublishListener() + for try await event in listener { + switch event { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + return + case .failure(let error): + XCTFail("\(error)") + } + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") } + _ = try await client2.subscribe(to: [.init(topicFilter: "testAsyncMQTTPublishRetain", qos: .atLeastOnce)]) + try await group.next() + group.cancelAll() } + + try await client2.disconnect() } - group.addTask { - try await Task.sleep(nanoseconds: 5_000_000_000) - XCTFail("Timeout") + try await client.disconnect() + } + } + + func testPersistentSubscription() async throws { + let count = ManagedAtomic(0) + let (stream, cont) = AsyncStream.makeStream(of: Void.self) + try await withMQTTClient(identifier: "testPublish+async") { client in + try await withMQTTClient(identifier: "testPublish+async2") { client2 in + let payloadString = "Hello" + try await client.connect() + try await client2.connect(cleanSession: false) + _ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let listener = client2.createPublishListener() + cont.finish() + for try await event in listener { + switch event { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + let value = count.wrappingIncrementThenLoad(by: 1, ordering: .relaxed) + if value == 2 { + return + } + case .failure(let error): + XCTFail("\(error)") + } + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") + } + await stream.first { _ in true } + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + try await client2.disconnect() + try await client2.connect(cleanSession: false) + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + try await group.next() + group.cancelAll() + } + + try await client2.disconnect() } - _ = try await client2.subscribe(to: [.init(topicFilter: "testAsyncMQTTPublishRetain", qos: .atLeastOnce)]) - try await group.next() - group.cancelAll() + try await client.disconnect() } + XCTAssertEqual(count.load(ordering: .relaxed), 2) + } - try await client.disconnect() - try await client2.disconnect() + func testSubscriptionListenerEndsOnCleanSessionDisconnect() async throws { + try await withMQTTClient(identifier: "testPublish+async") { client in + try await withMQTTClient(identifier: "testPublish+async2") { client2 in + let payloadString = "Hello" + try await client.connect() + try await client2.connect(cleanSession: true) + _ = try await client2.subscribe(to: [.init(topicFilter: "TestSubject", qos: .atLeastOnce)]) + try await withThrowingTaskGroup(of: Void.self) { group in + group.addTask { + let listener = client2.createPublishListener() + for try await event in listener { + switch event { + case .success(let publish): + var buffer = publish.payload + let string = buffer.readString(length: buffer.readableBytes) + XCTAssertEqual(string, payloadString) + case .failure(let error): + XCTFail("\(error)") + } + } + } + group.addTask { + try await Task.sleep(nanoseconds: 5_000_000_000) + XCTFail("Timeout") + } + try await client.publish(to: "TestSubject", payload: ByteBufferAllocator().buffer(string: payloadString), qos: .atLeastOnce) + try await client2.disconnect() + try await group.next() + group.cancelAll() + } + } + try await client.disconnect() + } } } From ed9f50989b3304d09dddd04b39a1051afd4e91db Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 13 May 2025 09:55:45 +0100 Subject: [PATCH 2/4] Add Swift 6.1 to CI --- .github/workflows/ci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c1a0c96..e48230d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,10 +49,9 @@ jobs: strategy: matrix: tag: - - swift:5.9 - swift:5.10 - swift:6.0 - - swiftlang/swift:nightly-6.1-jammy + - swift:6.1 container: image: ${{ matrix.tag }} services: From f740dce8f63bdefd88fe68999c8a1a49ff7330ed Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 13 May 2025 10:01:38 +0100 Subject: [PATCH 3/4] Update CI --- .github/workflows/ci.yml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index e48230d..a36d524 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,10 +10,14 @@ on: release: types: [published] workflow_dispatch: +concurrency: + group: ${{ github.workflow }}-${{ github.ref }}-ci + cancel-in-progress: true jobs: macos: runs-on: macOS-latest + timeout-minutes: 15 steps: - name: Checkout uses: actions/checkout@v4 @@ -37,6 +41,7 @@ jobs: ios: runs-on: macOS-latest + timeout-minutes: 15 steps: - name: Checkout uses: actions/checkout@v4 @@ -46,6 +51,7 @@ jobs: linux: runs-on: ubuntu-latest + timeout-minutes: 15 strategy: matrix: tag: From 52f09dfbf9e1e3c466d596f3a603126d192db871 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Tue, 13 May 2025 10:18:27 +0100 Subject: [PATCH 4/4] Assume non zero session expiry properties don't clean session on close --- Sources/MQTTNIO/MQTTClient.swift | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/Sources/MQTTNIO/MQTTClient.swift b/Sources/MQTTNIO/MQTTClient.swift index 7cbf559..abc5a59 100644 --- a/Sources/MQTTNIO/MQTTClient.swift +++ b/Sources/MQTTNIO/MQTTClient.swift @@ -477,8 +477,17 @@ extension MQTTClient { authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture)? = nil ) -> EventLoopFuture { let pingInterval = self.configuration.pingInterval ?? TimeAmount.seconds(max(Int64(packet.keepAliveSeconds - 5), 5)) - - let connectFuture = MQTTConnection.create(client: self, cleanSession: packet.cleanSession, pingInterval: pingInterval) + var cleanSession = packet.cleanSession + // if connection has non zero session expiry then assume it doesnt clean session on close + for p in packet.properties { + // check topic alias + if case .sessionExpiryInterval(let interval) = p { + if interval > 0 { + cleanSession = false + } + } + } + let connectFuture = MQTTConnection.create(client: self, cleanSession: cleanSession, pingInterval: pingInterval) let eventLoop = connectFuture.eventLoop return connectFuture