From 345ef169423ee234de353735a96edcb26b30ddc4 Mon Sep 17 00:00:00 2001 From: Adam Fowler Date: Fri, 3 Nov 2023 12:00:47 +0000 Subject: [PATCH] MQTT connect configuration (#143) --- .../AsyncAwaitSupport/MQTTClient+async.swift | 38 +++++++++++--- .../MQTTClientV5+async.swift | 43 +++++++++++++-- Sources/MQTTNIO/MQTTClient.swift | 42 ++++++++++++--- Sources/MQTTNIO/MQTTClientV5.swift | 52 ++++++++++++++++--- Sources/MQTTNIO/MQTTConfiguration.swift | 26 ++++++++++ 5 files changed, 177 insertions(+), 24 deletions(-) diff --git a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift index a084819e..7eb191bb 100644 --- a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift +++ b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClient+async.swift @@ -27,7 +27,7 @@ extension MQTTClient { /// - queue: Dispatch Queue to run shutdown on public func shutdown(queue: DispatchQueue = .global()) async throws { return try await withUnsafeThrowingContinuation { cont in - shutdown(queue: queue) { error in + self.shutdown(queue: queue) { error in if let error = error { cont.resume(throwing: error) } else { @@ -39,15 +39,16 @@ extension MQTTClient { /// Connect to MQTT server /// - /// Completes when CONNACK is received - /// - /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier). - /// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session - /// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one + /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the Client + /// and Server MUST discard any previous Session and start a new one /// /// - Parameters: /// - cleanSession: should we start with a new session /// - will: Publish message to be posted as soon as connection is made + /// - Returns: EventLoopFuture to be updated with whether server holds a session for this client /// - Returns: Whether server held a session for this client and has restored it. @discardableResult public func connect( cleanSession: Bool = true, @@ -56,6 +57,31 @@ extension MQTTClient { return try await self.connect(cleanSession: cleanSession, will: will).get() } + /// Connect to MQTT server + /// + /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the Client + /// and Server MUST discard any previous Session and start a new one + /// + /// - Parameters: + /// - cleanSession: should we start with a new session + /// - will: Publish message to be posted as soon as connection is made + /// - Returns: EventLoopFuture to be updated with whether server holds a session for this client + /// - Returns: Whether server held a session for this client and has restored it. + @discardableResult public func connect( + cleanSession: Bool = true, + will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil, + connectConfiguration: ConnectConfiguration + ) async throws -> Bool { + return try await self.connect( + cleanSession: cleanSession, + will: will, connectConfiguration: + connectConfiguration + ).get() + } + /// Publish message to topic /// /// Depending on QoS completes when message is sent, when PUBACK is received or when PUBREC diff --git a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift index 3db50fd2..059cc7c4 100644 --- a/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift +++ b/Sources/MQTTNIO/AsyncAwaitSupport/MQTTClientV5+async.swift @@ -18,9 +18,11 @@ import NIOCore extension MQTTClient.V5 { /// Connect to MQTT server /// - /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier). - /// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session - /// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one + /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the + /// Client and Server MUST discard any previous Session and start a new one /// /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. /// @@ -29,7 +31,7 @@ extension MQTTClient.V5 { /// - properties: properties to attach to connect message /// - will: Publish message to be posted as soon as connection is made /// - authWorkflow: The authentication workflow. This is currently unimplemented. - /// - Returns: EventLoopFuture to be updated with connack + /// - Returns: CONNACK response public func connect( cleanStart: Bool = true, properties: MQTTProperties = .init(), @@ -39,6 +41,39 @@ extension MQTTClient.V5 { return try await self.connect(cleanStart: cleanStart, properties: properties, will: will, authWorkflow: authWorkflow).get() } + /// Connect to MQTT server + /// + /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the + /// Client and Server MUST discard any previous Session and start a new one + /// + /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. + /// + /// - Parameters: + /// - cleanStart: should we start with a new session + /// - properties: properties to attach to connect message + /// - will: Publish message to be posted as soon as connection is made + /// - authWorkflow: The authentication workflow. This is currently unimplemented. + /// - connectConfiguration: Override client configuration during connection + /// - Returns: CONNACK response + public func connect( + cleanStart: Bool = true, + properties: MQTTProperties = .init(), + will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil, + authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture)? = nil, + connectConfiguration: MQTTClient.ConnectConfiguration + ) async throws -> MQTTConnackV5 { + return try await self.connect( + cleanStart: cleanStart, + properties: properties, + will: will, + authWorkflow: authWorkflow, + connectConfiguration: connectConfiguration + ).get() + } + /// Publish message to topic /// - Parameters: /// - topicName: Topic name on which the message is published diff --git a/Sources/MQTTNIO/MQTTClient.swift b/Sources/MQTTNIO/MQTTClient.swift index de3b6d80..d7d5a23d 100644 --- a/Sources/MQTTNIO/MQTTClient.swift +++ b/Sources/MQTTNIO/MQTTClient.swift @@ -242,9 +242,11 @@ public final class MQTTClient { /// Connect to MQTT server /// - /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier). - /// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session - /// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one + /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the Client + /// and Server MUST discard any previous Session and start a new one /// /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. /// @@ -255,6 +257,33 @@ public final class MQTTClient { public func connect( cleanSession: Bool = true, will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil + ) -> EventLoopFuture { + self.connect( + cleanSession: cleanSession, + will: will, + connectConfiguration: .init() + ) + } + + /// Connect to MQTT server + /// + /// If `cleanSession` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the Client + /// and Server MUST discard any previous Session and start a new one + /// + /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. + /// + /// - Parameters: + /// - cleanSession: should we start with a new session + /// - will: Publish message to be posted as soon as connection is made + /// - connectConfiguration: Override client configuration during connection + /// - Returns: EventLoopFuture to be updated with whether server holds a session for this client + public func connect( + cleanSession: Bool = true, + will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool)? = nil, + connectConfiguration: ConnectConfiguration ) -> EventLoopFuture { let publish = will.map { MQTTPublishInfo( @@ -270,12 +299,13 @@ public final class MQTTClient { if self.configuration.version == .v5_0, cleanSession == false { properties.append(.sessionExpiryInterval(0xFFFF_FFFF)) } + let keepAliveInterval = connectConfiguration.keepAliveInterval ?? self.configuration.keepAliveInterval let packet = MQTTConnectPacket( cleanSession: cleanSession, - keepAliveSeconds: UInt16(configuration.keepAliveInterval.nanoseconds / 1_000_000_000), + keepAliveSeconds: UInt16(keepAliveInterval.nanoseconds / 1_000_000_000), clientIdentifier: self.identifier, - userName: self.configuration.userName, - password: self.configuration.password, + userName: connectConfiguration.userName ?? self.configuration.userName, + password: connectConfiguration.password ?? self.configuration.password, properties: properties, will: publish ) diff --git a/Sources/MQTTNIO/MQTTClientV5.swift b/Sources/MQTTNIO/MQTTClientV5.swift index 7035c726..317116c3 100644 --- a/Sources/MQTTNIO/MQTTClientV5.swift +++ b/Sources/MQTTNIO/MQTTClientV5.swift @@ -20,9 +20,11 @@ extension MQTTClient { /// Connect to MQTT server /// - /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on state from the current Session (as identified by the Client identifier). - /// If there is no Session associated with the Client identifier the Server MUST create a new Session. The Client and Server MUST store the Session - /// after the Client and Server are disconnected. If set to true then the Client and Server MUST discard any previous Session and start a new one + /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the + /// Client and Server MUST discard any previous Session and start a new one /// /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. /// @@ -37,6 +39,39 @@ extension MQTTClient { properties: MQTTProperties = .init(), will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil, authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture)? = nil + ) -> EventLoopFuture { + self.connect( + cleanStart: cleanStart, + properties: properties, + will: will, + authWorkflow: authWorkflow, + connectConfiguration: .init() + ) + } + + /// Connect to MQTT server + /// + /// If `cleanStart` is set to false the Server MUST resume communications with the Client based on + /// state from the current Session (as identified by the Client identifier). If there is no Session + /// associated with the Client identifier the Server MUST create a new Session. The Client and Server + /// MUST store the Session after the Client and Server are disconnected. If set to true then the + /// Client and Server MUST discard any previous Session and start a new one + /// + /// The function returns an EventLoopFuture which will be updated with whether the server has restored a session for this client. + /// + /// - Parameters: + /// - cleanStart: should we start with a new session + /// - properties: properties to attach to connect message + /// - will: Publish message to be posted as soon as connection is made + /// - authWorkflow: The authentication workflow. This is currently unimplemented. + /// - connectConfiguration: Override client configuration during connection + /// - Returns: EventLoopFuture to be updated with connack + public func connect( + cleanStart: Bool = true, + properties: MQTTProperties = .init(), + will: (topicName: String, payload: ByteBuffer, qos: MQTTQoS, retain: Bool, properties: MQTTProperties)? = nil, + authWorkflow: ((MQTTAuthV5, EventLoop) -> EventLoopFuture)? = nil, + connectConfiguration: ConnectConfiguration ) -> EventLoopFuture { let publish = will.map { MQTTPublishInfo( @@ -48,17 +83,18 @@ extension MQTTClient { properties: $0.properties ) } + let keepAliveInterval = connectConfiguration.keepAliveInterval ?? self.client.configuration.keepAliveInterval let packet = MQTTConnectPacket( cleanSession: cleanStart, - keepAliveSeconds: UInt16(client.configuration.keepAliveInterval.nanoseconds / 1_000_000_000), + keepAliveSeconds: UInt16(keepAliveInterval.nanoseconds / 1_000_000_000), clientIdentifier: self.client.identifier, - userName: self.client.configuration.userName, - password: self.client.configuration.password, + userName: connectConfiguration.userName ?? self.client.configuration.userName, + password: connectConfiguration.password ?? self.client.configuration.password, properties: properties, will: publish ) - return self.client.connect(packet: packet).map { + return self.client.connect(packet: packet, authWorkflow: authWorkflow).map { .init( sessionPresent: $0.sessionPresent, reason: MQTTReasonCode(rawValue: $0.returnCode) ?? .unrecognisedReason, @@ -152,7 +188,7 @@ extension MQTTClient { return eventLoop.makeSucceededFuture(auth) } guard let authWorkflow = authWorkflow else { return eventLoop.makeFailedFuture(MQTTError.authWorkflowRequired) } - return client.processAuth(authPacket, authWorkflow: authWorkflow, on: eventLoop) + return self.client.processAuth(authPacket, authWorkflow: authWorkflow, on: eventLoop) } .flatMapThrowing { response -> MQTTAuthV5 in guard let auth = response as? MQTTAuthPacket else { throw MQTTError.unexpectedMessage } diff --git a/Sources/MQTTNIO/MQTTConfiguration.swift b/Sources/MQTTNIO/MQTTConfiguration.swift index bfa7183f..fb42f738 100644 --- a/Sources/MQTTNIO/MQTTConfiguration.swift +++ b/Sources/MQTTNIO/MQTTConfiguration.swift @@ -252,4 +252,30 @@ extension MQTTClient { /// WebSocket configuration public let webSocketConfiguration: WebSocketConfiguration? } + + /// Configuration used at connection time to override values stored in the MQTTClient.Configuration + public struct ConnectConfiguration { + /// MQTT user name. + public let userName: String? + /// MQTT password. + public let password: String? + /// MQTT keep alive period. + public let keepAliveInterval: TimeAmount? + + /// Initialize MQTTClient connect configuration struct + /// + /// - Parameters: + /// - keepAliveInterval: MQTT keep alive period. + /// - userName: MQTT user name + /// - password: MQTT password + public init( + keepAliveInterval: TimeAmount? = nil, + userName: String? = nil, + password: String? = nil + ) { + self.keepAliveInterval = keepAliveInterval + self.userName = userName + self.password = password + } + } }