diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala index 9a3227169..e19205ff7 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/AggregateCollector.scala @@ -123,11 +123,11 @@ abstract class SimpleAggregateCollector extends AggregateCollector { blocks.foreach { b => if (valueMask != null) { - val v = buffer.aggrBlock(tags, b, aggr, ConsolidationFunction.Sum, multiple, op) + val v = buffer.aggrBlock(b, aggr, ConsolidationFunction.Sum, multiple, op) buffer.valueMask(valueMask, b, multiple) valueCount += v } else { - val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op) + val v = buffer.aggrBlock(b, aggr, cf, multiple, op) valueCount += v } } @@ -286,7 +286,7 @@ class AllAggregateCollector extends LimitedAggregateCollector { } blocks.foreach { b => - val v = buffer.aggrBlock(tags, b, aggr, cf, multiple, op) + val v = buffer.aggrBlock(b, aggr, cf, multiple, op) valueCount += v } diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala index 61ec5d14c..25248ada1 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/MemoryDatabase.scala @@ -172,13 +172,7 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database { val bufEnd = bufStart + cfStepLength * cfStep - cfStep def newBuffer(tags: Map[String, String]): TimeSeriesBuffer = { - val resultTags = expr match { - case _: DataExpr.All => tags - case _ => - val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping - tags.filter(t => resultKeys.contains(t._1)) - } - TimeSeriesBuffer(resultTags, cfStep, bufStart, bufEnd) + TimeSeriesBuffer(tags, cfStep, bufStart, bufEnd) } index.findItems(query).foreach { item => @@ -199,9 +193,14 @@ class MemoryDatabase(registry: Registry, config: Config) extends Database { queryInputDatapoints.increment(stats.inputDatapoints) queryOutputDatapoints.increment(stats.outputDatapoints) + val resultKeys = Query.exactKeys(expr.query) ++ expr.finalGrouping val vs = collector.result .map { t => - DataExpr.withDefaultLabel(expr, t) + val resultTags = expr match { + case _: DataExpr.All => t.tags + case _ => t.tags.filter(t => resultKeys.contains(t._1)) + } + DataExpr.withDefaultLabel(expr, t.withTags(resultTags)) } .sortWith { _.label < _.label } finalValues(context, expr, vs) diff --git a/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala b/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala index 50b2d2a5a..023e3b42d 100644 --- a/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala +++ b/atlas-core/src/main/scala/com/netflix/atlas/core/db/TimeSeriesBuffer.scala @@ -129,7 +129,7 @@ object TimeSeriesBuffer { /** * Mutable buffer for efficiently manipulating metric data. */ -final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeSeq) +final class TimeSeriesBuffer(val tags: Map[String, String], val data: ArrayTimeSeq) extends TimeSeries with TimeSeq with LazyTaggedItem { @@ -167,24 +167,14 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS new TimeSeriesBuffer(tags, new ArrayTimeSeq(data.dsType, start, step, values.clone)) } - /** - * Compute the new tags for the aggregate buffer. The tags are the - * intersection of tag values. - */ - private def aggrTags(t: Map[String, String]): Map[String, String] = { - tags.toSet.intersect(t.toSet).toMap - } - /** Aggregate the data from the block into this buffer. */ private[db] def aggrBlock( - blkTags: Map[String, String], block: Block, aggr: Int, cf: ConsolidationFunction, multiple: Int, op: (Double, Double) => Double ): Int = { - tags = aggrTags(blkTags) val s = start / step val e = values.length + s - 1 val bs = block.start / step @@ -260,11 +250,10 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS /** * Add the corresponding positions of the two buffers. The buffers must have - * the same settings. The tags for the new buffer will be the intersection. + * the same settings. */ def add(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -273,8 +262,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def add(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN) + def add(block: Block): Int = { + aggrBlock(block, Block.Sum, ConsolidationFunction.Sum, 1, Math.addNaN) } /** @@ -295,7 +284,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def subtract(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -322,7 +310,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def multiply(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -349,7 +336,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def divide(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -376,7 +362,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def max(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -387,8 +372,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def max(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN) + def max(block: Block): Int = { + aggrBlock(block, Block.Max, ConsolidationFunction.Sum, 1, Math.maxNaN) } /** @@ -398,7 +383,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def min(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -409,8 +393,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def min(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN) + def min(block: Block): Int = { + aggrBlock(block, Block.Min, ConsolidationFunction.Sum, 1, Math.minNaN) } /** @@ -431,7 +415,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS */ def count(ts: TimeSeriesBuffer): Unit = { val nts = ts.normalize(step, start, values.length) - tags = aggrTags(nts.tags) val length = scala.math.min(values.length, nts.values.length) var pos = 0 while (pos < length) { @@ -442,8 +425,8 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS } } - def count(tags: Map[String, String], block: Block): Int = { - aggrBlock(tags, block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN) + def count(block: Block): Int = { + aggrBlock(block, Block.Count, ConsolidationFunction.Sum, 1, Math.addNaN) } /** @@ -452,7 +435,6 @@ final class TimeSeriesBuffer(var tags: Map[String, String], val data: ArrayTimeS def merge(ts: TimeSeriesBuffer): Unit = { require(step == ts.step, "step sizes must be the same") require(start == ts.start, "start times must be the same") - tags = aggrTags(ts.tags) val length = math.min(values.length, ts.values.length) var i = 0 while (i < length) { diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala index 194d1e669..b530dac37 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/db/AggregateCollectorSuite.scala @@ -149,7 +149,7 @@ class AggregateCollectorSuite extends FunSuite { newTaggedBuffer(Map("a" -> "4", "b" -> "2", "c" -> "7"), 5.0) ) val expected = List( - newTaggedBuffer(Map("b" -> "2"), 9.0), + newTaggedBuffer(Map("a" -> "1", "b" -> "2"), 9.0), newTaggedBuffer(Map("a" -> "3", "b" -> "3"), 2.0) ) val by = DataExpr.GroupBy(DataExpr.Sum(Query.False), List("b")) diff --git a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala index 7bf5fef8b..7d643b3a5 100644 --- a/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala +++ b/atlas-core/src/test/scala/com/netflix/atlas/core/db/TimeSeriesBufferSuite.scala @@ -71,7 +71,7 @@ class TimeSeriesBufferSuite extends FunSuite { val buffer = TimeSeriesBuffer(tags, step, 1 * step, 19 * step) blocks.foreach { b => - buffer.add(tags, b) + buffer.add(b) } val m = buffer assertEquals(m.step, step) @@ -93,7 +93,7 @@ class TimeSeriesBufferSuite extends FunSuite { val buffer = TimeSeriesBuffer(tags, 6 * step, step, 18 * step) blocks.foreach { b => - buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN) + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, 6, Math.addNaN) } val m = buffer assertEquals(m.step, 6 * step) @@ -115,7 +115,7 @@ class TimeSeriesBufferSuite extends FunSuite { val consol = multiple * step val buffer = TimeSeriesBuffer(tags, consol, consol, 20 * consol) blocks.foreach { b => - buffer.aggrBlock(tags, b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) + buffer.aggrBlock(b, Block.Sum, ConsolidationFunction.Max, multiple, Math.addNaN) } val m = buffer assertEquals(m.step, consol) @@ -123,75 +123,15 @@ class TimeSeriesBufferSuite extends FunSuite { assert(m.values.forall(_ == 4.0)) } - test("cf with start".ignore) { - val tags = emptyTags - val step = 60000L - val block = ArrayBlock(0L, 60) - (8 until 60).foreach { i => - block.buffer(i) = 1.0 - } - - val buffer = TimeSeriesBuffer(tags, 6 * step, step, 60 * step) - buffer.aggrBlock(tags, block, Block.Sum, ConsolidationFunction.Avg, 6, Math.addNaN) - val m = buffer - println(m) - } - - val pairs = List( - (ConsolidationFunction.Avg -> Math.addNaN _), - (ConsolidationFunction.Max -> Math.maxNaN _), - (ConsolidationFunction.Min -> Math.minNaN _) - ) - - /*pairs.foreach { - case (name, af) => - test(s"$name: consolidate then aggregate === aggregate then consolidate") { - val cf = name - val tags = emptyTags[String, String] - val step = 60000L - val blocks = (0 until 1000).map(_ => newBlock(0, 60)) - - val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step) - val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1) - blocks.foreach { b => - afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af) - cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af) - } - - val cfSecond = afFirst.consolidate(6, cf) - (0 until cfFirst.values.length).foreach { i => - assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i") - } - } - - test(s"$name with NaN: consolidate then aggregate === aggregate then consolidate") { - val cf = name - val tags = emptyTags[String, String] - val step = 60000L - val blocks = (0 until 1000).map(_ => newBlockWithNaN(0, 60)) - - val afFirst = TimeSeriesBuffer(tags, step, 0L, 60 * step) - val cfFirst = TimeSeriesBuffer(tags, 6 * step, 0L, 60 * step - 1) - blocks.foreach { b => - afFirst.aggrBlock(tags, b, Block.Sum, cf, 1, af) - cfFirst.aggrBlock(tags, b, Block.Sum, cf, 6, af) - } - - val cfSecond = afFirst.consolidate(6, cf) - (0 until cfFirst.values.length).foreach { i => - assertEquals(cfSecond.values(i), (cfFirst.values(i) +- 1e-9), s"position $i") - } - } - }*/ - test("aggregate tags") { + // No longer done, it will always use the tags from the initial buffer val common = Map("a" -> "b", "c" -> "d") val t1 = common + ("c" -> "e") val t2 = common + ("z" -> "y") val b1 = TimeSeriesBuffer(t1, 60000, 0, Array.fill(1)(0.0)) val b2 = TimeSeriesBuffer(t2, 60000, 0, Array.fill(1)(0.0)) b1.add(b2) - assertEquals(b1.tags, Map("a" -> "b")) + assertEquals(b1.tags, t1) } test("add buffer") { @@ -548,7 +488,7 @@ class TimeSeriesBufferSuite extends FunSuite { val end = bufStart + step * 12 val buffer = TimeSeriesBuffer(emptyTags, step, bufStart, end) - buffer.aggrBlock(emptyTags, block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN) + buffer.aggrBlock(block, Block.Sum, ConsolidationFunction.Avg, 5, Math.addNaN) buffer.values.foreach { v => assert(v.isNaN || v <= 0.0) }