diff --git a/Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift b/Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift index b17206f6..a123d1a6 100644 --- a/Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift +++ b/Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift @@ -1,11 +1,14 @@ import NIO +/// Task handler. final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler { typealias InboundIn = MQTTPacket var eventLoop: EventLoop! + var client: MQTTClient - init() { + init(client: MQTTClient) { + self.client = client self.eventLoop = nil self.tasks = [] } @@ -16,11 +19,11 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler { } } - func _removeTask(_ task: MQTTTask) { + private func _removeTask(_ task: MQTTTask) { self.tasks.removeAll { $0 === task } } - func removeTask(_ task: MQTTTask) { + private func removeTask(_ task: MQTTTask) { if self.eventLoop.inEventLoop { self._removeTask(task) } else { @@ -38,6 +41,7 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler { let response = self.unwrapInboundIn(data) for task in self.tasks { do { + // should this task respond to inbound packet if try task.checkInbound(response) { self.removeTask(task) task.succeed(response) @@ -49,42 +53,37 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler { return } } + + self.processUnhandledPacket(response) + } + + /// process packets where no equivalent task was found + func processUnhandledPacket(_ packet: MQTTPacket) { + // we only send response to v5 server + guard self.client.configuration.version == .v5_0 else { return } + guard let connection = client.connection else { return } + + switch packet.type { + case .PUBREC: + _ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: packet.packetId, reason: .packetIdentifierNotFound)) + case .PUBREL: + _ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: packet.packetId, reason: .packetIdentifierNotFound)) + default: + break + } } func channelInactive(context: ChannelHandlerContext) { + // channel is inactive so we should fail or tasks in progress self.tasks.forEach { $0.fail(MQTTError.serverClosedConnection) } self.tasks.removeAll() } func errorCaught(context: ChannelHandlerContext, error: Error) { + // we caught an error so we should fail all active tasks self.tasks.forEach { $0.fail(error) } self.tasks.removeAll() } var tasks: [MQTTTask] } - -/// If packet reaches this handler then it was never dealt with by a task -final class MQTTUnhandledPacketHandler: ChannelInboundHandler { - typealias InboundIn = MQTTPacket - let client: MQTTClient - - init(client: MQTTClient) { - self.client = client - } - - func channelRead(context: ChannelHandlerContext, data: NIOAny) { - // we only send response to v5 server - guard self.client.configuration.version == .v5_0 else { return } - guard let connection = client.connection else { return } - let response = self.unwrapInboundIn(data) - switch response.type { - case .PUBREC: - _ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: response.packetId, reason: .packetIdentifierNotFound)) - case .PUBREL: - _ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: response.packetId, reason: .packetIdentifierNotFound)) - default: - break - } - } -} diff --git a/Sources/MQTTNIO/MQTTConnection.swift b/Sources/MQTTNIO/MQTTConnection.swift index d48d03b8..f02aee97 100644 --- a/Sources/MQTTNIO/MQTTConnection.swift +++ b/Sources/MQTTNIO/MQTTConnection.swift @@ -22,7 +22,7 @@ final class MQTTConnection { } static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture { - let taskHandler = MQTTTaskHandler() + let taskHandler = MQTTTaskHandler(client: client) return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler) .map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) } } diff --git a/Sources/MQTTNIO/MQTTError.swift b/Sources/MQTTNIO/MQTTError.swift index 7745dd15..9b14cbb8 100644 --- a/Sources/MQTTNIO/MQTTError.swift +++ b/Sources/MQTTNIO/MQTTError.swift @@ -32,7 +32,8 @@ public enum MQTTError: Error { case noConnection /// the server disconnected case serverDisconnection(MQTTAckV5) - /// the server closed the connection + /// the server closed the connection. If this happens during a publish you can resend + /// the publish packet by reconnecting to server with `cleanSession` set to false. case serverClosedConnection /// received unexpected message from broker case unexpectedMessage diff --git a/Sources/MQTTNIO/MQTTTask.swift b/Sources/MQTTNIO/MQTTTask.swift index 65afbaa8..35a21983 100644 --- a/Sources/MQTTNIO/MQTTTask.swift +++ b/Sources/MQTTNIO/MQTTTask.swift @@ -5,14 +5,12 @@ import NIO final class MQTTTask { let promise: EventLoopPromise let checkInbound: (MQTTPacket) throws -> Bool - let timeout: TimeAmount? let timeoutTask: Scheduled? init(on eventLoop: EventLoop, timeout: TimeAmount?, checkInbound: @escaping (MQTTPacket) throws -> Bool) { let promise = eventLoop.makePromise(of: MQTTPacket.self) self.promise = promise self.checkInbound = checkInbound - self.timeout = timeout if let timeout = timeout { self.timeoutTask = eventLoop.scheduleTask(in: timeout) { promise.fail(MQTTError.timeout)