From 0c4245cf421aa8ca26fefc39edd7c7a4562c3b29 Mon Sep 17 00:00:00 2001 From: Tim Jiang Date: Thu, 30 Jul 2020 15:20:42 -0700 Subject: [PATCH 1/4] rename root project to atlas to be more IDE friendly --- .../atlas/akka/DiagnosticMessage.scala | 9 ++ .../atlas/eval/model/AggrDatapoint.scala | 15 +++- .../netflix/atlas/eval/model/TimeGroup.scala | 5 +- .../atlas/eval/stream/EvaluatorImpl.scala | 7 +- .../atlas/eval/stream/FinalExprEval.scala | 82 +++++++++++++++---- .../atlas/eval/stream/TimeGrouped.scala | 15 +++- .../eval/stream/FinalExprEvalSuite.scala | 3 +- .../atlas/eval/stream/TimeGroupedSuite.scala | 53 ++++++++---- 8 files changed, 143 insertions(+), 46 deletions(-) diff --git a/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala b/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala index 24a9a513a..076b39087 100644 --- a/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala +++ b/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala @@ -26,6 +26,7 @@ object DiagnosticMessage { final val Warning: String = "warn" final val Error: String = "error" final val Close: String = "close" + final val Rate: String = "rate" def info(message: String): DiagnosticMessage = { DiagnosticMessage(Info, message, None) @@ -43,6 +44,14 @@ object DiagnosticMessage { error(s"${t.getClass.getSimpleName}: ${t.getMessage}") } + def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = { + DiagnosticMessage( + Rate, + s"dataType=$dataType,count=$count,timestamp=$timestamp,step=$step", + None + ) + } + val close: DiagnosticMessage = { DiagnosticMessage(Close, "operation complete", None) } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala index 4b0567646..83293a012 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala +++ 
b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala @@ -88,6 +88,8 @@ object AggrDatapoint { * instance have the same data expression. */ trait Aggregator { + protected[this] var rawDatapointCounter = 0 + def numRawDatapoints: Long = rawDatapointCounter def aggregate(datapoint: AggrDatapoint): Aggregator def datapoints: List[AggrDatapoint] } @@ -101,9 +103,11 @@ object AggrDatapoint { extends Aggregator { private var value = init.value + rawDatapointCounter += 1 override def aggregate(datapoint: AggrDatapoint): Aggregator = { value = op(value, datapoint.value) + rawDatapointCounter += 1 this } @@ -123,7 +127,9 @@ object AggrDatapoint { private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = { datapoint.expr match { - case GroupBy(af: AggregateFunction, _) => new SimpleAggregator(datapoint, af) + case GroupBy(af: AggregateFunction, _) => + rawDatapointCounter += 1 + new SimpleAggregator(datapoint, af) case _ => throw new IllegalArgumentException("datapoint is not for a grouped expression") } @@ -131,8 +137,10 @@ object AggrDatapoint { override def aggregate(datapoint: AggrDatapoint): Aggregator = { aggregators.get(datapoint.tags) match { - case Some(aggr) => aggr.aggregate(datapoint) - case None => aggregators.put(datapoint.tags, newAggregator(datapoint)) + case Some(aggr) => + rawDatapointCounter += 1 + aggr.aggregate(datapoint) + case None => aggregators.put(datapoint.tags, newAggregator(datapoint)) } this } @@ -150,6 +158,7 @@ object AggrDatapoint { override def aggregate(datapoint: AggrDatapoint): Aggregator = { values = datapoint :: values + rawDatapointCounter += 1 this } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala index 4955f13e7..f7296e666 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala @@ -28,7 
+28,8 @@ import com.netflix.atlas.core.model.DataExpr * Timestamp that applies to all values within the group. * @param step * Step size for the data within this group. - * @param values + * @param dataExprValues * Values associated with this time. */ -case class TimeGroup(timestamp: Long, step: Long, values: Map[DataExpr, List[AggrDatapoint]]) +case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo]) +case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Long) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index 0fa29fda7..f8489d155 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -47,6 +47,7 @@ import com.netflix.atlas.akka.StreamOps import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.eval.model.AggrDatapoint +import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.TimeGroup import com.netflix.atlas.eval.stream.EurekaSource.Instance import com.netflix.atlas.eval.stream.Evaluator.DataSource @@ -303,7 +304,7 @@ private[stream] abstract class EvaluatorImpl( } private def toTimeGroup(step: Long, exprs: List[DataExpr], group: DatapointGroup): TimeGroup = { - val values = group.getDatapoints.asScala.zipWithIndex + val valuesInfo = group.getDatapoints.asScala.zipWithIndex .flatMap { case (d, i) => val tags = d.getTags.asScala.toMap @@ -321,8 +322,8 @@ private[stream] abstract class EvaluatorImpl( } } .groupBy(_.expr) - .map(t => t._1 -> AggrDatapoint.aggregate(t._2.toList)) - TimeGroup(group.getTimestamp, step, values) + .map(t => t._1 -> AggrValuesInfo(AggrDatapoint.aggregate(t._2.toList), t._2.size)) + TimeGroup(group.getTimestamp, step, valuesInfo) } @scala.annotation.tailrec 
diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index 001c90680..a46347c80 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -42,6 +42,8 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope import com.typesafe.scalalogging.StrictLogging +import scala.collection.mutable + /** * Takes the set of data sources and time grouped partial aggregates as input and performs * the final evaluation step. @@ -73,6 +75,9 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) // the data for it private var recipients = List.empty[(StyleExpr, List[String])] + // Track the set of DataExprs per DataSource + private var dataSourceIdToDataExprs = mutable.Map.empty[String, mutable.Set[DataExpr]] + // Empty data map used as base to account for expressions that do not have any // matches for a given time interval private var noData = Map.empty[DataExpr, List[TimeSeries]] @@ -110,9 +115,19 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) } } .groupBy(_._1) - .map(t => t._1 -> t._2.map(_._2).toList) + .map(t => t._1 -> t._2.map(_._2)) .toList + // Compute the map from DataSource id to DataExprs + dataSourceIdToDataExprs = mutable.Map.empty + recipients.foreach(kv => { + kv._2.foreach(id => { + val dataExprSet = dataSourceIdToDataExprs.getOrElse(id, mutable.Set.empty) + dataExprSet.addAll(kv._1.expr.dataExprs) + dataSourceIdToDataExprs(id) = dataExprSet + }) + }) + // Cleanup state for any expressions that are no longer needed val removed = previous.keySet -- recipients.map(_._1).toSet removed.foreach { expr => @@ -149,34 +164,44 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) private def handleData(group: TimeGroup): Unit = { 
// Finalize the DataExprs, needed as input for further evaluation val timestamp = group.timestamp - val groupedDatapoints = group.values + val groupedDatapoints = group.dataExprValues - val expressionDatapoints = noData ++ groupedDatapoints.map { + val dataExprToDatapoints = noData ++ groupedDatapoints.map { case (k, vs) => - k -> AggrDatapoint.aggregate(vs).map(_.toTimeSeries) + k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries) } - val expressionDiagnostics = groupedDatapoints.map { + + val dataExprToDiagnostics = groupedDatapoints.map { case (k, vs) => val t = Instant.ofEpochMilli(timestamp) - k -> DiagnosticMessage.info(s"$t: ${vs.length} input datapoints for [$k]") + k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]") } + val finalDataRateMap = mutable.Map.empty[String, Long] // Generate the time series and diagnostic output val output = recipients.flatMap { - case (expr, ids) => + case (styleExpr, ids) => // Use an identity map for the state to ensure that multiple equivalent stateful // expressions, e.g. derivative(a) + derivative(a), will have isolated state. 
- val state = states.getOrElse(expr, IdentityMap.empty[StatefulExpr, Any]) + val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any]) val context = EvalContext(timestamp, timestamp + step, step, state) try { - val result = expr.expr.eval(context, expressionDatapoints) - states(expr) = result.state - val data = if (result.data.isEmpty) List(noData(expr)) else result.data + val result = styleExpr.expr.eval(context, dataExprToDatapoints) + states(styleExpr) = result.state + val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data val msgs = data.map { t => - TimeSeriesMessage(expr, context, t.withLabel(expr.legend(t))) + TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t))) } - val diagnostics = expr.expr.dataExprs.flatMap(expressionDiagnostics.get) + val diagnostics = styleExpr.expr.dataExprs.flatMap(dataExprToDiagnostics.get) + + // Collect final output data count + ids.foreach(id => + finalDataRateMap.get(id) match { + case Some(count) => finalDataRateMap.update(id, count + data.length) + case None => finalDataRateMap += id -> data.length + } + ) ids.flatMap { id => (msgs ++ diagnostics).map { msg => @@ -185,14 +210,41 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) } } catch { case e: Exception => - val msg = error(expr.toString, "final eval failed", e) + val msg = error(styleExpr.toString, "final eval failed", e) ids.map { id => new MessageEnvelope(id, msg) } } } - push(out, Source(output)) + // Raw data rate for each DataSource + val rawDataRateMessages = dataSourceIdToDataExprs.map(kv => + new MessageEnvelope( + kv._1, + DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step) + ) + ) + + // Final output data rate for each DataSource + val finalDataRateMessages = finalDataRateMap.map(kv => { + new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step)) + }) + + push(out, Source(output ++ rawDataRateMessages ++ 
finalDataRateMessages)) + } + + private def getNumRawDatapoints( + dataExprs: mutable.Set[DataExpr], + timeGroup: TimeGroup + ): Long = { + dataExprs + .map(dataExpr => { + timeGroup.dataExprValues.get(dataExpr) match { + case Some(v) => v.numRawDatapoints + case None => 0 + } + }) + .sum } override def onPush(): Unit = { diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala index 0f213dc61..1785205a0 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/TimeGrouped.scala @@ -25,6 +25,7 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.eval.model.AggrDatapoint +import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.TimeGroup /** @@ -107,13 +108,20 @@ private[stream] class TimeGrouped( * so it can be used for a new time window. 
*/ private def flush(i: Int): Unit = { - val vs = buf(i).map(t => t._1 -> t._2.datapoints).toMap val t = timestamps(i) - if (t > 0) push(out, TimeGroup(t, step, vs)) else pull(in) + if (t > 0) push(out, toTimeGroup(t, buf(i))) else pull(in) cutoffTime = t buf(i) = new AggrMap } + private def toTimeGroup(ts: Long, aggrMap: AggrMap): TimeGroup = { + TimeGroup( + ts, + step, + aggrMap.map(t => t._1 -> AggrValuesInfo(t._2.datapoints, t._2.numRawDatapoints)).toMap + ) + } + override def onPush(): Unit = { val v = grab(in) val t = v.timestamp @@ -149,8 +157,7 @@ private[stream] class TimeGrouped( override def onUpstreamFinish(): Unit = { val groups = buf.indices.map { i => - val vs = buf(i).map(t => t._1 -> t._2.datapoints).toMap - TimeGroup(timestamps(i), step, vs) + toTimeGroup(timestamps(i), buf(i)) }.toList pending = groups.filter(_.timestamp > 0).sortWith(_.timestamp < _.timestamp) flushPending() diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index c6d57880f..3dbcdc0af 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -25,6 +25,7 @@ import com.netflix.atlas.core.model.MathExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.eval.model.AggrDatapoint +import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.ArrayData import com.netflix.atlas.eval.model.TimeGroup import com.netflix.atlas.eval.model.TimeSeriesMessage @@ -67,7 +68,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val values = vs .map(_.copy(timestamp = timestamp)) .groupBy(_.expr) - .map(t => t._1 -> t._2.toList) + .map(t => t._1 -> AggrValuesInfo(t._2.toList, t._2.size)) .toMap TimeGroup(timestamp, step, values) } diff --git 
a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala index 3f703b520..2d2a2d901 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/TimeGroupedSuite.scala @@ -21,6 +21,7 @@ import akka.stream.scaladsl.Source import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.eval.model.AggrDatapoint +import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.TimeGroup import com.netflix.spectator.api.DefaultRegistry import org.scalatest.funsuite.AnyFunSuite @@ -44,7 +45,11 @@ class TimeGroupedSuite extends AnyFunSuite { Await .result(future, Duration.Inf) .reverse - .map(g => g.copy(values = g.values.map(t => t._1 -> t._2.sortWith(_.value < _.value)))) + .map(g => + g.copy(dataExprValues = g.dataExprValues + .map(t => t._1 -> t._2.copy(values = t._2.values.sortWith(_.value < _.value))) + ) + ) } private def run(data: List[AggrDatapoint]): List[TimeGroup] = { @@ -59,7 +64,7 @@ class TimeGroupedSuite extends AnyFunSuite { if (vs.isEmpty) TimeGroup(t, step, Map.empty) else - TimeGroup(t, step, Map(expr -> vs)) + TimeGroup(t, step, Map(expr -> AggrValuesInfo(vs, vs.size))) } private def datapoint(t: Long, v: Int): AggrDatapoint = { @@ -204,7 +209,7 @@ class TimeGroupedSuite extends AnyFunSuite { val expected = AggrDatapoint(10, 10, expr, "test", Map.empty, n * (n - 1) / 2) val groups = run(data) - assert(groups === List(TimeGroup(10, step, Map(expr -> List(expected))))) + assert(groups === List(TimeGroup(10, step, Map(expr -> AggrValuesInfo(List(expected), n))))) } test("simple aggregate: min") { @@ -216,7 +221,7 @@ class TimeGroupedSuite extends AnyFunSuite { val expected = AggrDatapoint(10, 10, expr, "test", Map.empty, 0) val groups = run(data) - assert(groups === List(TimeGroup(10, step, 
Map(expr -> List(expected))))) + assert(groups === List(TimeGroup(10, step, Map(expr -> AggrValuesInfo(List(expected), n))))) } test("simple aggregate: max") { @@ -228,7 +233,7 @@ class TimeGroupedSuite extends AnyFunSuite { val expected = AggrDatapoint(10, 10, expr, "test", Map.empty, n - 1) val groups = run(data) - assert(groups === List(TimeGroup(10, step, Map(expr -> List(expected))))) + assert(groups === List(TimeGroup(10, step, Map(expr -> AggrValuesInfo(List(expected), n))))) } test("simple aggregate: count") { @@ -240,7 +245,7 @@ class TimeGroupedSuite extends AnyFunSuite { val expected = AggrDatapoint(10, 10, expr, "test", Map.empty, n * (n - 1) / 2) val groups = run(data) - assert(groups === List(TimeGroup(10, step, Map(expr -> List(expected))))) + assert(groups === List(TimeGroup(10, step, Map(expr -> AggrValuesInfo(List(expected), n))))) } test("group by aggregate: sum") { @@ -251,9 +256,12 @@ class TimeGroupedSuite extends AnyFunSuite { AggrDatapoint(10, 10, expr, "test", Map("category" -> category), i) } val expected = Map( - expr -> List( - AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n * (n - 1)), - AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n * n) + expr -> AggrValuesInfo( + List( + AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n * (n - 1)), + AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n * n) + ), + 2 * n ) ) @@ -269,9 +277,12 @@ class TimeGroupedSuite extends AnyFunSuite { AggrDatapoint(10, 10, expr, "test", Map("category" -> category), i) } val expected = Map( - expr -> List( - AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), 0), - AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), 1) + expr -> AggrValuesInfo( + List( + AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), 0), + AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), 1) + ), + n ) ) @@ -287,9 +298,12 @@ class TimeGroupedSuite extends AnyFunSuite { AggrDatapoint(10, 10, 
expr, "test", Map("category" -> category), i) } val expected = Map( - expr -> List( - AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n - 2), - AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n - 1) + expr -> AggrValuesInfo( + List( + AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n - 2), + AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n - 1) + ), + n ) ) @@ -305,9 +319,12 @@ class TimeGroupedSuite extends AnyFunSuite { AggrDatapoint(10, 10, expr, "test", Map("category" -> category), i) } val expected = Map( - expr -> List( - AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n * (n - 1)), - AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n * n) + expr -> AggrValuesInfo( + List( + AggrDatapoint(10, 10, expr, "test", Map("category" -> "even"), n * (n - 1)), + AggrDatapoint(10, 10, expr, "test", Map("category" -> "odd"), n * n) + ), + 2 * n ) ) From e53c4c9f6fc2235a796277bfaaaac83468f2ad15 Mon Sep 17 00:00:00 2001 From: Tim Jiang Date: Fri, 31 Jul 2020 09:19:28 -0700 Subject: [PATCH 2/4] fix unit test --- .../eval/stream/FinalExprEvalSuite.scala | 55 +++++++++++++------ 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 3dbcdc0af..efbb9bc03 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -69,7 +69,6 @@ class FinalExprEvalSuite extends AnyFunSuite { .map(_.copy(timestamp = timestamp)) .groupBy(_.expr) .map(t => t._1 -> AggrValuesInfo(t._2.toList, t._2.size)) - .toMap TimeGroup(timestamp, step, values) } @@ -94,11 +93,31 @@ class FinalExprEvalSuite extends AnyFunSuite { TimeGroup(0L, step, Map.empty) ) val output = run(input) - assert(output.size === 1) - output.foreach { 
env => - assert(env.getId === "a") - val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] - assert(ts.label === "(NO DATA / NO DATA)") + assert(output.size === 3) + + val tsMsgs = output.filter(isTimeSeries) + assert(tsMsgs.size === 1) + val (tsId, tsMsg) = tsMsgs.head.getId -> tsMsgs.head.getMessage.asInstanceOf[TimeSeriesMessage] + assert(tsId == "a") + assert(tsMsg.label === "(NO DATA / NO DATA)") + + } + + private def isTimeSeries(messageEnvelope: MessageEnvelope): Boolean = { + messageEnvelope.getMessage match { + case _: TimeSeriesMessage => true + case _ => false + } + } + + private def isDiagnosticMessageWith( + messageEnvelope: MessageEnvelope, + theType: String, + keyword: String + ): Boolean = { + messageEnvelope.getMessage match { + case DiagnosticMessage(tp, msg, _) if tp == theType && msg.contains(keyword) => true + case _ => false } } @@ -133,7 +152,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 4) val expectedTimeseries = List(Double.NaN, 42.0, 43.0, 44.0) timeseries.zip(expectedTimeseries).foreach { @@ -143,7 +162,7 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(ts, expectedValue) } - val diagnostics = output.filter(_.getMessage.isInstanceOf[DiagnosticMessage]) + val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints for")) assert(diagnostics.size === 3) val expectedDiagnostics = List( DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"), @@ -178,7 +197,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 4) val expectedTimeseries = List(Double.NaN, 42.0, 129.0, 87.0) timeseries.zip(expectedTimeseries).foreach { @@ -188,7 
+207,7 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(ts, expectedValue) } - val diagnostics = output.filter(_.getMessage.isInstanceOf[DiagnosticMessage]) + val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) assert(diagnostics.size === 3) val expectedDiagnostics = List( DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"), @@ -228,7 +247,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 3 + 3) // 3 for expr1, 3 for expr2 val expectedTimeseries1 = scala.collection.mutable.Queue(42.0, 84.0, 44.0) @@ -241,7 +260,7 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(actual, expectedTimeseries2.dequeue()) } - val diagnostics = output.filter(_.getMessage.isInstanceOf[DiagnosticMessage]) + val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) assert(diagnostics.size === 3 + 2) // 3 for datasource a, 2 for datasource b val expectedDiagnostics1 = scala.collection.mutable.Queue( @@ -291,7 +310,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 4) timeseries.foreach { env => val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] @@ -304,7 +323,7 @@ class FinalExprEvalSuite extends AnyFunSuite { } } - val diagnostics = output.filter(_.getMessage.isInstanceOf[DiagnosticMessage]) + val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) assert(diagnostics.size === 6) val expectedDiagnostics = List( @@ -361,7 +380,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val 
timeseries = output.filter(isTimeSeries) assert(timeseries.size === 4) val expectedTimeseries = List(0.0, 0.0, 0.0, 0.0) timeseries.zip(expectedTimeseries).foreach { @@ -391,7 +410,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 5) val expectedTimeseries = List(Double.NaN, Double.NaN, -2.0, 0.0, -4.0) timeseries.zip(expectedTimeseries).foreach { @@ -437,7 +456,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 8) val expectedTimeseries = List(0.0, 1.0, 1.0, 2.0, 1.0, 1.0, 0.0, 1.0) timeseries.zip(expectedTimeseries).foreach { @@ -475,7 +494,7 @@ class FinalExprEvalSuite extends AnyFunSuite { val output = run(input) - val timeseries = output.filter(_.getMessage.isInstanceOf[TimeSeriesMessage]) + val timeseries = output.filter(isTimeSeries) assert(timeseries.size === 8) timeseries.foreach { env => val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] From daa0c5143b10be1bebcb1ace4f34584e410b5a95 Mon Sep 17 00:00:00 2001 From: Tim Jiang Date: Fri, 31 Jul 2020 17:38:11 -0700 Subject: [PATCH 3/4] collect data rate for raw input, intermediate input and final output --- .../atlas/akka/DiagnosticMessage.scala | 9 -- .../atlas/eval/model/AggrDatapoint.scala | 2 +- .../atlas/eval/model/EvalDataRate.scala | 30 ++++++ .../netflix/atlas/eval/model/TimeGroup.scala | 2 +- .../eval/stream/EvalDataRateCollector.scala | 92 +++++++++++++++++++ .../atlas/eval/stream/FinalExprEval.scala | 64 ++++--------- .../eval/stream/FinalExprEvalSuite.scala | 75 +-------------- 7 files changed, 145 insertions(+), 129 deletions(-) create mode 100644 atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EvalDataRate.scala create mode 100644 
atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala diff --git a/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala b/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala index 076b39087..24a9a513a 100644 --- a/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala +++ b/atlas-akka/src/main/scala/com/netflix/atlas/akka/DiagnosticMessage.scala @@ -26,7 +26,6 @@ object DiagnosticMessage { final val Warning: String = "warn" final val Error: String = "error" final val Close: String = "close" - final val Rate: String = "rate" def info(message: String): DiagnosticMessage = { DiagnosticMessage(Info, message, None) @@ -44,14 +43,6 @@ object DiagnosticMessage { error(s"${t.getClass.getSimpleName}: ${t.getMessage}") } - def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = { - DiagnosticMessage( - Rate, - s"dataType=$dataType,count=$count,timestamp=$timestamp,step=$step", - None - ) - } - val close: DiagnosticMessage = { DiagnosticMessage(Close, "operation complete", None) } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala index 83293a012..54511d3d2 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/AggrDatapoint.scala @@ -89,7 +89,7 @@ object AggrDatapoint { */ trait Aggregator { protected[this] var rawDatapointCounter = 0 - def numRawDatapoints: Long = rawDatapointCounter + def numRawDatapoints: Int = rawDatapointCounter def aggregate(datapoint: AggrDatapoint): Aggregator def datapoints: List[AggrDatapoint] } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EvalDataRate.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EvalDataRate.scala new file mode 100644 index 000000000..b6509e1fd --- /dev/null +++ 
b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/EvalDataRate.scala @@ -0,0 +1,30 @@ +/* + * Copyright 2014-2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.atlas.eval.model + +import com.netflix.atlas.json.JsonSupport + +case class EvalDataRate( + timestamp: Long, + step: Long, + inputSize: EvalDataSize, + intermediateSize: EvalDataSize, + outputSize: EvalDataSize +) extends JsonSupport { + val `type`: String = "rate" +} + +case class EvalDataSize(total: Int, details: Map[String, Int] = Map.empty[String, Int]) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala index f7296e666..5b932d6e0 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/model/TimeGroup.scala @@ -32,4 +32,4 @@ import com.netflix.atlas.core.model.DataExpr * Values associated with this time. 
*/ case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo]) -case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Long) +case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Int) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala new file mode 100644 index 000000000..4555a33e2 --- /dev/null +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala @@ -0,0 +1,92 @@ +/* + * Copyright 2014-2020 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.atlas.eval.stream + +import com.netflix.atlas.core.model.DataExpr +import com.netflix.atlas.core.util.RefIntHashMap +import com.netflix.atlas.eval.model.EvalDataRate +import com.netflix.atlas.eval.model.EvalDataSize + +import scala.collection.mutable + +class EvalDataRateCollector(timestamp: Long, step: Long) { + + private val inputCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]] + private val intermediateCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]] + private val outputCounts = new RefIntHashMap[String] + + def incrementOutput(id: String, amount: Int): Unit = { + outputCounts.increment(id, amount) + } + + def incrementIntermediate(id: String, dataExpr: DataExpr, amount: Int): Unit = { + increment(intermediateCounts, id, dataExpr, amount) + } + + def incrementInput(id: String, dataExpr: DataExpr, amount: Int): Unit = { + increment(inputCounts, id, dataExpr, amount) + } + + def getAll: Map[String, EvalDataRate] = { + inputCounts.map { + case (id, _) => { + id -> EvalDataRate( + timestamp, + step, + getDataRate(inputCounts, id), + getDataRate(intermediateCounts, id), + EvalDataSize(outputCounts.get(id, 0)) + ) + } + }.toMap + } + + private def getDataRate( + counts: mutable.Map[String, RefIntHashMap[DataExpr]], + id: String + ): EvalDataSize = { + counts.get(id) match { + case Some(v: RefIntHashMap[DataExpr]) => + var total = 0 + val builder = Map.newBuilder[String, Int] + v.foreach { (dataExpr, count) => + builder += dataExpr.toString -> count + total += count + } + EvalDataSize(total, builder.result()) + case None => EvalDataRateCollector.EmptyRate + } + } + + private def increment( + counts: mutable.Map[String, RefIntHashMap[DataExpr]], + id: String, + dataExpr: DataExpr, + amount: Int + ): Unit = { + counts.get(id) match { + case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount) + case None => + val temp = new RefIntHashMap[DataExpr] + temp.increment(dataExpr, amount) + counts += (id -> temp) + } 
+ } +} + +object EvalDataRateCollector { + val EmptyRate = EvalDataSize(0) +} diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index a46347c80..05c3c8173 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -15,8 +15,6 @@ */ package com.netflix.atlas.eval.stream -import java.time.Instant - import akka.NotUsed import akka.http.scaladsl.model.Uri import akka.stream.Attributes @@ -171,13 +169,21 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries) } - val dataExprToDiagnostics = groupedDatapoints.map { - case (k, vs) => - val t = Instant.ofEpochMilli(timestamp) - k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]") + // Collect input and intermediate data size per DataSource + val rateCollector = new EvalDataRateCollector(timestamp, step) + dataSourceIdToDataExprs.foreach { + case (id, dataExprSet) => { + dataExprSet.foreach(dataExpr => { + group.dataExprValues.get(dataExpr).foreach { info => + { + rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints) + rateCollector.incrementIntermediate(id, dataExpr, info.values.size) + } + } + }) + } } - val finalDataRateMap = mutable.Map.empty[String, Long] // Generate the time series and diagnostic output val output = recipients.flatMap { case (styleExpr, ids) => @@ -193,18 +199,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t))) } - val diagnostics = styleExpr.expr.dataExprs.flatMap(dataExprToDiagnostics.get) - - // Collect final output data count - ids.foreach(id => - finalDataRateMap.get(id) match { - case Some(count) => finalDataRateMap.update(id, count + data.length) - case None => 
finalDataRateMap += id -> data.length - } - ) + // Collect final data size per DataSource + ids.foreach(rateCollector.incrementOutput(_, data.size)) ids.flatMap { id => - (msgs ++ diagnostics).map { msg => + msgs.map { msg => new MessageEnvelope(id, msg) } } @@ -217,34 +216,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) } } - // Raw data rate for each DataSource - val rawDataRateMessages = dataSourceIdToDataExprs.map(kv => - new MessageEnvelope( - kv._1, - DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step) - ) - ) - - // Final output data rate for each DataSource - val finalDataRateMessages = finalDataRateMap.map(kv => { - new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step)) - }) - - push(out, Source(output ++ rawDataRateMessages ++ finalDataRateMessages)) - } + val rateMessages = rateCollector.getAll.map { + case (id, rate) => new MessageEnvelope(id, rate) + }.toList - private def getNumRawDatapoints( - dataExprs: mutable.Set[DataExpr], - timeGroup: TimeGroup - ): Long = { - dataExprs - .map(dataExpr => { - timeGroup.dataExprValues.get(dataExpr) match { - case Some(v) => v.numRawDatapoints - case None => 0 - } - }) - .sum + push(out, Source(output ++ rateMessages)) } override def onPush(): Unit = { diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index efbb9bc03..1b56ded31 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -19,7 +19,6 @@ import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source -import com.netflix.atlas.akka.DiagnosticMessage import com.netflix.atlas.core.model.DataExpr import com.netflix.atlas.core.model.MathExpr 
import com.netflix.atlas.core.model.Query @@ -93,7 +92,7 @@ class FinalExprEvalSuite extends AnyFunSuite { TimeGroup(0L, step, Map.empty) ) val output = run(input) - assert(output.size === 3) + assert(output.size === 1) val tsMsgs = output.filter(isTimeSeries) assert(tsMsgs.size === 1) @@ -110,17 +109,6 @@ class FinalExprEvalSuite extends AnyFunSuite { } } - private def isDiagnosticMessageWith( - messageEnvelope: MessageEnvelope, - theType: String, - keyword: String - ): Boolean = { - messageEnvelope.getMessage match { - case DiagnosticMessage(tp, msg, _) if tp == theType && msg.contains(keyword) => true - case _ => false - } - } - private def getValue(ts: TimeSeriesMessage): Double = { ts.data match { case ArrayData(vs) => @@ -161,18 +149,6 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] checkValue(ts, expectedValue) } - - val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints for")) - assert(diagnostics.size === 3) - val expectedDiagnostics = List( - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"), - DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr]"), - DiagnosticMessage.info(s"1970-01-01T00:03:00Z: 1 input datapoints for [$expr]") - ) - diagnostics.zip(expectedDiagnostics).foreach { - case (actual, expected) => - assert(actual.getMessage === expected) - } } test("aggregate with multiple datapoints per group") { @@ -206,19 +182,6 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] checkValue(ts, expectedValue) } - - val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) - assert(diagnostics.size === 3) - val expectedDiagnostics = List( - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"), - DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 3 input datapoints for [$expr]"), - 
DiagnosticMessage.info(s"1970-01-01T00:03:00Z: 2 input datapoints for [$expr]") - ) - diagnostics.zip(expectedDiagnostics).foreach { - case (actual, expected) => - assert(actual.getMessage === expected) - } - } test("aggregate with multiple expressions") { @@ -259,26 +222,6 @@ class FinalExprEvalSuite extends AnyFunSuite { else checkValue(actual, expectedTimeseries2.dequeue()) } - - val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) - assert(diagnostics.size === 3 + 2) // 3 for datasource a, 2 for datasource b - - val expectedDiagnostics1 = scala.collection.mutable.Queue( - DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 1 input datapoints for [$expr1]"), - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr1]"), - DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr1]") - ) - val expectedDiagnostics2 = scala.collection.mutable.Queue( - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr2]"), - DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 2 input datapoints for [$expr2]") - ) - diagnostics.foreach { env => - val actual = env.getMessage.asInstanceOf[DiagnosticMessage] - if (env.getId == "a") - assert(actual === expectedDiagnostics1.dequeue()) - else - assert(actual === expectedDiagnostics2.dequeue()) - } } // https://github.com/Netflix/atlas/issues/693 @@ -322,22 +265,6 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(ts, 21.0) } } - - val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints")) - assert(diagnostics.size === 6) - - val expectedDiagnostics = List( - DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 1 input datapoints for [$expr1]"), - DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 2 input datapoints for [$expr2]"), - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr1]"), - DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr2]"), - 
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 2 input datapoints for [$expr1]"), - DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr2]") - ) - diagnostics.zip(expectedDiagnostics).foreach { - case (actual, expected) => - assert(actual.getMessage === expected) - } } // https://github.com/Netflix/atlas/issues/762 From 51dbdee9a99c4afd4cf2a19f6379ed283ec03235 Mon Sep 17 00:00:00 2001 From: Tim Jiang Date: Tue, 4 Aug 2020 12:01:20 -0700 Subject: [PATCH 4/4] code review fixes, adding unit test, document data rate messages --- atlas-eval/README.md | 40 ++++ .../eval/stream/EvalDataRateCollector.scala | 8 +- .../atlas/eval/stream/FinalExprEval.scala | 33 ++-- .../eval/stream/FinalExprEvalSuite.scala | 185 ++++++++++++++++++ 4 files changed, 243 insertions(+), 23 deletions(-) diff --git a/atlas-eval/README.md b/atlas-eval/README.md index a9651b390..a3ad0f873 100644 --- a/atlas-eval/README.md +++ b/atlas-eval/README.md @@ -49,3 +49,43 @@ Processor processor = Each `DataSource` for the input has a string id that will be added to corresponding `MessageEnvelope` objects in the output. This allows the messages to be matched with the correct input by the user. + +### Data Rate Messages +The library also emits data rate messages to help gain insights into the data rate per `DataSource` +per step. There are 3 types of data sizes in a data rate message: - Input size: number of data points coming in as raw input. - Intermediate size: number of aggregate data points that are used in the eval step. - Output size: number of data points for the final result.
+ +Each "size" has 2 fields: + - `total`: total number of data points + - `details`: number of data points per data expression (empty for "outputSize") + +For example, given a query of "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by" with 4 nodes +and 2 clusters, one of the data rate message envelopes may look like this: +```json5 +{ + "id": "_", + "message": { + "type": "rate", + "timestamp": 1596570660000, + "step": 60000, + "inputSize": { + "total": 4, + "details": { + "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 4 + } + }, + "intermediateSize": { + "total": 2, + "details": { + "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 2 + } + }, + "outputSize": { + "total": 2, + "details": {} + } + } +} +``` \ No newline at end of file diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala index 4555a33e2..abab6a4aa 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvalDataRateCollector.scala @@ -77,13 +77,7 @@ class EvalDataRateCollector(timestamp: Long, step: Long) { dataExpr: DataExpr, amount: Int ): Unit = { - counts.get(id) match { - case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount) - case None => - val temp = new RefIntHashMap[DataExpr] - temp.increment(dataExpr, amount) - counts += (id -> temp) - } + counts.getOrElseUpdate(id, new RefIntHashMap[DataExpr]).increment(dataExpr, amount) } } diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala index 05c3c8173..7c838635b 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/FinalExprEval.scala @@ -74,7 +74,7 @@ private[stream] 
class FinalExprEval(interpreter: ExprInterpreter) private var recipients = List.empty[(StyleExpr, List[String])] // Track the set of DataExprs per DataSource - private var dataSourceIdToDataExprs = mutable.Map.empty[String, mutable.Set[DataExpr]] + private var dataSourceIdToDataExprs = Map.empty[String, Set[DataExpr]] // Empty data map used as base to account for expressions that do not have any // matches for a given time interval @@ -116,15 +116,19 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) .map(t => t._1 -> t._2.map(_._2)) .toList - // Compute the map from DataSource id to DataExprs - dataSourceIdToDataExprs = mutable.Map.empty - recipients.foreach(kv => { - kv._2.foreach(id => { - val dataExprSet = dataSourceIdToDataExprs.getOrElse(id, mutable.Set.empty) - dataExprSet.addAll(kv._1.expr.dataExprs) - dataSourceIdToDataExprs(id) = dataExprSet - }) - }) + dataSourceIdToDataExprs = recipients + .flatMap(styleExprAndIds => + styleExprAndIds._2.map(id => id -> styleExprAndIds._1.expr.dataExprs.toSet) + ) + // Fold to mutable map to avoid creating new Map on every update + .foldLeft(mutable.Map.empty[String, Set[DataExpr]]) { + case (map, (id, dataExprs)) => { + map += map.get(id).fold(id -> dataExprs) { vs => + id -> (dataExprs ++ vs) + } + } + } + .toMap // Cleanup state for any expressions that are no longer needed val removed = previous.keySet -- recipients.map(_._1).toSet @@ -172,16 +176,13 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter) // Collect input and intermediate data size per DataSource val rateCollector = new EvalDataRateCollector(timestamp, step) dataSourceIdToDataExprs.foreach { - case (id, dataExprSet) => { + case (id, dataExprSet) => dataExprSet.foreach(dataExpr => { group.dataExprValues.get(dataExpr).foreach { info => - { - rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints) - rateCollector.incrementIntermediate(id, dataExpr, info.values.size) - } + rateCollector.incrementInput(id, 
dataExpr, info.numRawDatapoints) + rateCollector.incrementIntermediate(id, dataExpr, info.values.size) } }) - } } // Generate the time series and diagnostic output diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala index 1b56ded31..7d3a33729 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/FinalExprEvalSuite.scala @@ -26,6 +26,8 @@ import com.netflix.atlas.core.model.StatefulExpr import com.netflix.atlas.eval.model.AggrDatapoint import com.netflix.atlas.eval.model.AggrValuesInfo import com.netflix.atlas.eval.model.ArrayData +import com.netflix.atlas.eval.model.EvalDataRate +import com.netflix.atlas.eval.model.EvalDataSize import com.netflix.atlas.eval.model.TimeGroup import com.netflix.atlas.eval.model.TimeSeriesMessage import com.netflix.atlas.eval.stream.Evaluator.DataSource @@ -109,6 +111,34 @@ class FinalExprEvalSuite extends AnyFunSuite { } } + private def isEvalDataRate(messageEnvelope: MessageEnvelope): Boolean = { + messageEnvelope.getMessage match { + case _: EvalDataRate => true + case _ => false + } + } + + private def getAsEvalDataRate( + env: MessageEnvelope + ): EvalDataRate = { + env.getMessage.asInstanceOf[EvalDataRate] + } + + private def checkRate( + rate: EvalDataRate, + timestamp: Long, + step: Long, + inputSize: EvalDataSize, + intermediateSize: EvalDataSize, + outputSize: EvalDataSize + ): Unit = { + assert(rate.timestamp === timestamp) + assert(rate.step === step) + assert(rate.inputSize === inputSize) + assert(rate.intermediateSize === intermediateSize) + assert(rate.outputSize === outputSize) + } + private def getValue(ts: TimeSeriesMessage): Double = { ts.data match { case ArrayData(vs) => @@ -149,6 +179,38 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] 
checkValue(ts, expectedValue) } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } test("aggregate with multiple datapoints per group") { @@ -182,6 +244,38 @@ class FinalExprEvalSuite extends AnyFunSuite { val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] checkValue(ts, expectedValue) } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1, Map(expr.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(3, Map(expr.toString -> 3)), + EvalDataSize(3, Map(expr.toString -> 3)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr.toString -> 2)), + EvalDataSize(2, Map(expr.toString -> 2)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } test("aggregate with multiple expressions") { @@ -222,6 +316,65 @@ class FinalExprEvalSuite extends AnyFunSuite { else checkValue(actual, expectedTimeseries2.dequeue()) } + + val expr1DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == 
"a") + assert(expr1DataRateMsgs.size == 3) + val expr1ExpectedSizes = Array( + Array( + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr1.toString -> 2)), + EvalDataSize(2, Map(expr1.toString -> 2)), + EvalDataSize(1) + ), + Array( + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1, Map(expr1.toString -> 1)), + EvalDataSize(1) + ) + ) + expr1DataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * i, + 60000, + expr1ExpectedSizes(i)(0), + expr1ExpectedSizes(i)(1), + expr1ExpectedSizes(i)(2) + ) + }) + + val expr2DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "b") + assert(expr2DataRateMsgs.size == 2) + val expr2ExpectedSizes = Array( + Array( + EvalDataSize(1, Map(expr2.toString -> 1)), + EvalDataSize(1, Map(expr2.toString -> 1)), + EvalDataSize(1) + ), + Array( + EvalDataSize(2, Map(expr2.toString -> 2)), + EvalDataSize(2, Map(expr2.toString -> 2)), + EvalDataSize(1) + ) + ) + expr2DataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * (i + 1), + 60000, + expr2ExpectedSizes(i)(0), + expr2ExpectedSizes(i)(1), + expr2ExpectedSizes(i)(2) + ) + }) } // https://github.com/Netflix/atlas/issues/693 @@ -265,6 +418,38 @@ class FinalExprEvalSuite extends AnyFunSuite { checkValue(ts, 21.0) } } + + val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a") + assert(dataRateMsgs.size == 3) + val expectedSizes = Array( + Array( + EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)), + EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)), + EvalDataSize(1) + ), + Array( + EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)), + EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)), + EvalDataSize(2) + ), + 
Array( + EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)), + EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)), + EvalDataSize(1) + ) + ) + dataRateMsgs.zipWithIndex.foreach(envAndIndex => { + val rate = getAsEvalDataRate(envAndIndex._1) + val i = envAndIndex._2 + checkRate( + rate, + 60000 * i, + 60000, + expectedSizes(i)(0), + expectedSizes(i)(1), + expectedSizes(i)(2) + ) + }) } // https://github.com/Netflix/atlas/issues/762