Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

atlas-eval: expose data rate per datasource via diagnostic messages #1192

Merged
merged 5 commits into from
Aug 5, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object DiagnosticMessage {
final val Warning: String = "warn"
final val Error: String = "error"
final val Close: String = "close"
final val Rate: String = "rate"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't look like this is used anymore, it can be removed.


def info(message: String): DiagnosticMessage = {
DiagnosticMessage(Info, message, None)
Expand All @@ -43,6 +44,14 @@ object DiagnosticMessage {
error(s"${t.getClass.getSimpleName}: ${t.getMessage}")
}

def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems pretty specific to the eval library. Could we just use an info message with the right information encoded?

DiagnosticMessage(
Rate,
s"dataType=$dataType,count=$count,timestamp=$timestamp,step=$step",
None
)
}

/** Message indicating the operation is complete and the stream can be closed. */
val close: DiagnosticMessage =
  DiagnosticMessage(Close, "operation complete", None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ object AggrDatapoint {
* instance have the same data expression.
*/
trait Aggregator {

  // Use a Long literal so the backing counter matches the Long contract of
  // numRawDatapoints; an Int counter could overflow on long-running streams.
  // Subclasses still increment with `rawDatapointCounter += 1`.
  protected[this] var rawDatapointCounter = 0L

  /** Number of raw input datapoints folded into this aggregate so far. */
  def numRawDatapoints: Long = rawDatapointCounter

  /** Fold another datapoint into the aggregate, returning the updated aggregator. */
  def aggregate(datapoint: AggrDatapoint): Aggregator

  /** Aggregated result as a list of datapoints. */
  def datapoints: List[AggrDatapoint]
}
Expand All @@ -101,9 +103,11 @@ object AggrDatapoint {
extends Aggregator {

private var value = init.value
rawDatapointCounter += 1
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to increment here because the constructor takes 1 datapoint.


override def aggregate(datapoint: AggrDatapoint): Aggregator = {
value = op(value, datapoint.value)
rawDatapointCounter += 1
this
}

Expand All @@ -123,16 +127,20 @@ object AggrDatapoint {

private def newAggregator(datapoint: AggrDatapoint): SimpleAggregator = {
datapoint.expr match {
case GroupBy(af: AggregateFunction, _) => new SimpleAggregator(datapoint, af)
case GroupBy(af: AggregateFunction, _) =>
rawDatapointCounter += 1
new SimpleAggregator(datapoint, af)
case _ =>
throw new IllegalArgumentException("datapoint is not for a grouped expression")
}
}

override def aggregate(datapoint: AggrDatapoint): Aggregator = {
aggregators.get(datapoint.tags) match {
case Some(aggr) => aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
case Some(aggr) =>
rawDatapointCounter += 1
aggr.aggregate(datapoint)
case None => aggregators.put(datapoint.tags, newAggregator(datapoint))
}
this
}
Expand All @@ -150,6 +158,7 @@ object AggrDatapoint {

/** Record the datapoint as-is and bump the raw input counter. */
override def aggregate(datapoint: AggrDatapoint): Aggregator = {
  values = datapoint :: values // prepend is O(1)
  rawDatapointCounter += 1
  this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ import com.netflix.atlas.core.model.DataExpr
* Timestamp that applies to all values within the group.
* @param step
* Step size for the data within this group.
* @param values
* @param dataExprValues
* Values associated with this time.
*/
case class TimeGroup(timestamp: Long, step: Long, values: Map[DataExpr, List[AggrDatapoint]])
case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo])

/**
  * Aggregated values for a data expression along with the number of raw input
  * datapoints that contributed to them.
  */
case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Long)
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import com.netflix.atlas.akka.StreamOps
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.stream.EurekaSource.Instance
import com.netflix.atlas.eval.stream.Evaluator.DataSource
Expand Down Expand Up @@ -303,7 +304,7 @@ private[stream] abstract class EvaluatorImpl(
}

private def toTimeGroup(step: Long, exprs: List[DataExpr], group: DatapointGroup): TimeGroup = {
val values = group.getDatapoints.asScala.zipWithIndex
val valuesInfo = group.getDatapoints.asScala.zipWithIndex
.flatMap {
case (d, i) =>
val tags = d.getTags.asScala.toMap
Expand All @@ -321,8 +322,8 @@ private[stream] abstract class EvaluatorImpl(
}
}
.groupBy(_.expr)
.map(t => t._1 -> AggrDatapoint.aggregate(t._2.toList))
TimeGroup(group.getTimestamp, step, values)
.map(t => t._1 -> AggrValuesInfo(AggrDatapoint.aggregate(t._2.toList), t._2.size))
TimeGroup(group.getTimestamp, step, valuesInfo)
}

@scala.annotation.tailrec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources
import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope
import com.typesafe.scalalogging.StrictLogging

import scala.collection.mutable

/**
* Takes the set of data sources and time grouped partial aggregates as input and performs
* the final evaluation step.
Expand Down Expand Up @@ -73,6 +75,9 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
// the data for it
private var recipients = List.empty[(StyleExpr, List[String])]

// Track the set of DataExprs per DataSource
private var dataSourceIdToDataExprs = mutable.Map.empty[String, mutable.Set[DataExpr]]

// Empty data map used as base to account for expressions that do not have any
// matches for a given time interval
private var noData = Map.empty[DataExpr, List[TimeSeries]]
Expand Down Expand Up @@ -110,9 +115,19 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
}
}
.groupBy(_._1)
.map(t => t._1 -> t._2.map(_._2).toList)
.map(t => t._1 -> t._2.map(_._2))
.toList

// Compute the map from DataSource id to DataExprs
dataSourceIdToDataExprs = mutable.Map.empty
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be refactored to keep it immutable since it is never modified later, something like:

      dataSourceIdToDataExprs = recipients
          .flatMap {
            case (styleExpr, ids) => ids.map(_ -> styleExpr.expr.dataExprs.toSet)
          }
          .foldLeft(Map.empty[String, Set[DataExpr]]) {
            case (acc, (id, dataExprs)) =>
              acc + acc.get(id).fold(id -> dataExprs) { vs => id -> (dataExprs ++ vs) }
          }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good — made the change with one difference: used a mutable map as the initial value of the foldLeft to avoid creating a new Map on every update.

recipients.foreach(kv => {
kv._2.foreach(id => {
val dataExprSet = dataSourceIdToDataExprs.getOrElse(id, mutable.Set.empty)
dataExprSet.addAll(kv._1.expr.dataExprs)
dataSourceIdToDataExprs(id) = dataExprSet
})
})

// Cleanup state for any expressions that are no longer needed
val removed = previous.keySet -- recipients.map(_._1).toSet
removed.foreach { expr =>
Expand Down Expand Up @@ -149,34 +164,44 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
private def handleData(group: TimeGroup): Unit = {
// Finalize the DataExprs, needed as input for further evaluation
val timestamp = group.timestamp
val groupedDatapoints = group.values
val groupedDatapoints = group.dataExprValues

val expressionDatapoints = noData ++ groupedDatapoints.map {
val dataExprToDatapoints = noData ++ groupedDatapoints.map {
case (k, vs) =>
k -> AggrDatapoint.aggregate(vs).map(_.toTimeSeries)
k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries)
}
val expressionDiagnostics = groupedDatapoints.map {

val dataExprToDiagnostics = groupedDatapoints.map {
case (k, vs) =>
val t = Instant.ofEpochMilli(timestamp)
k -> DiagnosticMessage.info(s"$t: ${vs.length} input datapoints for [$k]")
k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure this is still needed. I think we can refactor it to be consistent with the new messages or remove if it is redundant.

}

val finalDataRateMap = mutable.Map.empty[String, Long]
// Generate the time series and diagnostic output
val output = recipients.flatMap {
case (expr, ids) =>
case (styleExpr, ids) =>
Copy link
Contributor Author

@jfz jfz Jul 30, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to make it easier to follow; I was a little confused when I first saw expr.expr.dataExprs.

// Use an identity map for the state to ensure that multiple equivalent stateful
// expressions, e.g. derivative(a) + derivative(a), will have isolated state.
val state = states.getOrElse(expr, IdentityMap.empty[StatefulExpr, Any])
val state = states.getOrElse(styleExpr, IdentityMap.empty[StatefulExpr, Any])
val context = EvalContext(timestamp, timestamp + step, step, state)
try {
val result = expr.expr.eval(context, expressionDatapoints)
states(expr) = result.state
val data = if (result.data.isEmpty) List(noData(expr)) else result.data
val result = styleExpr.expr.eval(context, dataExprToDatapoints)
states(styleExpr) = result.state
val data = if (result.data.isEmpty) List(noData(styleExpr)) else result.data
val msgs = data.map { t =>
TimeSeriesMessage(expr, context, t.withLabel(expr.legend(t)))
TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t)))
}

val diagnostics = expr.expr.dataExprs.flatMap(expressionDiagnostics.get)
val diagnostics = styleExpr.expr.dataExprs.flatMap(dataExprToDiagnostics.get)

// Collect final output data count
ids.foreach(id =>
finalDataRateMap.get(id) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using RefIntHashMap would be more efficient here. For our current uses it is unlikely we would overflow the int, though we could create a long specialization if needed.

case Some(count) => finalDataRateMap.update(id, count + data.length)
case None => finalDataRateMap += id -> data.length
}
)

ids.flatMap { id =>
(msgs ++ diagnostics).map { msg =>
Expand All @@ -185,14 +210,41 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
}
} catch {
case e: Exception =>
val msg = error(expr.toString, "final eval failed", e)
val msg = error(styleExpr.toString, "final eval failed", e)
ids.map { id =>
new MessageEnvelope(id, msg)
}
}
}

push(out, Source(output))
// Raw data rate for each DataSource
val rawDataRateMessages = dataSourceIdToDataExprs.map(kv =>
new MessageEnvelope(
kv._1,
DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to output for each data expr? If it is a complex expression with many parts, then that would allow them to see which sub-expressions are contributing the most to the cost.

)
)

// Final output data rate for each DataSource
val finalDataRateMessages = finalDataRateMap.map(kv => {
new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could already be computed by the user with counting the datapoints. I think we are missing the intermediate size that the user couldn't compute. So we would have:

  1. Input size: number of data points coming in as raw input.
  2. Intermediate size: number of aggregate data points that are used in the eval step. This is not currently reported by this change.
  3. Output size: number of data points for the final result.

The first two could potentially be reported per data expression. Since we expect the user to pay attention to these data points and process them, we might want to make them a standard JSON payload rather than comma separated string so they are easy to consume.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Intermediate size" looks like an internal implementation detail. I wonder how it is useful to users — is it actionable without changing the raw input size?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an example consider this expression:

nf.app,www,:eq,
name,foo,:eq,:and,
(,id,),:by,

nf.app,www,:eq,
(,id,),:by,
1,:gt,

:mul

This is similar to a pattern we saw recently. In this case the second clause that only restricts by app had a high input size and intermediate size. A slight variation would be:

nf.app,www,:eq,
name,foo,:eq,:and,
(,id,),:by,

nf.app,www,:eq,
name,bar,:eq,:and,
(,id,),:by,
1,:gt,

:mul

Lets suppose that the input size is the same for both foo and bar. However, bar has many distinct values for id, but foo only has a few. The output size will only indicate the set that are in common between both. The difference in potential processing cost and memory usage, something we would want to support limits on, would be based on the intermediate size for the group by. Also if there is a concern about the cost, this is the difference that would tell you which clause is leading to higher cost as the input size will be the same.

})

push(out, Source(output ++ rawDataRateMessages ++ finalDataRateMessages))
}

/**
  * Total number of raw input datapoints received during the time group across the
  * provided set of data expressions. Expressions with no data in the group count
  * as zero.
  *
  * Note: sums via an iterator rather than `Set.map(...).sum`. Mapping a Set to the
  * per-expression counts would collapse distinct expressions that happen to have
  * equal counts and under-report the total.
  */
private def getNumRawDatapoints(
  dataExprs: mutable.Set[DataExpr],
  timeGroup: TimeGroup
): Long = {
  dataExprs.iterator
    .map(expr => timeGroup.dataExprValues.get(expr).fold(0L)(_.numRawDatapoints))
    .sum
}

override def onPush(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import akka.stream.stage.InHandler
import akka.stream.stage.OutHandler
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.TimeGroup

/**
Expand Down Expand Up @@ -107,13 +108,20 @@ private[stream] class TimeGrouped(
* so it can be used for a new time window.
*/
private def flush(i: Int): Unit = {
val vs = buf(i).map(t => t._1 -> t._2.datapoints).toMap
val t = timestamps(i)
if (t > 0) push(out, TimeGroup(t, step, vs)) else pull(in)
if (t > 0) push(out, toTimeGroup(t, buf(i))) else pull(in)
cutoffTime = t
buf(i) = new AggrMap
}

/** Convert the aggregate map for a time window into the TimeGroup output value. */
private def toTimeGroup(ts: Long, aggrMap: AggrMap): TimeGroup = {
  val exprValues = aggrMap.map {
    case (expr, aggr) => expr -> AggrValuesInfo(aggr.datapoints, aggr.numRawDatapoints)
  }.toMap
  TimeGroup(ts, step, exprValues)
}

override def onPush(): Unit = {
val v = grab(in)
val t = v.timestamp
Expand Down Expand Up @@ -149,8 +157,7 @@ private[stream] class TimeGrouped(

override def onUpstreamFinish(): Unit = {
val groups = buf.indices.map { i =>
val vs = buf(i).map(t => t._1 -> t._2.datapoints).toMap
TimeGroup(timestamps(i), step, vs)
toTimeGroup(timestamps(i), buf(i))
}.toList
pending = groups.filter(_.timestamp > 0).sortWith(_.timestamp < _.timestamp)
flushPending()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.netflix.atlas.core.model.MathExpr
import com.netflix.atlas.core.model.Query
import com.netflix.atlas.core.model.StatefulExpr
import com.netflix.atlas.eval.model.AggrDatapoint
import com.netflix.atlas.eval.model.AggrValuesInfo
import com.netflix.atlas.eval.model.ArrayData
import com.netflix.atlas.eval.model.TimeGroup
import com.netflix.atlas.eval.model.TimeSeriesMessage
Expand Down Expand Up @@ -67,7 +68,7 @@ class FinalExprEvalSuite extends AnyFunSuite {
val values = vs
.map(_.copy(timestamp = timestamp))
.groupBy(_.expr)
.map(t => t._1 -> t._2.toList)
.map(t => t._1 -> AggrValuesInfo(t._2.toList, t._2.size))
.toMap
TimeGroup(timestamp, step, values)
}
Expand Down
Loading