From eeede19e45fe8aa143c5f21c455446fb2a0d6d6d Mon Sep 17 00:00:00 2001 From: Brian Harrington Date: Wed, 7 Aug 2024 10:56:58 -0500 Subject: [PATCH] lwcapi: drop old data if queue is full Update queue for incoming data to drop old data rather than new data. For use-cases like logs pass-through it is preferable to bias for new data. --- .../netflix/atlas/lwcapi/QueueHandler.scala | 6 +++-- .../netflix/atlas/lwcapi/SubscribeApi.scala | 4 +++- .../atlas/lwcapi/ExpressionApiSuite.scala | 23 +++++++++++++------ .../StreamSubscriptionManagerSuite.scala | 4 +++- 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/QueueHandler.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/QueueHandler.scala index 517ba7442..fca1f5d9a 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/QueueHandler.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/QueueHandler.scala @@ -28,8 +28,10 @@ import com.typesafe.scalalogging.StrictLogging * @param queue * Underlying queue that will receive the messsages. */ -class QueueHandler(streamMeta: StreamMetadata, queue: StreamOps.SourceQueue[Seq[JsonSupport]]) - extends StrictLogging { +class QueueHandler( + streamMeta: StreamMetadata, + queue: StreamOps.BlockingSourceQueue[Seq[JsonSupport]] +) extends StrictLogging { private val id = streamMeta.streamId diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala index a224c6145..4cacbf170 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/SubscribeApi.scala @@ -44,6 +44,7 @@ import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging import java.io.ByteArrayOutputStream +import java.util.concurrent.ArrayBlockingQueue import scala.concurrent.duration.* import scala.util.Failure import scala.util.Success @@ -149,8 +150,9 @@ class SubscribeApi( // For now, the queue will close and if no messages are sent from the client, the // pekko.http.server.idle-timeout will kill the client connection and we'll try to // close a closed queue. + val blockingQueue = new ArrayBlockingQueue[Seq[JsonSupport]](queueSize) val (queue, pub) = StreamOps - .blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", queueSize) + .wrapBlockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", blockingQueue, dropNew = false) .toMat(Sink.asPublisher(true))(Keep.both) .run() diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala index aa98bf523..63b4867ea 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionApiSuite.scala @@ -30,6 +30,7 @@ import com.typesafe.config.ConfigFactory import java.io.ByteArrayInputStream import java.nio.charset.StandardCharsets +import java.util.concurrent.ArrayBlockingQueue import java.util.zip.GZIPInputStream import scala.util.Using @@ -42,13 +43,21 @@ class ExpressionApiSuite extends MUnitRouteSuite { private val splitter = new ExpressionSplitter(ConfigFactory.load()) // Dummy queue used for handler - private val queue = new QueueHandler( - StreamMetadata("test"), - StreamOps - .blockingQueue[Seq[JsonSupport]](new NoopRegistry, "test", 1) - .toMat(Sink.ignore)(Keep.left) - .run() - ) + private val queue = { + val blockingQueue = new ArrayBlockingQueue[Seq[JsonSupport]](1) + new QueueHandler( + StreamMetadata("test"), + StreamOps + .wrapBlockingQueue[Seq[JsonSupport]]( + new NoopRegistry, + "test", + blockingQueue, + dropNew = false + ) + .toMat(Sink.ignore)(Keep.left) + .run() + ) + } private val sm = new StreamSubscriptionManager(new NoopRegistry) private val endpoint = ExpressionApi(sm, new NoopRegistry) diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamSubscriptionManagerSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamSubscriptionManagerSuite.scala index ce76d409a..a604c018e 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamSubscriptionManagerSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/StreamSubscriptionManagerSuite.scala @@ -23,6 +23,7 @@ import com.netflix.atlas.pekko.StreamOps import com.netflix.spectator.api.NoopRegistry import munit.FunSuite +import java.util.concurrent.ArrayBlockingQueue import scala.concurrent.Await import scala.concurrent.duration.Duration @@ -35,8 +36,9 @@ class StreamSubscriptionManagerSuite extends FunSuite { val sm = new StreamSubscriptionManager(registry) val meta = StreamMetadata("id") + val blockingQueue = new ArrayBlockingQueue[Seq[JsonSupport]](100) val (queue, queueSrc) = StreamOps - .blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", 100) + .wrapBlockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", blockingQueue, dropNew = false) .toMat(Sink.ignore)(Keep.both) .run() val handler = new QueueHandler(meta, queue)