diff --git a/atlas-eval/README.md b/atlas-eval/README.md index a9651b390..e73cc5709 100644 --- a/atlas-eval/README.md +++ b/atlas-eval/README.md @@ -49,3 +49,44 @@ Processor processor = Each `DataSource` for the input has a string id that will be added to corresponding `MessageEnvelope` objects in the output. This allows the messages to be matched with the correct input by the user. + +### Data Rate Messages +The library also emits data rate messages to help gain insights into the data rate per `DataSource` +per step. There are 3 types of data sizes in a data rate message: + - Input size: number of data points coming in as raw input. + - Intermediate size: number of aggregate data points that are used in the eval step, typically + when there's 'group by' in a query. + - Output size: number of data points for the final result. + +The "size" has 2 fields: + - `total`: total number of data points + - `details`: number of data points per data expression (empty for "outputSize") + +For example, given a query of "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by" with 4 nodes +and 2 clusters, one of the data rate message envelopes may look like this:
```json5
{
  "id": "_",
  "message": {
    "type": "rate",
    "timestamp": 1596570660000,
    "step": 60000,
    "inputSize": {
      "total": 4,
      "details": {
        "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 4
      }
    },
    "intermediateSize": {
      "total": 2,
      "details": {
        "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 2
      }
    },
    "outputSize": {
      "total": 2,
      "details": {}
    }
  }
}
```
``` \ No newline at end of file diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala index 4555a33e2..abab6a4aa 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala +++ 
b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala @@ -77,13 +77,7 @@ class EvalDataRateCollector(timestamp: Long, step: Long) { dataExpr: DataExpr, amount: Int ): Unit = { - counts.get(id) match { - case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount) - case None => - val temp = new RefIntHashMap[DataExpr] - temp.increment(dataExpr, amount) - counts += (id -> temp) - } + counts.getOrElseUpdate(id, new RefIntHashMap[DataExpr]).increment(dataExpr, amount) } } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index 05c3c8173..9a2a5ff47 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -172,16 +172,13 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) // Collect input and intermediate data size per DataSource val rateCollector = new EvalDataRateCollector(timestamp, step) dataSourceIdToDataExprs.foreach { - case (id, dataExprSet) => { + case (id, dataExprSet) => dataExprSet.foreach(dataExpr => { group.dataExprValues.get(dataExpr).foreach { info => - { - rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints) - rateCollector.incrementIntermediate(id, dataExpr, info.values.size) - } + rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints) + rateCollector.incrementIntermediate(id, dataExpr, info.values.size) } }) - } } // Generate the time series and diagnostic output diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 1b56ded31..7d3a33729 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala 
@@ -26,6 +26,8 @@ import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.eval.model.AggrDatapoint import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.ArrayData +import com.netflix.atlas.eval.model.EvalDataRate +import com.netflix.atlas.eval.model.EvalDataSize import com.netflix.atlas.eval.model.TimeGroup import com.netflix.atlas.eval.model.TimeSeriesMessage import com.netflix.atlas.eval.stream.Evaluator.DataSource @@ -109,6 +111,34 @@ class FinalExprEvalSuite extends AnyFunSuite { } } + private def isEvalDataRate(messageEnvelope: MessageEnvelope): Boolean = { + messageEnvelope.getMessage match { + case _: EvalDataRate => true + case _ => false + } + } + + private def getAsEvalDataRate( + env: MessageEnvelope + ): EvalDataRate = { + env.getMessage.asInstanceOf[EvalDataRate] + } + + private def checkRate( + rate: EvalDataRate, + timestamp: Long, + step: Long, + inputSize: EvalDataSize, + intermediateSize: EvalDataSize, + outputSize: EvalDataSize + ): Unit = { + assert(rate.timestamp === timestamp) + assert(rate.step === step) + assert(rate.inputSize === inputSize) + assert(rate.intermediateSize === intermediateSize) + assert(rate.outputSize === outputSize) + } + private def getValue(ts: TimeSeriesMessage): Double = { ts.data match { case ArrayData(vs) => @@ -149,6 +179,38 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] checkValue(ts, expectedValue) } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ) + ) + 
dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } test("aggregate with multiple datapoints per group") { @@ -182,6 +244,38 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] checkValue(ts, expectedValue) } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(3, Map(expr.toString -> 3)), + EvalDataSize(3, Map(expr.toString -> 3)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr.toString -> 2)), + EvalDataSize(2, Map(expr.toString -> 2)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } test("aggregate with multiple expressions") { @@ -222,6 +316,65 @@ class FinalExprEvalSuite extends AnyFunSuite { else checkValue(actual, expectedTimeseries2.dequeue()) } + + val expr1DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(expr1DataRateMsgs.size == 3) + val expr1ExpectedSizes = Array( + Array( + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr1.toString -> 2)), + EvalDataSize(2, Map(expr1.toString -> 2)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1) + ) + ) + expr1DataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = 
getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * i, + 60000, + expr1ExpectedSizes(i)(0), + expr1ExpectedSizes(i)(1), + expr1ExpectedSizes(i)(2) + ) + }) + + val expr2DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "b") + assert(expr2DataRateMsgs.size == 2) + val expr2ExpectedSizes = Array( + Array( + EvalDataSize(1, Map(expr2.toString -> 1)), + EvalDataSize(1, Map(expr2.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr2.toString -> 2)), + EvalDataSize(2, Map(expr2.toString -> 2)), + EvalDataSize(1) + ) + ) + expr2DataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expr2ExpectedSizes(i)(0), + expr2ExpectedSizes(i)(1), + expr2ExpectedSizes(i)(2) + ) + }) } // https://github.com/Netflix/atlas/issues/693 @@ -265,6 +418,38 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(ts, 21.0) } } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)), + EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)), + EvalDataSize(1) + ), + Array( + EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)), + EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)), + EvalDataSize(2) + ), + Array( + EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)), + EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * i, + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } // https://github.com/Netflix/atlas/issues/762