Skip to content

Commit

Permalink
MQTT streaming: Termination signal handling (#1592)
Browse files Browse the repository at this point in the history
Fixed a couple of behaviours that were not handling the termination signal where it was possible to receive it.
  • Loading branch information
huntc authored and ennru committed Mar 19, 2019
1 parent b860c64 commit bd737f7
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,61 +193,67 @@ import scala.util.{Failure, Success}
private val ConsumerNamePrefix = "consumer-"
private val ProducerNamePrefix = "producer-"

def disconnected(data: Disconnected)(implicit mat: Materializer): Behavior[Event] = Behaviors.receivePartial {
case (context, ConnectReceivedLocally(connect, connectData, remote)) =>
val (queue, source) = Source
.queue[ForwardConnectCommand](1, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
remote.success(source)

queue.offer(ForwardConnect)
data.stash.foreach(context.self.tell)

if (connect.connectFlags.contains(ConnectFlags.CleanSession)) {
context.children.foreach(context.stop)
serverConnect(
ConnectReceived(
connect,
connectData,
queue,
Vector.empty,
Map.empty,
Map.empty,
Vector.empty,
Vector.empty,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings
)
)
} else {
serverConnect(
ConnectReceived(
connect,
connectData,
queue,
Vector.empty,
data.activeConsumers,
data.activeProducers,
data.pendingLocalPublications,
data.pendingRemotePublications,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings
)
)
def disconnected(data: Disconnected)(implicit mat: Materializer): Behavior[Event] =
Behaviors
.receivePartial[Event] {
case (context, ConnectReceivedLocally(connect, connectData, remote)) =>
val (queue, source) = Source
.queue[ForwardConnectCommand](1, OverflowStrategy.dropHead)
.toMat(BroadcastHub.sink)(Keep.both)
.run()
remote.success(source)

queue.offer(ForwardConnect)
data.stash.foreach(context.self.tell)

if (connect.connectFlags.contains(ConnectFlags.CleanSession)) {
context.children.foreach(context.stop)
serverConnect(
ConnectReceived(
connect,
connectData,
queue,
Vector.empty,
Map.empty,
Map.empty,
Vector.empty,
Vector.empty,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings
)
)
} else {
serverConnect(
ConnectReceived(
connect,
connectData,
queue,
Vector.empty,
data.activeConsumers,
data.activeProducers,
data.pendingLocalPublications,
data.pendingRemotePublications,
data.consumerPacketRouter,
data.producerPacketRouter,
data.subscriberPacketRouter,
data.unsubscriberPacketRouter,
data.settings
)
)

}
case (_, ConnectionLost) =>
Behavior.same
case (_, e) =>
disconnected(data.copy(stash = data.stash :+ e))
}
.receiveSignal {
case (_, _: Terminated) =>
Behaviors.same
}
case (_, ConnectionLost) =>
Behavior.same
case (_, e) =>
disconnected(data.copy(stash = data.stash :+ e))
}

def disconnect(context: ActorContext[Event], remote: SourceQueueWithComplete[ForwardConnectCommand], data: Data)(
implicit mat: Materializer
Expand Down Expand Up @@ -319,6 +325,8 @@ import scala.util.{Failure, Success}
serverConnect(data.copy(stash = data.stash :+ e))
}
.receiveSignal {
case (_, _: Terminated) =>
Behaviors.same
case (_, PostStop) =>
data.remote.complete()
Behaviors.same
Expand Down
3 changes: 1 addition & 2 deletions mqtt-streaming/src/test/java/docs/javadsl/MqttFlowTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,8 +207,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic()
JavaConverters.asJavaCollectionConverter(subscribe.topicFilters())
.asJavaCollection();
List<Integer> flags =
topicFilters
.stream()
topicFilters.stream()
.map(x -> x._2().underlying())
.collect(Collectors.toList());
queue.offer(
Expand Down

0 comments on commit bd737f7

Please sign in to comment.