Skip to content

Commit

Permalink
Re-instate unhandled packet processing (#78)
Browse files Browse the repository at this point in the history
* Re-instate unhandled packet processing

* swift format
  • Loading branch information
adam-fowler authored Nov 2, 2021
1 parent 75f32f5 commit 162e622
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 32 deletions.
55 changes: 27 additions & 28 deletions Sources/MQTTNIO/ChannelHandlers/MQTTTaskHandler.swift
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import NIO

/// Task handler.
final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
typealias InboundIn = MQTTPacket

var eventLoop: EventLoop!
var client: MQTTClient

init() {
init(client: MQTTClient) {
self.client = client
self.eventLoop = nil
self.tasks = []
}
Expand All @@ -16,11 +19,11 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
}
}

func _removeTask(_ task: MQTTTask) {
private func _removeTask(_ task: MQTTTask) {
self.tasks.removeAll { $0 === task }
}

func removeTask(_ task: MQTTTask) {
private func removeTask(_ task: MQTTTask) {
if self.eventLoop.inEventLoop {
self._removeTask(task)
} else {
Expand All @@ -38,6 +41,7 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
let response = self.unwrapInboundIn(data)
for task in self.tasks {
do {
// should this task respond to inbound packet
if try task.checkInbound(response) {
self.removeTask(task)
task.succeed(response)
Expand All @@ -49,42 +53,37 @@ final class MQTTTaskHandler: ChannelInboundHandler, RemovableChannelHandler {
return
}
}

self.processUnhandledPacket(response)
}

/// process packets where no equivalent task was found
func processUnhandledPacket(_ packet: MQTTPacket) {
// we only send response to v5 server
guard self.client.configuration.version == .v5_0 else { return }
guard let connection = client.connection else { return }

switch packet.type {
case .PUBREC:
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: packet.packetId, reason: .packetIdentifierNotFound))
case .PUBREL:
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: packet.packetId, reason: .packetIdentifierNotFound))
default:
break
}
}

func channelInactive(context: ChannelHandlerContext) {
// channel is inactive so we should fail or tasks in progress
self.tasks.forEach { $0.fail(MQTTError.serverClosedConnection) }
self.tasks.removeAll()
}

func errorCaught(context: ChannelHandlerContext, error: Error) {
// we caught an error so we should fail all active tasks
self.tasks.forEach { $0.fail(error) }
self.tasks.removeAll()
}

var tasks: [MQTTTask]
}

/// If packet reaches this handler then it was never dealt with by a task
final class MQTTUnhandledPacketHandler: ChannelInboundHandler {
typealias InboundIn = MQTTPacket
let client: MQTTClient

init(client: MQTTClient) {
self.client = client
}

func channelRead(context: ChannelHandlerContext, data: NIOAny) {
// we only send response to v5 server
guard self.client.configuration.version == .v5_0 else { return }
guard let connection = client.connection else { return }
let response = self.unwrapInboundIn(data)
switch response.type {
case .PUBREC:
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBREL, packetId: response.packetId, reason: .packetIdentifierNotFound))
case .PUBREL:
_ = connection.sendMessageNoWait(MQTTPubAckPacket(type: .PUBCOMP, packetId: response.packetId, reason: .packetIdentifierNotFound))
default:
break
}
}
}
2 changes: 1 addition & 1 deletion Sources/MQTTNIO/MQTTConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ final class MQTTConnection {
}

static func create(client: MQTTClient, pingInterval: TimeAmount) -> EventLoopFuture<MQTTConnection> {
let taskHandler = MQTTTaskHandler()
let taskHandler = MQTTTaskHandler(client: client)
return self.createBootstrap(client: client, pingInterval: pingInterval, taskHandler: taskHandler)
.map { MQTTConnection(channel: $0, timeout: client.configuration.timeout, taskHandler: taskHandler) }
}
Expand Down
3 changes: 2 additions & 1 deletion Sources/MQTTNIO/MQTTError.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public enum MQTTError: Error {
case noConnection
/// the server disconnected
case serverDisconnection(MQTTAckV5)
/// the server closed the connection
/// the server closed the connection. If this happens during a publish you can resend
/// the publish packet by reconnecting to server with `cleanSession` set to false.
case serverClosedConnection
/// received unexpected message from broker
case unexpectedMessage
Expand Down
2 changes: 0 additions & 2 deletions Sources/MQTTNIO/MQTTTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,12 @@ import NIO
final class MQTTTask {
let promise: EventLoopPromise<MQTTPacket>
let checkInbound: (MQTTPacket) throws -> Bool
let timeout: TimeAmount?
let timeoutTask: Scheduled<Void>?

init(on eventLoop: EventLoop, timeout: TimeAmount?, checkInbound: @escaping (MQTTPacket) throws -> Bool) {
let promise = eventLoop.makePromise(of: MQTTPacket.self)
self.promise = promise
self.checkInbound = checkInbound
self.timeout = timeout
if let timeout = timeout {
self.timeoutTask = eventLoop.scheduleTask(in: timeout) {
promise.fail(MQTTError.timeout)
Expand Down

0 comments on commit 162e622

Please sign in to comment.