Skip to content

Commit

Permalink
move flatMap to after the broadcast hub (#1346)
Browse files Browse the repository at this point in the history
Doing it before results in a lot more message passing. By
moving it after fusing happens and it is fairly cheap.
  • Loading branch information
brharrington authored Sep 21, 2021
1 parent 020a121 commit f013eed
Showing 1 changed file with 1 addition and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ class SubscribeApi @Inject() (
// Create queue to allow messages coming into /evaluate to be passed to this stream
val (queue, queueSrc) = StreamOps
.blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", queueSize)
.flatMapConcat(Source.apply)
.toMat(BroadcastHub.sink(1))(Keep.both)
.run()

Expand All @@ -178,6 +177,7 @@ class SubscribeApi @Inject() (
}

val source = queueSrc
.flatMapConcat(Source.apply)
.merge(heartbeatSrc)
.via(StreamOps.monitorFlow(registry, "StreamApi"))
.watchTermination() { (_, f) =>
Expand Down

0 comments on commit f013eed

Please sign in to comment.