Skip to content

Commit

Permalink
pekko: source based on BlockingQueue (#1676)
Browse files Browse the repository at this point in the history
Add helper to create a source that is based on a Java
BlockingQueue implementation. This can be used as an
alternative to Pekko's BoundedSourceQueue when more
control of the queue is desirable. If the queue is full
it can be set to either drop newly arriving values or
older values that are already in the queue.
  • Loading branch information
brharrington authored Jul 30, 2024
1 parent 6586b2e commit 20731ad
Show file tree
Hide file tree
Showing 3 changed files with 287 additions and 6 deletions.
9 changes: 9 additions & 0 deletions atlas-pekko/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ atlas.pekko {

# How long to wait before giving up on bind
bind-timeout = 5 seconds

blocking-queue {
# If the queue is empty when pulled, then it will keep checking until some new
# data is available. This mechanism is simpler than juggling async callbacks and
# ensures the overhead is limited to the queue. There is some risk if there is
# bursty data that it could increase the rate of drops due to the queue being full
# while waiting for the delay.
frequency = 100ms
}
}

pekko {
Expand Down
148 changes: 147 additions & 1 deletion atlas-pekko/src/main/scala/com/netflix/atlas/pekko/StreamOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.netflix.atlas.pekko

import java.util.concurrent.TimeUnit

import org.apache.pekko.NotUsed
import org.apache.pekko.stream.ActorAttributes
import org.apache.pekko.stream.Attributes
Expand All @@ -39,7 +38,9 @@ import org.apache.pekko.stream.stage.TimerGraphStageLogic
import com.netflix.spectator.api.Clock
import com.netflix.spectator.api.Registry
import com.typesafe.scalalogging.StrictLogging
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue

import java.util.concurrent.BlockingQueue
import scala.concurrent.duration.FiniteDuration

/**
Expand Down Expand Up @@ -135,6 +136,151 @@ object StreamOps extends StrictLogging {
def size: Int = queue.size()
}

/**
* Wraps a `BlockingQueue` as a source. This can be a useful alternative to Pekko's
* BoundedSourceQueue when you want more control over the queue implementation. If no
* data is available in the queue when the downstream is ready, then it will poll the
* queue several times a second and forward data downstream when it is detected.
*
* The blocking queue should not be used directly. Use the materialized view to ensure
* that metrics are reported properly and you can detect if the stream is still running.
*
* @param registry
* Spectator registry to manage metrics for this queue.
* @param id
* Dimension used to distinguish a particular queue usage.
* @param queue
* Blocking queue to use as the source of data.
* @param dropNew
* Controls the behavior of dropping elements if the queue is full. If true, then
* new elements that arrive will be dropped. If false, then it will remove old elements
* from the queue until it is successfully able to insert the new element.
* @return
* Source that emits values offered to the queue.
*/
def wrapBlockingQueue[T](
registry: Registry,
id: String,
queue: BlockingQueue[T],
dropNew: Boolean = true
): Source[T, BlockingSourceQueue[T]] = {
val sourceQueue = new BlockingSourceQueue[T](registry, id, queue, dropNew)
Source.fromGraph(new BlockingQueueSource[T](sourceQueue))
}

final class BlockingSourceQueue[V] private[pekko] (
registry: Registry,
id: String,
queue: BlockingQueue[V],
dropNew: Boolean
) {

private val baseId = registry.createId("pekko.stream.offeredToQueue", "id", id)
private val enqueued = registry.counter(baseId.withTag("result", "enqueued"))
private val dropped = registry.counter(baseId.withTag("result", "droppedQueueFull"))
private val closed = registry.counter(baseId.withTag("result", "droppedQueueClosed"))

@volatile private var completed: Boolean = false

/**
* Add the value into the queue if there is room. Returns true if the value was successfully
* enqueued.
*/
def offer(value: V): Boolean = {
if (completed) {
closed.increment()
return false
}

if (dropNew) {
// If the queue is full, then drop the new value that just arrived
if (queue.offer(value))
enqueued.increment()
else
dropped.increment()
} else {
// If the queue is full, then drop old items in the queue
while (!queue.offer(value)) {
queue.poll()
dropped.increment()
}
enqueued.increment()
}
true
}

private[pekko] def poll(): V = queue.poll()

/**
* Indicate that the use of the queue is complete. This will allow the associated stream
* to finish processing elements and then shutdown. Any new elements offered to the queue
* will be dropped.
*/
def complete(): Unit = {
completed = true
}

/** Check if the queue is open to take more data. */
def isOpen: Boolean = !completed

/** The approximate number of entries in the queue. */
def size: Int = queue.size()

/** Check if queue has been marked complete and it is now empty. */
def isCompleteAndEmpty: Boolean = completed && queue.isEmpty
}

private final class BlockingQueueSource[V](queue: BlockingSourceQueue[V])
extends GraphStageWithMaterializedValue[SourceShape[V], BlockingSourceQueue[V]] {

private val out = Outlet[V]("BlockingQueueSource.out")

override val shape: SourceShape[V] = SourceShape(out)

override def createLogicAndMaterializedValue(
inheritedAttributes: Attributes
): (TimerGraphStageLogic, BlockingSourceQueue[V]) = {

val logic = new TimerGraphStageLogic(shape) with OutHandler {

override def preStart(): Unit = {
// If the queue is empty when pulled, then it will keep checking until some new
// data is available. This mechanism is simpler than juggling async callbacks and
// ensures the overhead is limited to the queue. There is some risk if there is
// bursty data that it could increase the rate of drops due to the queue being full
// while waiting for the delay.
val config = materializer.system.settings.config
val frequency = FiniteDuration(
config.getDuration("atlas.pekko.blocking-queue.frequency").toMillis,
TimeUnit.MILLISECONDS
)
scheduleWithFixedDelay(NotUsed, frequency, frequency)
}

override def postStop(): Unit = {
queue.complete()
}

override def onTimer(timerKey: Any): Unit = {
if (isAvailable(out))
onPull()
if (queue.isCompleteAndEmpty)
completeStage()
}

override def onPull(): Unit = {
val value = queue.poll()
if (value != null)
push(out, value)
}

setHandler(out, this)
}

logic -> queue
}
}

/**
* Stage that measures the flow of data through the stream. It will keep track of three
* meters:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.netflix.atlas.pekko

import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

import org.apache.pekko.NotUsed
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.Materializer
Expand All @@ -30,6 +29,7 @@ import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Utils
import munit.FunSuite

import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.Promise
Expand All @@ -42,7 +42,7 @@ class StreamOpsSuite extends FunSuite {

private implicit val system: ActorSystem = ActorSystem(getClass.getSimpleName)

private def checkOfferedCounts(registry: Registry, expected: Map[String, Double]): Unit = {
private def checkOfferedCounts(registry: Registry, expected: Map[String, Any]): Unit = {
import scala.jdk.CollectionConverters.*
registry
.stream()
Expand All @@ -52,7 +52,14 @@ class StreamOpsSuite extends FunSuite {
.filter(m => m.id().name().equals("pekko.stream.offeredToQueue"))
.foreach { m =>
val result = Utils.getTagValue(m.id(), "result")
assertEquals(m.value(), expected.getOrElse(result, 0.0), result)
expected.get(result) match {
case Some(v: Double) =>
assertEquals(m.value(), v, result)
case Some((mn: Double, mx: Double)) =>
assert(m.value() >= mn && m.value() <= mx, s"$result = ${m.value()}")
case _ =>
assertEquals(m.value(), 0.0, result)
}
}
}

Expand Down Expand Up @@ -134,7 +141,119 @@ class StreamOpsSuite extends FunSuite {
checkOfferedCounts(registry, Map("enqueued" -> 1.0, "droppedQueueClosed" -> 1.0))
}

private def checkCounts(registry: Registry, name: String, expected: Map[String, Double]): Unit = {
test("wrap blocking queue, enqueued") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Int](10)
val source = StreamOps.wrapBlockingQueue[Int](registry, "test", blockingQueue)
val queue = source.toMat(Sink.ignore)(Keep.left).run()
Seq(1, 2, 3, 4).foreach(queue.offer)
queue.complete()
checkOfferedCounts(registry, Map("enqueued" -> 4.0))
}

test("wrap blocking queue, droppedQueueFull") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Future[Int]](1)
val source = StreamOps.wrapBlockingQueue[Future[Int]](registry, "test", blockingQueue)
val streamStarted = new CountDownLatch(1)
val queue = source
.flatMapConcat(Source.future)
.map { value =>
streamStarted.countDown()
value
}
.toMat(Sink.ignore)(Keep.left)
.run()

// wait for stream to start and first item to pass through
queue.offer(Promise.successful(0).future)
streamStarted.await()

val promise = Promise[Int]()
queue.offer(promise.future) // will pass through
Seq(2, 3, 4, 5).foreach(i => queue.offer(Future(i)))
promise.complete(Success(1))
queue.complete()
checkOfferedCounts(
registry,
Map("enqueued" -> (2.0 -> 3.0), "droppedQueueFull" -> (3.0 -> 4.0))
)
}

test("wrap blocking queue, droppedQueueFull, dropOld") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Future[Int]](1)
val source = StreamOps.wrapBlockingQueue[Future[Int]](registry, "test", blockingQueue, false)
val streamStarted = new CountDownLatch(1)
val queue = source
.flatMapConcat(Source.future)
.map { value =>
streamStarted.countDown()
value
}
.toMat(Sink.ignore)(Keep.left)
.run()

// wait for stream to start and first item to pass through
queue.offer(Promise.successful(0).future)
streamStarted.await()

val promise = Promise[Int]()
queue.offer(promise.future) // will pass through
Seq(2, 3, 4, 5).foreach(i => queue.offer(Future(i)))
promise.complete(Success(1))
queue.complete()
// when using drop old, all items will get enqueued
checkOfferedCounts(registry, Map("enqueued" -> 6.0, "droppedQueueFull" -> (3.0 -> 4.0)))
}

test("wrap blocking queue, droppedQueueClosed") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Int](1)
val source = StreamOps.wrapBlockingQueue[Int](registry, "test", blockingQueue)
val queue = source
.toMat(Sink.ignore)(Keep.left)
.run()
queue.offer(1)
queue.complete()
Seq(2, 3, 4, 5).foreach(i => queue.offer(i))
checkOfferedCounts(registry, Map("enqueued" -> 1.0, "droppedQueueClosed" -> 4.0))
}

test("wrap blocking queue, complete with no data") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Int](1)
val source = StreamOps.wrapBlockingQueue[Int](registry, "test", blockingQueue)
val latch = new CountDownLatch(1)
val (queue, fut) = source
.toMat(Sink.foreach(_ => latch.countDown()))(Keep.both)
.run()
queue.offer(1)
latch.await()
queue.complete()
Await.ready(fut, Duration.Inf)
}

test("wrap blocking queue, not open after completed") {
val registry = new DefaultRegistry()
val blockingQueue = new ArrayBlockingQueue[Int](1)
val source = StreamOps.wrapBlockingQueue[Int](registry, "test", blockingQueue)
val queue = source
.toMat(Sink.ignore)(Keep.left)
.run()
assert(queue.isOpen)

queue.offer(1)
assert(queue.isOpen)
checkOfferedCounts(registry, Map("enqueued" -> 1.0, "droppedQueueClosed" -> 0.0))

queue.complete()
assert(!queue.isOpen)
queue.offer(1)
checkOfferedCounts(registry, Map("enqueued" -> 1.0, "droppedQueueClosed" -> 1.0))
}

private def checkCounts(registry: Registry, name: String, expected: Map[String, Any]): Unit = {
import scala.jdk.CollectionConverters.*
registry
.stream()
Expand All @@ -145,7 +264,14 @@ class StreamOpsSuite extends FunSuite {
.foreach { m =>
val value = Utils.getTagValue(m.id(), "statistic")
val stat = if (value == null) "count" else value
assertEquals(m.value(), expected.getOrElse(stat, 0.0), stat)
expected.get(stat) match {
case Some(v: Double) =>
assertEquals(m.value(), v, stat)
case Some((mn: Double, mx: Double)) =>
assert(m.value() >= mn && m.value() <= mx, stat)
case _ =>
assertEquals(m.value(), 0.0, stat)
}
}
}

Expand Down

0 comments on commit 20731ad

Please sign in to comment.