atlas-eval: expose data rate per datasource via diagnostic messages #1192

Merged · 5 commits · Aug 5, 2020
40 changes: 40 additions & 0 deletions atlas-eval/README.md
@@ -49,3 +49,43 @@ Processor<Evaluator.DataSources, Evaluator.MessageEnvelope> processor =
Each `DataSource` for the input has a string id that will be added to corresponding
`MessageEnvelope` objects in the output. This allows the messages to be matched with
the correct input by the user.

### Data Rate Messages
The library also emits data rate messages to provide insight into the data rate per `DataSource`
per step. There are three types of data sizes in a data rate message:
- Input size: the number of data points coming in as raw input.
- Intermediate size: the number of aggregated data points used in the eval step.
- Output size: the number of data points in the final result.

Each size has two fields:
- `total`: the total number of data points
- `details`: the number of data points per data expression (empty for `outputSize`)

For example, given the query `app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by` with 4 nodes
and 2 clusters, a data rate message envelope may look like this:
```json5
{
"id": "_",
"message": {
"type": "rate",
"timestamp": 1596570660000,
"step": 60000,
"inputSize": {
"total": 4,
"details": {
"app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 4
}
},
"intermediateSize": {
"total": 2,
"details": {
"app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by": 2
}
},
"outputSize": {
"total": 2,
"details": {}
}
}
}
```
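The input, intermediate, and output totals make it easy to see how much aggregation reduces the data volume before the final eval step. The sketch below uses simplified local stand-ins for the library's `EvalDataRate` and `EvalDataSize` case classes (not the real JSON deserialization) to show how a consumer might interpret the example message above:

```scala
// Simplified stand-ins for the library's EvalDataSize/EvalDataRate types,
// defined locally so the sketch is self-contained.
case class EvalDataSize(total: Int, details: Map[String, Int] = Map.empty)

case class EvalDataRate(
  timestamp: Long,
  step: Long,
  inputSize: EvalDataSize,
  intermediateSize: EvalDataSize,
  outputSize: EvalDataSize
)

// The rate message from the example above: 4 raw datapoints aggregate down to
// 2 intermediate datapoints (one per cluster) and 2 output time series.
val expr = "app,www,:eq,name,request,:eq,:and,:sum,(,cluster,),:by"
val rate = EvalDataRate(
  timestamp = 1596570660000L,
  step = 60000L,
  inputSize = EvalDataSize(4, Map(expr -> 4)),
  intermediateSize = EvalDataSize(2, Map(expr -> 2)),
  outputSize = EvalDataSize(2)
)

// Fraction of raw input that survives into the eval step: here 2/4 = 0.5,
// i.e. the group-by aggregation halved the datapoint count.
val aggregationRatio = rate.intermediateSize.total.toDouble / rate.inputSize.total
```

Note that `outputSize.details` is empty by default, matching the empty `details` object in the JSON example.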
@@ -88,6 +88,8 @@ object AggrDatapoint {
* instance have the same data expression.
*/
trait Aggregator {
protected[this] var rawDatapointCounter = 0
def numRawDatapoints: Int = rawDatapointCounter
def aggregate(datapoint: AggrDatapoint): Aggregator
def datapoints: List[AggrDatapoint]
}
@@ -101,9 +103,11 @@ object AggrDatapoint {
extends Aggregator {

private var value = init.value
rawDatapointCounter += 1
Contributor Author commented: Need to increment here because the constructor takes one `Datapoint`.


override def aggregate(datapoint: AggrDatapoint): Aggregator = {
value = op(value, datapoint.value)
rawDatapointCounter += 1
this
}

@@ -123,16 +127,20 @@

private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = {
datapoint.expr match {
case GroupBy(af: AggregateFunction, _) => new SimpleAggregator(datapoint, af)
case GroupBy(af: AggregateFunction, _) =>
rawDatapointCounter += 1
new SimpleAggregator(datapoint, af)
case _ =>
throw new IllegalArgumentException("datapoint is not for a grouped expression")
}
}

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
aggregators.get(datapoint.tags) match {
case Some(aggr) => aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
case Some(aggr) =>
rawDatapointCounter += 1
aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
}
this
}
@@ -150,6 +158,7 @@ object AggrDatapoint {

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
values = datapoint :: values
rawDatapointCounter += 1
this
}

@@ -0,0 +1,30 @@
/*
* Copyright 2014-2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

case class EvalDataRate(
timestamp: Long,
step: Long,
inputSize: EvalDataSize,
intermediateSize: EvalDataSize,
outputSize: EvalDataSize
) extends JsonSupport {
val `type`: String = "rate"
}

case class EvalDataSize(total: Int, details: Map[String, Int] = Map.empty[String, Int])
@@ -28,7 +28,8 @@ import com.netflix.atlas.core.model.DataExpr
* Timestamp that applies to all values within the group.
* @param step
* Step size for the data within this group.
* @param values
* @param dataExprValues
* Values associated with this time.
*/
case class TimeGroup(timestamp: Long, step: Long, values: Map[DataExpr, List[AggrDatapoint]])
case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo])
case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Int)
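The `TimeGroup` change threads the raw datapoint count alongside the aggregated values so the final eval stage can report input size. A minimal sketch of that pairing, with `String` and `Double` as hypothetical stand-ins for `DataExpr` and `AggrDatapoint`:

```scala
// Simplified sketch of the new TimeGroup shape. String stands in for DataExpr
// and Double for AggrDatapoint; the real aggregation is AggrDatapoint.aggregate.
case class AggrValuesInfo(values: List[Double], numRawDatapoints: Int)
case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[String, AggrValuesInfo])

// Mirrors toTimeGroup: group raw datapoints by expression, then keep both the
// aggregated values and the raw count for data rate reporting.
val raw: List[(String, Double)] = List(("sum-expr", 1.0), ("sum-expr", 2.0), ("sum-expr", 3.0))
val grouped = raw
  .groupBy(_._1)
  .map { case (expr, vs) =>
    // A plain sum stands in for the aggregation step.
    expr -> AggrValuesInfo(List(vs.map(_._2).sum), vs.size)
  }
val group = TimeGroup(1596570660000L, 60000L, grouped)
```

Carrying `numRawDatapoints` here is what lets `FinalExprEval` distinguish the input size (raw points) from the intermediate size (aggregated points) without re-counting upstream.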
@@ -0,0 +1,86 @@
/*
* Copyright 2014-2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.util.RefIntHashMap
import com.netflix.atlas.eval.model.EvalDataRate
import com.netflix.atlas.eval.model.EvalDataSize

import scala.collection.mutable

class EvalDataRateCollector(timestamp: Long, step: Long) {
Contributor commented: It would be nice to have some tests for this class.

Contributor Author replied: Should be covered by FinalExprEvalSuite.


private val inputCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
private val intermediateCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
private val outputCounts = new RefIntHashMap[String]

def incrementOutput(id: String, amount: Int): Unit = {
outputCounts.increment(id, amount)
}

def incrementIntermediate(id: String, dataExpr: DataExpr, amount: Int): Unit = {
increment(intermediateCounts, id, dataExpr, amount)
}

def incrementInput(id: String, dataExpr: DataExpr, amount: Int): Unit = {
increment(inputCounts, id, dataExpr, amount)
}

def getAll: Map[String, EvalDataRate] = {
inputCounts.map {
case (id, _) => {
id -> EvalDataRate(
timestamp,
step,
getDataRate(inputCounts, id),
getDataRate(intermediateCounts, id),
EvalDataSize(outputCounts.get(id, 0))
)
}
}.toMap
}

private def getDataRate(
counts: mutable.Map[String, RefIntHashMap[DataExpr]],
id: String
): EvalDataSize = {
counts.get(id) match {
case Some(v: RefIntHashMap[DataExpr]) =>
var total = 0
val builder = Map.newBuilder[String, Int]
v.foreach { (dataExpr, count) =>
builder += dataExpr.toString -> count
total += count
}
EvalDataSize(total, builder.result())
case None => EvalDataRateCollector.EmptyRate
}
}

private def increment(
counts: mutable.Map[String, RefIntHashMap[DataExpr]],
id: String,
dataExpr: DataExpr,
amount: Int
): Unit = {
counts.getOrElseUpdate(id, new RefIntHashMap[DataExpr]).increment(dataExpr, amount)
}
}

object EvalDataRateCollector {
val EmptyRate = EvalDataSize(0)
}
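The collector keeps a per-expression counter map for input and intermediate sizes and a flat counter for output size, all keyed by datasource id. A simplified sketch of that counting pattern, with a plain `mutable.Map` standing in for Netflix's `RefIntHashMap` and `String` for `DataExpr`:

```scala
import scala.collection.mutable

// Minimal sketch of the collector's counting pattern. Not the real class:
// mutable.Map with a default value replaces RefIntHashMap, and expressions
// are plain strings.
class RateCollectorSketch {
  private val inputCounts = mutable.Map.empty[String, mutable.Map[String, Int]]
  private val outputCounts = mutable.Map.empty[String, Int].withDefaultValue(0)

  def incrementInput(id: String, expr: String, amount: Int): Unit = {
    // Lazily create the per-datasource counter map, as getOrElseUpdate does
    // in the real collector.
    val byExpr = inputCounts.getOrElseUpdate(id, mutable.Map.empty[String, Int].withDefaultValue(0))
    byExpr(expr) += amount
  }

  def incrementOutput(id: String, amount: Int): Unit =
    outputCounts(id) += amount

  // Total input per datasource id, analogous to EvalDataSize.total.
  def inputTotal(id: String): Int =
    inputCounts.get(id).map(_.values.sum).getOrElse(0)

  def outputTotal(id: String): Int = outputCounts(id)
}

val c = new RateCollectorSketch
c.incrementInput("ds1", "sum-by-cluster", 4)
c.incrementOutput("ds1", 2)
```

As in the real class, a datasource with no recorded input simply reports a zero size rather than failing the lookup.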
@@ -47,6 +47,7 @@ import com.netflix.atlas.akka.StreamOps
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.stream.EurekaSource.Instance
import com.netflix.atlas.eval.stream.Evaluator.DataSource
@@ -303,7 +304,7 @@ private[stream] abstract class EvaluatorImpl(
}

private def toTimeGroup(step: Long, exprs: List[DataExpr], group: DatapointGroup): TimeGroup = {
val values = group.getDatapoints.asScala.zipWithIndex
val valuesInfo = group.getDatapoints.asScala.zipWithIndex
.flatMap {
case (d, i) =>
val tags = d.getTags.asScala.toMap
@@ -321,8 +322,8 @@
}
}
.groupBy(_.expr)
.map(t => t._1 -> AggrDatapoint.aggregate(t._2.toList))
TimeGroup(group.getTimestamp, step, values)
.map(t => t._1 -> AggrValuesInfo(AggrDatapoint.aggregate(t._2.toList), t._2.size))
TimeGroup(group.getTimestamp, step, valuesInfo)
}

@scala.annotation.tailrec
@@ -15,8 +15,6 @@
*/
package com.netflix.atlas.eval.stream

import java.time.Instant

import akka.NotUsed
import akka.http.scaladsl.model.Uri
import akka.stream.Attributes
@@ -42,6 +40,8 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
import com.typesafe.scalalogging.StrictLogging

import scala.collection.mutable

/**
* Takes the set of data sources and time grouped partial aggregates as input and performs
* the final evaluation step.
@@ -73,6 +73,9 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
// the data for it
private var recipients = List.empty[(StyleExpr, List[String])]

// Track the set of DataExprs per DataSource
private var dataSourceIdToDataExprs = Map.empty[String, Set[DataExpr]]

// Empty data map used as base to account for expressions that do not have any
// matches for a given time interval
private var noData = Map.empty[DataExpr, List[TimeSeries]]
@@ -110,9 +113,23 @@
}
}
.groupBy(_._1)
.map(t => t._1 -> t._2.map(_._2).toList)
.map(t => t._1 -> t._2.map(_._2))
.toList

dataSourceIdToDataExprs = recipients
.flatMap(styleExprAndIds =>
styleExprAndIds._2.map(id => id -> styleExprAndIds._1.expr.dataExprs.toSet)
)
// Fold to mutable map to avoid creating new Map on every update
.foldLeft(mutable.Map.empty[String, Set[DataExpr]]) {
case (map, (id, dataExprs)) => {
map += map.get(id).fold(id -> dataExprs) { vs =>
id -> (dataExprs ++ vs)
}
}
}
.toMap

// Cleanup state for any expressions that are no longer needed
val removed = previous.keySet -- recipients.map(_._1).toSet
removed.foreach { expr =>
@@ -149,50 +166,62 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
private def handleData(group: TimeGroup): Unit = {
// Finalize the DataExprs, needed as input for further evaluation
val timestamp = group.timestamp
val groupedDatapoints = group.values
val groupedDatapoints = group.dataExprValues

val expressionDatapoints = noData ++ groupedDatapoints.map {
val dataExprToDatapoints = noData ++ groupedDatapoints.map {
case (k, vs) =>
k -> AggrDatapoint.aggregate(vs).map(_.toTimeSeries)
k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries)
}
val expressionDiagnostics = groupedDatapoints.map {
case (k, vs) =>
val t = Instant.ofEpochMilli(timestamp)
k -> DiagnosticMessage.info(s"$t: ${vs.length} input datapoints for [$k]")

// Collect input and intermediate data size per DataSource
val rateCollector = new EvalDataRateCollector(timestamp, step)
dataSourceIdToDataExprs.foreach {
case (id, dataExprSet) =>
dataExprSet.foreach(dataExpr => {
group.dataExprValues.get(dataExpr).foreach { info =>
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
})
}

// Generate the time series and diagnostic output
val output = recipients.flatMap {
case (expr, ids) =>
case (styleExpr, ids) =>
Contributor Author (@jfz, Jul 30, 2020) commented: Renamed to make this easier to follow; I was a little confused when I first saw `expr.expr.dataExprs`.
// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(expr, IdentityMap.empty[StatefulExpr, Any])
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
val context = EvalContext(timestamp, timestamp + step, step, state)
try {
val result = expr.expr.eval(context, expressionDatapoints)
states(expr) = result.state
val data = if (result.data.isEmpty) List(noData(expr)) else result.data
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data
val msgs = data.map { t =>
TimeSeriesMessage(expr, context, t.withLabel(expr.legend(t)))
TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t)))
}

val diagnostics = expr.expr.dataExprs.flatMap(expressionDiagnostics.get)
// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))

ids.flatMap { id =>
(msgs ++ diagnostics).map { msg =>
msgs.map { msg =>
new MessageEnvelope(id, msg)
}
}
} catch {
case e: Exception =>
val msg = error(expr.toString, "final eval failed", e)
val msg = error(styleExpr.toString, "final eval failed", e)
ids.map { id =>
new MessageEnvelope(id, msg)
}
}
}

push(out, Source(output))
val rateMessages = rateCollector.getAll.map {
case (id, rate) => new MessageEnvelope(id, rate)
}.toList

push(out, Source(output ++ rateMessages))
}

override def onPush(): Unit = {