Skip to content

Commit

Permalink
lwcapi: drop old data if queue is full (#1680)
Browse files Browse the repository at this point in the history
Update queue for incoming data to drop old data rather than
new data. For use cases like logs pass-through it is preferable
to bias toward new data.
  • Loading branch information
brharrington authored Aug 7, 2024
1 parent 60f7b0c commit 5dd68d3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import com.typesafe.scalalogging.StrictLogging
* @param queue
* Underlying queue that will receive the messages.
*/
class QueueHandler(streamMeta: StreamMetadata, queue: StreamOps.SourceQueue[Seq[JsonSupport]])
extends StrictLogging {
class QueueHandler(
streamMeta: StreamMetadata,
queue: StreamOps.BlockingSourceQueue[Seq[JsonSupport]]
) extends StrictLogging {

private val id = streamMeta.streamId

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging

import java.io.ByteArrayOutputStream
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.duration.*
import scala.util.Failure
import scala.util.Success
Expand Down Expand Up @@ -149,8 +150,9 @@ class SubscribeApi(
// For now, the queue will close and if no messages are sent from the client, the
// pekko.http.server.idle-timeout will kill the client connection and we'll try to
// close a closed queue.
val blockingQueue = new ArrayBlockingQueue[Seq[JsonSupport]](queueSize)
val (queue, pub) = StreamOps
.blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", queueSize)
.wrapBlockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", blockingQueue, dropNew = false)
.toMat(Sink.asPublisher(true))(Keep.both)
.run()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import com.typesafe.config.ConfigFactory

import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.ArrayBlockingQueue
import java.util.zip.GZIPInputStream
import scala.util.Using

Expand All @@ -42,13 +43,21 @@ class ExpressionApiSuite extends MUnitRouteSuite {
private val splitter = new ExpressionSplitter(ConfigFactory.load())

// Dummy queue used for handler
private val queue = new QueueHandler(
StreamMetadata("test"),
StreamOps
.blockingQueue[Seq[JsonSupport]](new NoopRegistry, "test", 1)
.toMat(Sink.ignore)(Keep.left)
.run()
)
private val queue = {
  // Capacity-1 queue is enough here: the handler is a stub and nothing is consumed.
  val underlying = new ArrayBlockingQueue[Seq[JsonSupport]](1)
  // dropNew = false biases toward keeping new data, dropping old entries when full.
  val sourceQueue = StreamOps
    .wrapBlockingQueue[Seq[JsonSupport]](new NoopRegistry, "test", underlying, dropNew = false)
    .toMat(Sink.ignore)(Keep.left)
    .run()
  new QueueHandler(StreamMetadata("test"), sourceQueue)
}

private val sm = new StreamSubscriptionManager(new NoopRegistry)
private val endpoint = ExpressionApi(sm, new NoopRegistry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.netflix.atlas.pekko.StreamOps
import com.netflix.spectator.api.NoopRegistry
import munit.FunSuite

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.Await
import scala.concurrent.duration.Duration

Expand All @@ -35,8 +36,9 @@ class StreamSubscriptionManagerSuite extends FunSuite {
val sm = new StreamSubscriptionManager(registry)
val meta = StreamMetadata("id")

val blockingQueue = new ArrayBlockingQueue[Seq[JsonSupport]](100)
val (queue, queueSrc) = StreamOps
.blockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", 100)
.wrapBlockingQueue[Seq[JsonSupport]](registry, "SubscribeApi", blockingQueue, dropNew = false)
.toMat(Sink.ignore)(Keep.both)
.run()
val handler = new QueueHandler(meta, queue)
Expand Down

0 comments on commit 5dd68d3

Please sign in to comment.