collect data rate for raw input, intermediate input and final output
jfz committed Aug 1, 2020
1 parent e53c4c9 commit 5015a89
Showing 6 changed files with 114 additions and 54 deletions.
DiagnosticMessage.scala

@@ -44,14 +44,6 @@ object DiagnosticMessage {
     error(s"${t.getClass.getSimpleName}: ${t.getMessage}")
   }
 
-  def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = {
-    DiagnosticMessage(
-      Rate,
-      s"dataType=$dataType,count=$count,timestamp=$timestamp,step=$step",
-      None
-    )
-  }
-
   val close: DiagnosticMessage = {
     DiagnosticMessage(Close, "operation complete", None)
   }
AggrDatapoint.scala

@@ -89,7 +89,7 @@ object AggrDatapoint {
    */
   trait Aggregator {
     protected[this] var rawDatapointCounter = 0
-    def numRawDatapoints: Long = rawDatapointCounter
+    def numRawDatapoints: Int = rawDatapointCounter
     def aggregate(datapoint: AggrDatapoint): Aggregator
     def datapoints: List[AggrDatapoint]
   }
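The counter field is an Int, so the accessor previously widened it to Long; narrowing the return type keeps it consistent with the Int-based counts used by the new EvalDataRate message below. For context, here is a minimal sketch of how an implementation of this trait tracks the raw count while aggregating. The simplified AggrDatapoint stand-in and the SumAggregator are illustrative only, not the actual Atlas implementation:

  // Illustrative stand-in for the Atlas AggrDatapoint type.
  case class AggrDatapoint(timestamp: Long, value: Double)

  trait Aggregator {
    protected[this] var rawDatapointCounter = 0
    def numRawDatapoints: Int = rawDatapointCounter
    def aggregate(datapoint: AggrDatapoint): Aggregator
    def datapoints: List[AggrDatapoint]
  }

  // Hypothetical sum aggregator: folds raw values together, counting each input.
  class SumAggregator(timestamp: Long) extends Aggregator {
    private var sum = 0.0

    def aggregate(datapoint: AggrDatapoint): Aggregator = {
      sum += datapoint.value
      rawDatapointCounter += 1 // one raw datapoint consumed
      this
    }

    // Many raw inputs collapse into a single aggregated value.
    def datapoints: List[AggrDatapoint] = List(AggrDatapoint(timestamp, sum))
  }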
EvalDataRate.scala (new file)

@@ -0,0 +1,13 @@
+package com.netflix.atlas.eval.model
+
+import com.netflix.atlas.json.JsonSupport
+
+case class EvalDataRate(
+  timestamp: Long,
+  step: Long,
+  inputCount: Map[String, Int],
+  intermediateCount: Map[String, Int],
+  outputCount: Int
+) extends JsonSupport {
+  val `type`: String = "rate"
+}
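This message replaces the DiagnosticMessage.rate helper removed above: instead of separate "raw" and "final" diagnostics, one message carries all three counts. A sketch of what an instance might look like; the expression string and counts are invented, and it assumes JsonSupport exposes a toJson serializer as it does elsewhere in Atlas:

  val rate = EvalDataRate(
    timestamp = 1596240000000L,
    step = 60000L,
    inputCount = Map("name,cpu,:eq,:sum" -> 120, "total" -> 120),
    intermediateCount = Map("name,cpu,:eq,:sum" -> 8, "total" -> 8),
    outputCount = 1
  )
  // rate.toJson would yield JSON roughly like:
  // {"type":"rate","timestamp":1596240000000,"step":60000,
  //  "inputCount":{"name,cpu,:eq,:sum":120,"total":120},...}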
TimeGroup.scala

@@ -32,4 +32,4 @@ import com.netflix.atlas.core.model.DataExpr
  * Values associated with this time.
  */
 case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo])
-case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Long)
+case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Int)
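The two fields of AggrValuesInfo map directly onto the first two rates being collected. A sketch, with aggregatedDatapoints standing in for a real List[AggrDatapoint] built elsewhere:

  // Hypothetical: 120 raw datapoints reported by individual nodes were
  // aggregated down to 8 intermediate values for one DataExpr.
  val info = AggrValuesInfo(values = aggregatedDatapoints, numRawDatapoints = 120)
  // info.values.size      == 8   -> intermediate count
  // info.numRawDatapoints == 120 -> raw input count, now an Int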
EvalDataRateCollector.scala (new file)

@@ -0,0 +1,79 @@
+package com.netflix.atlas.eval.stream
+
+import com.netflix.atlas.core.model.DataExpr
+import com.netflix.atlas.core.util.RefIntHashMap
+import com.netflix.atlas.eval.model.EvalDataRate
+
+import scala.collection.mutable
+
+class EvalDataRateCollector(timestamp: Long, step: Long) {
+
+  private val inputCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
+  private val intermediateCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
+  private val outputCounts = new RefIntHashMap[String]
+
+  def incrementOutput(id: String, amount: Int): Unit = {
+    outputCounts.increment(id, amount)
+  }
+
+  def incrementIntermediate(id: String, dataExpr: DataExpr, amount: Int): Unit = {
+    increment(intermediateCounts, id, dataExpr, amount)
+  }
+
+  def incrementInput(id: String, dataExpr: DataExpr, amount: Int): Unit = {
+    increment(inputCounts, id, dataExpr, amount)
+  }
+
+  def getAll: Map[String, EvalDataRate] = {
+    inputCounts.map {
+      case (id, _) => {
+        id -> EvalDataRate(
+          timestamp,
+          step,
+          getDataRate(inputCounts, id),
+          getDataRate(intermediateCounts, id),
+          outputCounts.get(id, 0)
+        )
+      }
+    }.toMap
+  }
+
+  private def getDataRate(
+    counts: mutable.Map[String, RefIntHashMap[DataExpr]],
+    id: String
+  ): Map[String, Int] = {
+    counts.get(id) match {
+      case Some(v: RefIntHashMap[DataExpr]) =>
+        var total = 0
+        val builder = Map.newBuilder[String, Int]
+        v.foreach { (dataExpr, count) =>
+          builder += dataExpr.toString -> count
+          total += count
+        }
+        builder += EvalDataRateCollector.Total -> total
+        builder.result()
+      case None => EvalDataRateCollector.EmptyCounts
+    }
+  }
+
+  private def increment(
+    counts: mutable.Map[String, RefIntHashMap[DataExpr]],
+    id: String,
+    dataExpr: DataExpr,
+    amount: Int
+  ): Unit = {
+    counts.get(id) match {
+      case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount)
+      case None =>
+        val temp = new RefIntHashMap[DataExpr]
+        temp.increment(dataExpr, amount)
+        counts += (id -> temp)
+    }
+  }
+
+}
+
+object EvalDataRateCollector {
+  val Total = "total"
+  val EmptyCounts = Map(Total -> 0)
+}
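A usage sketch for the collector; it assumes atlas-core's DataExpr.Sum and Query.Equal constructors, and the ids and counts are invented:

  import com.netflix.atlas.core.model.{DataExpr, Query}

  val expr: DataExpr = DataExpr.Sum(Query.Equal("name", "cpu"))

  val collector = new EvalDataRateCollector(timestamp = 1596240000000L, step = 60000L)
  collector.incrementInput("ds-1", expr, 120)      // raw datapoints seen for ds-1
  collector.incrementIntermediate("ds-1", expr, 8) // values left after aggregation
  collector.incrementOutput("ds-1", 1)             // final time series emitted

  val rates: Map[String, EvalDataRate] = collector.getAll
  // rates("ds-1").inputCount == Map(expr.toString -> 120, "total" -> 120)

Note that getAll iterates over inputCounts, so a data source only gets a rate message for steps where at least one of its expressions recorded input.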
FinalExprEval.scala

@@ -15,8 +15,6 @@
  */
 package com.netflix.atlas.eval.stream
 
-import java.time.Instant
-
 import akka.NotUsed
 import akka.http.scaladsl.model.Uri
 import akka.stream.Attributes
@@ -171,13 +169,21 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
         k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries)
     }
 
-    val dataExprToDiagnostics = groupedDatapoints.map {
-      case (k, vs) =>
-        val t = Instant.ofEpochMilli(timestamp)
-        k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]")
+    // Collect input and intermediate data size per DataSource
+    val rateCollector = new EvalDataRateCollector(timestamp, step)
+    dataSourceIdToDataExprs.foreach {
+      case (id, dataExprSet) => {
+        dataExprSet.foreach(dataExpr => {
+          group.dataExprValues.get(dataExpr) match {
+            case Some(v) =>
+              rateCollector.incrementInput(id, dataExpr, v.numRawDatapoints)
+              rateCollector.incrementIntermediate(id, dataExpr, v.values.size)
+            case None =>
+          }
+        })
+      }
     }
 
-    val finalDataRateMap = mutable.Map.empty[String, Long]
     // Generate the time series and diagnostic output
     val output = recipients.flatMap {
       case (styleExpr, ids) =>
@@ -193,18 +199,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
           TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t)))
         }
 
-        val diagnostics = styleExpr.expr.dataExprs.flatMap(dataExprToDiagnostics.get)
-
-        // Collect final output data count
-        ids.foreach(id =>
-          finalDataRateMap.get(id) match {
-            case Some(count) => finalDataRateMap.update(id, count + data.length)
-            case None        => finalDataRateMap += id -> data.length
-          }
-        )
+        // Collect final data size per DataSource
+        ids.foreach(rateCollector.incrementOutput(_, data.size))
 
         ids.flatMap { id =>
-          (msgs ++ diagnostics).map { msg =>
+          msgs.map { msg =>
             new MessageEnvelope(id, msg)
           }
         }
@@ -217,34 +216,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
       }
     }
 
-    // Raw data rate for each DataSource
-    val rawDataRateMessages = dataSourceIdToDataExprs.map(kv =>
-      new MessageEnvelope(
-        kv._1,
-        DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step)
-      )
-    )
-
-    // Final output data rate for each DataSource
-    val finalDataRateMessages = finalDataRateMap.map(kv => {
-      new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step))
-    })
-
-    push(out, Source(output ++ rawDataRateMessages ++ finalDataRateMessages))
-  }
+    val rateMessages = rateCollector.getAll.map {
+      case (id, rate) => new MessageEnvelope(id, rate)
+    }.toList
 
-  private def getNumRawDatapoints(
-    dataExprs: mutable.Set[DataExpr],
-    timeGroup: TimeGroup
-  ): Long = {
-    dataExprs
-      .map(dataExpr => {
-        timeGroup.dataExprValues.get(dataExpr) match {
-          case Some(v) => v.numRawDatapoints
-          case None    => 0
-        }
-      })
-      .sum
+    push(out, Source(output ++ rateMessages))
   }
 
   override def onPush(): Unit = {
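With these changes, each DataSource id receives a single EvalDataRate envelope per time group instead of separate "raw" and "final" diagnostics. A sketch of how a consumer of the stream might pick those messages out, assuming MessageEnvelope exposes getId and getMessage accessors:

  def logRates(envelopes: List[MessageEnvelope]): Unit = {
    envelopes.foreach { env =>
      env.getMessage match {
        case rate: EvalDataRate =>
          println(
            s"${env.getId}: input=${rate.inputCount("total")}, " +
              s"intermediate=${rate.intermediateCount("total")}, output=${rate.outputCount}"
          )
        case _ => // time series and diagnostic messages
      }
    }
  }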
