diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala index 86e2f1850..8322806ab 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala @@ -152,7 +152,6 @@ class SubscribeApi @Inject() ( // Create queue to allow messages coming into /evaluate to be passed to this stream val (queue, queueSrc) = StreamOps .blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", queueSize) - .flatMapConcat(Source.apply) .toMat(BroadcastHub.sink(1))(Keep.both) .run() @@ -178,6 +177,7 @@ class SubscribeApi @Inject() ( } val source = queueSrc + .flatMapConcat(Source.apply) .merge(heartbeatSrc) .via(StreamOps.monitorFlow(registry, "StreamApi")) .watchTermination() { (_, f) =>