Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ on:
release:
types: [published]
workflow_dispatch:
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}-ci
cancel-in-progress: true

jobs:
macos:
runs-on: macOS-latest
timeout-minutes: 15
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -37,6 +41,7 @@ jobs:

ios:
runs-on: macOS-latest
timeout-minutes: 15
steps:
- name: Checkout
uses: actions/checkout@v4
Expand All @@ -46,13 +51,13 @@ jobs:

linux:
runs-on: ubuntu-latest
timeout-minutes: 15
strategy:
matrix:
tag:
- swift:5.9
- swift:5.10
- swift:6.0
- swiftlang/swift:nightly-6.1-jammy
- swift:6.1
container:
image: ${{ matrix.tag }}
services:
Expand Down
7 changes: 7 additions & 0 deletions Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,25 @@ public class MQTTPublishListener: AsyncSequence {
let name = UUID().uuidString
self.client = client
self.name = name
let cleanSession = client.connection?.cleanSession ?? true
self.stream = AsyncStream { cont in
client.addPublishListener(named: name) { result in
cont.yield(result)
}
client.addShutdownListener(named: name) { _ in
cont.finish()
}
client.addCloseListener(named: name) { connectResult in
if cleanSession {
cont.finish()
}
}
}
}

deinit {
self.client.removePublishListener(named: self.name)
self.client.removeCloseListener(named: self.name)
self.client.removeShutdownListener(named: self.name)
}

Expand Down
7 changes: 7 additions & 0 deletions Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -166,18 +166,25 @@
let name = UUID().uuidString
self.client = client.client
self.name = name
let cleanSession = client.client.connection?.cleanSession ?? true
self.stream = AsyncStream { cont in
client.addPublishListener(named: name, subscriptionId: subscriptionId) { result in
cont.yield(result)
}
client.client.addShutdownListener(named: name) { _ in
cont.finish()
}
client.client.addCloseListener(named: name) { connectResult in
if cleanSession {
cont.finish()
}
}

Check warning on line 181 in Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift

View check run for this annotation

Codecov / codecov/patch

Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift#L178-L181

Added lines #L178 - L181 were not covered by tests
}
}

deinit {
self.client.removePublishListener(named: self.name)
self.client.removeCloseListener(named: self.name)
self.client.removeShutdownListener(named: self.name)
}

Expand Down
19 changes: 14 additions & 5 deletions Sources/MQTTNIO/MQTTClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -463,9 +463,9 @@ public final class MQTTClient {
}

var connectionParameters = ConnectionParameters()
let publishListeners = MQTTListeners<MQTTPublishInfo>()
let closeListeners = MQTTListeners<Void>()
let shutdownListeners = MQTTListeners<Void>()
let publishListeners = MQTTListeners<Result<MQTTPublishInfo, Error>>()
let closeListeners = MQTTListeners<Result<Void, Error>>()
let shutdownListeners = MQTTListeners<Result<Void, Error>>()
private var _connection: MQTTConnection?
private var lock = NIOLock()
}
Expand All @@ -477,8 +477,17 @@ extension MQTTClient {
authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture<MQTTAuthV5>)? = nil
) -> EventLoopFuture<MQTTConnAckPacket> {
let pingInterval = self.configuration.pingInterval ?? TimeAmount.seconds(max(Int64(packet.keepAliveSeconds - 5), 5))

let connectFuture = MQTTConnection.create(client: self, pingInterval: pingInterval)
var cleanSession = packet.cleanSession
// if connection has non zero session expiry then assume it doesnt clean session on close
for p in packet.properties {
// check topic alias
if case .sessionExpiryInterval(let interval) = p {
if interval > 0 {
cleanSession = false
}
}
}
let connectFuture = MQTTConnection.create(client: self, cleanSession: cleanSession, pingInterval: pingInterval)
let eventLoop = connectFuture.eventLoop
return
connectFuture
Expand Down
8 changes: 5 additions & 3 deletions Sources/MQTTNIO/MQTTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,21 @@ import NIOSSL

final class MQTTConnection {
let channel: Channel
let cleanSession: Bool
let timeout: TimeAmount?
let taskHandler: MQTTTaskHandler

private init(channel: Channel, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) {
private init(channel: Channel, cleanSession: Bool, timeout: TimeAmount?, taskHandler: MQTTTaskHandler) {
self.channel = channel
self.cleanSession = cleanSession
self.timeout = timeout
self.taskHandler = taskHandler
}

static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
static func create(client: MQTTClient, cleanSession: Bool, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
let taskHandler = MQTTTaskHandler(client: client)
return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler)
.map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) }
.map { MQTTConnection(channel: $0, cleanSession: cleanSession, timeout: client.configuration.timeout, taskHandler: taskHandler) }
}

static func createBootstrap(client: MQTTClient, pingInterval: TimeAmount, taskHandler: MQTTTaskHandler) -> EventLoopFuture<Channel> {
Expand Down
4 changes: 2 additions & 2 deletions Sources/MQTTNIO/MQTTListeners.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ import NIO
import NIOConcurrencyHelpers

final class MQTTListeners<ReturnType> {
typealias Listener = (Result<ReturnType, Error>) -> Void
typealias Listener = (ReturnType) -> Void

func notify(_ result: Result<ReturnType, Error>) {
func notify(_ result: ReturnType) {
let listeners = self.lock.withLock {
return self.listeners
}
Expand Down
Loading