Skip to content

Commit

Permalink
code review fixes, adding unit test, document data rate messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jfz committed Aug 4, 2020
1 parent daa0c51 commit 37f93d2
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 13 deletions.
41 changes: 41 additions & 0 deletions atlas-eval/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,44 @@ Processor<Evaluator.DataSources, Evaluator.MessageEnvelope> processor =
Each `DataSource` for the input has a string id that will be added to corresponding
`MessageEnvelope` objects in the output. This allows the messages to be matched with
the correct input by the user.

### Data Rate Messages
The library also emits data rate messages to help gain insight into the data rate per `DataSource`
per step. There are three types of data sizes in a data rate message:
- Input size: number of data points coming in as raw input.
- Intermediate size: number of aggregate data points that are used in the eval step, typically
when there's 'group by' in a query.
- Output size: number of data points for the final result.

Each "size" has two fields:
- `total`: total number of data points
- `details`: number of data points per data expression (empty for "outputSize")

For example, given a query of "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by" with 4 nodes
and 2 clusters, one of the data rate message envelopes may look like this:
```json5
{
"id": "_",
"message": {
"type": "rate",
"timestamp": 1596570660000,
"step": 60000,
"inputSize": {
"total": 4,
"details": {
"app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 4
}
},
"intermediateSize": {
"total": 2,
"details": {
"app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 2
}
},
"outputSize": {
"total": 2,
"details": {}
}
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,7 @@ class EvalDataRateCollector(timestamp: Long, step: Long) {
dataExpr: DataExpr,
amount: Int
): Unit = {
counts.get(id) match {
case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount)
case None =>
val temp = new RefIntHashMap[DataExpr]
temp.increment(dataExpr, amount)
counts += (id -> temp)
}
counts.getOrElseUpdate(id, new RefIntHashMap[DataExpr]).increment(dataExpr, amount)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,16 +172,13 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
// Collect input and intermediate data size per DataSource
val rateCollector = new EvalDataRateCollector(timestamp, step)
dataSourceIdToDataExprs.foreach {
case (id, dataExprSet) => {
case (id, dataExprSet) =>
dataExprSet.foreach(dataExpr => {
group.dataExprValues.get(dataExpr).foreach { info =>
{
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
})
}
}

// Generate the time series and diagnostic output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.EvalDataRate
import com.netflix.atlas.eval.model.EvalDataSize
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeSeriesMessage
import com.netflix.atlas.eval.stream.Evaluator.DataSource
Expand Down Expand Up @@ -109,6 +111,34 @@ class FinalExprEvalSuite extends AnyFunSuite {
}
}

/** Returns true when the envelope's payload is an `EvalDataRate` message. */
private def isEvalDataRate(messageEnvelope: MessageEnvelope): Boolean = {
  // Partial function covering only the rate case; everything else falls back to false.
  val rateCase: PartialFunction[Any, Boolean] = { case _: EvalDataRate => true }
  rateCase.applyOrElse(messageEnvelope.getMessage, (_: Any) => false)
}

/**
  * Extracts the payload as an `EvalDataRate`. Callers are expected to have
  * filtered with `isEvalDataRate` first; an envelope of any other type will
  * fail the cast.
  */
private def getAsEvalDataRate(env: MessageEnvelope): EvalDataRate =
  env.getMessage.asInstanceOf[EvalDataRate]

/**
  * Asserts that every field of a data rate message matches the expected values.
  *
  * @param rate             the message under test
  * @param timestamp        expected timestamp of the eval step
  * @param step             expected step size in milliseconds
  * @param inputSize        expected raw input data size
  * @param intermediateSize expected aggregated intermediate data size
  * @param outputSize       expected final output data size
  */
private def checkRate(
  rate: EvalDataRate,
  timestamp: Long,
  step: Long,
  inputSize: EvalDataSize,
  intermediateSize: EvalDataSize,
  outputSize: EvalDataSize
): Unit = {
  // Pair each actual field with its expected value and verify them in order.
  val checks = Seq(
    rate.timestamp -> timestamp,
    rate.step -> step,
    rate.inputSize -> inputSize,
    rate.intermediateSize -> intermediateSize,
    rate.outputSize -> outputSize
  )
  checks.foreach { case (actual, expected) => assert(actual === expected) }
}

private def getValue(ts: TimeSeriesMessage): Double = {
ts.data match {
case ArrayData(vs) =>
Expand Down Expand Up @@ -149,6 +179,38 @@ class FinalExprEvalSuite extends AnyFunSuite {
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}

val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a")
assert(dataRateMsgs.size == 3)
val expectedSizes = Array(
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
)
)
dataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * (i + 1),
60000,
expectedSizes(i)(0),
expectedSizes(i)(1),
expectedSizes(i)(2)
)
})
}

test("aggregate with multiple datapoints per group") {
Expand Down Expand Up @@ -182,6 +244,38 @@ class FinalExprEvalSuite extends AnyFunSuite {
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}

val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a")
assert(dataRateMsgs.size == 3)
val expectedSizes = Array(
Array(
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1, Map(expr.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(3, Map(expr.toString -> 3)),
EvalDataSize(3, Map(expr.toString -> 3)),
EvalDataSize(1)
),
Array(
EvalDataSize(2, Map(expr.toString -> 2)),
EvalDataSize(2, Map(expr.toString -> 2)),
EvalDataSize(1)
)
)
dataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * (i + 1),
60000,
expectedSizes(i)(0),
expectedSizes(i)(1),
expectedSizes(i)(2)
)
})
}

test("aggregate with multiple expressions") {
Expand Down Expand Up @@ -222,6 +316,65 @@ class FinalExprEvalSuite extends AnyFunSuite {
else
checkValue(actual, expectedTimeseries2.dequeue())
}

val expr1DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a")
assert(expr1DataRateMsgs.size == 3)
val expr1ExpectedSizes = Array(
Array(
EvalDataSize(1, Map(expr1.toString -> 1)),
EvalDataSize(1, Map(expr1.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(2, Map(expr1.toString -> 2)),
EvalDataSize(2, Map(expr1.toString -> 2)),
EvalDataSize(1)
),
Array(
EvalDataSize(1, Map(expr1.toString -> 1)),
EvalDataSize(1, Map(expr1.toString -> 1)),
EvalDataSize(1)
)
)
expr1DataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * i,
60000,
expr1ExpectedSizes(i)(0),
expr1ExpectedSizes(i)(1),
expr1ExpectedSizes(i)(2)
)
})

val expr2DataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "b")
assert(expr2DataRateMsgs.size == 2)
val expr2ExpectedSizes = Array(
Array(
EvalDataSize(1, Map(expr2.toString -> 1)),
EvalDataSize(1, Map(expr2.toString -> 1)),
EvalDataSize(1)
),
Array(
EvalDataSize(2, Map(expr2.toString -> 2)),
EvalDataSize(2, Map(expr2.toString -> 2)),
EvalDataSize(1)
)
)
expr2DataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * (i + 1),
60000,
expr2ExpectedSizes(i)(0),
expr2ExpectedSizes(i)(1),
expr2ExpectedSizes(i)(2)
)
})
}

// https://github.com/Netflix/atlas/issues/693
Expand Down Expand Up @@ -265,6 +418,38 @@ class FinalExprEvalSuite extends AnyFunSuite {
checkValue(ts, 21.0)
}
}

val dataRateMsgs = output.filter(isEvalDataRate).filter(_.getId == "a")
assert(dataRateMsgs.size == 3)
val expectedSizes = Array(
Array(
EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)),
EvalDataSize(3, Map(expr1.toString -> 1, expr2.toString -> 2)),
EvalDataSize(1)
),
Array(
EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)),
EvalDataSize(4, Map(expr1.toString -> 2, expr2.toString -> 2)),
EvalDataSize(2)
),
Array(
EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)),
EvalDataSize(3, Map(expr1.toString -> 2, expr2.toString -> 1)),
EvalDataSize(1)
)
)
dataRateMsgs.zipWithIndex.foreach(envAndIndex => {
val rate = getAsEvalDataRate(envAndIndex._1)
val i = envAndIndex._2
checkRate(
rate,
60000 * i,
60000,
expectedSizes(i)(0),
expectedSizes(i)(1),
expectedSizes(i)(2)
)
})
}

// https://github.com/Netflix/atlas/issues/762
Expand Down

0 comments on commit 37f93d2

Please sign in to comment.