Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: sum aggregation for gauges #1533

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.netflix.atlas.core.model.Datapoint
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.TimeSeries
import com.netflix.atlas.core.util.Math
import com.netflix.atlas.core.util.RefDoubleHashMap
import com.netflix.spectator.api.Counter
import com.netflix.spectator.api.Registry

Expand Down Expand Up @@ -74,6 +75,8 @@ case class AggrDatapoint(

object AggrDatapoint {

private val aggrTagKey = "atlas.aggr"

/**
* Creates a dummy datapoint passed along when a heartbeat message is received from the
* lwcapi server. These are used to ensure regular messages are flowing into the time
Expand Down Expand Up @@ -172,20 +175,58 @@ object AggrDatapoint {
def datapoint: AggrDatapoint = init.copy(value = value)
}

/**
 * Aggregator for the sum or count when used with gauges. In cases where data is going
 * to the aggregator service, there can be duplicates of the gauge values. To get a
 * correct sum as if it was from a single aggregator service instance, this will compute
 * the max for a given `atlas.aggr` key and then sum the final results.
 *
 * @param init
 *     First datapoint for the aggregation; its expr/tags seed the output datapoint.
 * @param op
 *     Binary operation used to combine the per-aggregator max values into the final
 *     result (e.g. addNaN for sum/count semantics).
 * @param settings
 *     Limits and registry used for tracking the aggregation.
 */
private class GaugeSumAggregator(
  init: AggrDatapoint,
  op: (Double, Double) => Double,
  settings: AggregatorSettings
) extends Aggregator(settings) {

  // Tracks the max value seen per `atlas.aggr` key, deduplicating values that
  // were reported via multiple aggregator service instances.
  private val maxValues = new RefDoubleHashMap[String]
  numIntermediateDatapoints = 1
  aggregate(init)

  override def aggregate(datapoint: AggrDatapoint): Aggregator = {
    if (!checkLimits) {
      // Datapoints without the aggr tag are lumped into a single "unknown" bucket.
      val aggrKey = datapoint.tags.getOrElse(aggrTagKey, "unknown")
      maxValues.max(aggrKey, datapoint.value)
      numInputDatapoints += 1
    }
    this
  }

  override def datapoints: List[AggrDatapoint] = List(datapoint)

  /** Final aggregate: combine the per-key max values and strip the aggr tag. */
  def datapoint: AggrDatapoint = {
    val tags = init.tags - aggrTagKey
    var sum = 0.0
    // NOTE: op is expected to handle NaN values (addNaN) if any slip through.
    maxValues.foreach { (_, v) => sum = op(sum, v) }
    init.copy(tags = tags, value = sum)
  }
}

/**
* Group the datapoints by the tags and maintain a simple aggregator per distinct tag
* set.
*/
private class GroupByAggregator(settings: AggregatorSettings) extends Aggregator(settings) {

private val aggregators =
scala.collection.mutable.AnyRefMap.empty[Map[String, String], SimpleAggregator]
scala.collection.mutable.AnyRefMap.empty[Map[String, String], Aggregator]

/**
 * Create the appropriate aggregator for a single group. Sum and count groupings on
 * data tagged with `atlas.aggr` need gauge deduplication; other aggregate functions
 * use a simple aggregator.
 */
private def newAggregator(datapoint: AggrDatapoint): Aggregator = {
  datapoint.expr match {
    case GroupBy(af: DataExpr.Sum, _) if datapoint.tags.contains(aggrTagKey) =>
      new GaugeSumAggregator(datapoint, aggrOp(af), settings)
    case GroupBy(af: DataExpr.Count, _) if datapoint.tags.contains(aggrTagKey) =>
      new GaugeSumAggregator(datapoint, aggrOp(af), settings)
    case GroupBy(af: AggregateFunction, _) =>
      new SimpleAggregator(datapoint, aggrOp(af), settings)
    case _ =>
      throw new IllegalArgumentException("datapoint is not for a grouped expression")
  }
}
Expand All @@ -206,7 +247,7 @@ object AggrDatapoint {
}

override def datapoints: List[AggrDatapoint] = {
  // flatMap: each per-group aggregator may emit its own list of datapoints
  aggregators.values.flatMap(_.datapoints).toList
}
}

Expand Down Expand Up @@ -235,6 +276,10 @@ object AggrDatapoint {
*/
def newAggregator(datapoint: AggrDatapoint, settings: AggregatorSettings): Aggregator = {
datapoint.expr match {
case af: DataExpr.Sum if datapoint.tags.contains(aggrTagKey) =>
new GaugeSumAggregator(datapoint, aggrOp(af), settings)
case af: DataExpr.Count if datapoint.tags.contains(aggrTagKey) =>
new GaugeSumAggregator(datapoint, aggrOp(af), settings)
case af: AggregateFunction =>
new SimpleAggregator(datapoint, aggrOp(af), settings)
case _: GroupBy =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,18 @@ class AggrDatapointSuite extends FunSuite {
}
}

/**
 * Build `nodes` test datapoints tagged with an `atlas.aggr` key that alternates
 * between two aggregator ids. Grouped expressions additionally get a `node` tag
 * so results can be split per group.
 */
private def createGaugeDatapoints(expr: DataExpr, t: Long, nodes: Int): List[AggrDatapoint] = {
  List.tabulate(nodes) { i =>
    val aggrId = i % 2
    val baseTags = Map("name" -> "cpu", "atlas.aggr" -> aggrId.toString)
    val tags =
      if (expr.isInstanceOf[DataExpr.AggregateFunction]) baseTags
      else baseTags + ("node" -> f"i-$aggrId%08d")
    AggrDatapoint(t, step, expr, i.toString, tags, i)
  }
}

test("aggregate empty") {
assertEquals(
AggrDatapoint.aggregate(Nil, settings(Integer.MAX_VALUE, Integer.MAX_VALUE)),
Expand Down Expand Up @@ -82,6 +94,32 @@ class AggrDatapointSuite extends FunSuite {
assertEquals(result.head.value, 45.0)
}

test("aggregate gauges sum") {
  val dataset = createGaugeDatapoints(DataExpr.Sum(Query.True), 0, 10)
  val result = AggrDatapoint
    .aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
    .get
    .datapoints

  // Max per aggr key is 8 (key "0") and 9 (key "1"), summed => 17
  assertEquals(result.size, 1)
  val d = result.head
  assertEquals(d.timestamp, 0L)
  assertEquals(d.tags, Map("name" -> "cpu"))
  assertEquals(d.value, 17.0)
}

test("aggregate gauges count") {
  val dataset = createGaugeDatapoints(DataExpr.Count(Query.True), 0, 10)
  val result = AggrDatapoint
    .aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
    .get
    .datapoints

  // Max per aggr key is 8 (key "0") and 9 (key "1"), summed => 17
  assertEquals(result.size, 1)
  val d = result.head
  assertEquals(d.timestamp, 0L)
  assertEquals(d.tags, Map("name" -> "cpu"))
  assertEquals(d.value, 17.0)
}

test("aggregate group by") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
val dataset = createDatapoints(expr, 0, 10)
Expand All @@ -96,6 +134,20 @@ class AggrDatapointSuite extends FunSuite {
}
}

test("aggregate gauges group by") {
  val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
  val dataset = createGaugeDatapoints(expr, 0, 10)
  val result = AggrDatapoint
    .aggregate(dataset, settings(Integer.MAX_VALUE, Integer.MAX_VALUE))
    .get
    .datapoints

  // One group per node id; even node => max 8, odd node => max 9
  assertEquals(result.size, 2)
  result.foreach { d =>
    val nodeIdx = d.tags("node").substring(2).toInt
    val expected = if (nodeIdx % 2 == 0) 8.0 else 9.0
    assertEquals(d.value, expected)
  }
}

test("aggregate group by exceeds input data points") {
val expr = DataExpr.GroupBy(DataExpr.Sum(Query.True), List("node"))
val dataset = createDatapoints(expr, 0, 10)
Expand Down