diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala index aaac0a913..bcf79f774 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/LwcToAggrDatapointSuite.scala @@ -110,6 +110,30 @@ class LwcToAggrDatapointSuite extends FunSuite { assertEquals(countData.map(_.value).toSet, Set(4.0)) } + test("eval trace time series") { + val styleExpr = "name,cpu,:eq,:avg" + val tsExpr = s"app,foo,:eq,$styleExpr,:span-time-series" + def subExpr(n: String, e: String): String = { + s"""{"id":"$n","expression":"$e","frequency":$step}""" + } + val expr1 = subExpr("sum", "name,cpu,:eq,:sum") + val expr2 = subExpr("count", "name,cpu,:eq,:count") + val subv2 = + s"""{"type":"subscription-v2","expression":"$tsExpr","exprType":"TRACE_TIME_SERIES","metrics":[$expr1,$expr2]}""" + val results = eval(subv2 :: input.tail) + assertEquals(results.size, 8) + + val groups = results.groupBy(_.expr) + assertEquals(groups.size, 2) + + val sumData = groups(DataExpr.Sum(Query.Equal("name", "cpu"))) + assertEquals(sumData.map(_.value).toSet, Set(1.0, 2.0, 3.0, 4.0)) + + val countData = groups(DataExpr.Count(Query.Equal("name", "cpu"))) + assertEquals(countData.size, 4) + assertEquals(countData.map(_.value).toSet, Set(4.0)) + } + test("diagnostic messages are logged") { logMessages.clear() eval(input) diff --git a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala index f797836bb..3e2dad0f1 100644 --- a/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala +++ b/atlas-lwcapi/src/main/scala/com/netflix/atlas/lwcapi/ExpressionSplitter.scala @@ -21,6 +21,7 @@ import com.netflix.atlas.core.model.EventExpr import com.netflix.atlas.core.model.Query import com.netflix.atlas.core.model.Query.KeyQuery import com.netflix.atlas.core.model.StyleExpr +import com.netflix.atlas.core.model.TraceQuery import com.netflix.atlas.eval.model.ExprType import com.netflix.atlas.eval.stream.ExprInterpreter import com.netflix.spectator.ipc.ServerGroup @@ -134,11 +135,23 @@ class ExpressionSplitter(config: Config) { val q = intern(compress(e.query)) DataExprMeta(e.toString, q) } - case ExprType.TRACE_EVENTS | ExprType.TRACE_TIME_SERIES => + case ExprType.TRACE_EVENTS => parsedExpressions.map { e => // Tracing cannot be scoped to specific infrastructure, always use True DataExprMeta(e.toString, Query.True) } + case ExprType.TRACE_TIME_SERIES => + parsedExpressions + .collect { + case tq: TraceQuery.SpanTimeSeries => + tq.expr.expr.dataExprs.map(e => tq.copy(expr = StyleExpr(e, Map.empty))) + } + .flatten + .distinct + .map { e => + // Tracing cannot be scoped to specific infrastructure, always use True + DataExprMeta(e.toString, Query.True) + } } } diff --git a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala index 89803badf..d38fc28e1 100644 --- a/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala +++ b/atlas-lwcapi/src/test/scala/com/netflix/atlas/lwcapi/ExpressionSplitterSuite.scala @@ -50,6 +50,25 @@ class ExpressionSplitterSuite extends FunSuite { assertEquals(actual, expected) } + test("splits compound trace time series expression into data expressions") { + def childExpr(e: String): String = { + s"nf.app,api,:eq,nf.cluster,skan-test,:eq,:child,$e,:span-time-series" + } + val expr = s"${childExpr(query1)},${childExpr(query1)}" + val actual = splitter.split(expr, ExprType.TRACE_TIME_SERIES, frequency1) + val expected = List( + Subscription( + Query.True, + ExpressionMetadata(childExpr(ds1a), ExprType.TRACE_TIME_SERIES, frequency1) + ), + Subscription( + Query.True, + ExpressionMetadata(childExpr(ds1b), ExprType.TRACE_TIME_SERIES, frequency1) + ) + ).reverse + assertEquals(actual, expected) + } + test("throws IAE for invalid expressions") { val msg = intercept[IllegalArgumentException] { splitter.split("foo", ExprType.TIME_SERIES, frequency1)