Skip to content

Commit

Permalink
collect data rate for raw input, intermediate input and final output
Browse files Browse the repository at this point in the history
  • Loading branch information
jfz committed Aug 4, 2020
1 parent e53c4c9 commit daa0c51
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ object DiagnosticMessage {
final val Warning: String = "warn"
final val Error: String = "error"
final val Close: String = "close"
final val Rate: String = "rate"

def info(message: String): DiagnosticMessage = {
DiagnosticMessage(Info, message, None)
Expand All @@ -44,14 +43,6 @@ object DiagnosticMessage {
error(s"${t.getClass.getSimpleName}: ${t.getMessage}")
}

def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = {
DiagnosticMessage(
Rate,
s"dataType=$dataType,count=$count,timestamp=$timestamp,step=$step",
None
)
}

val close: DiagnosticMessage = {
DiagnosticMessage(Close, "operation complete", None)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ object AggrDatapoint {
*/
trait Aggregator {
protected[this] var rawDatapointCounter = 0
def numRawDatapoints: Long = rawDatapointCounter
def numRawDatapoints: Int = rawDatapointCounter
def aggregate(datapoint: AggrDatapoint): Aggregator
def datapoints: List[AggrDatapoint]
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2014-2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.model

import com.netflix.atlas.json.JsonSupport

case class EvalDataRate(
timestamp: Long,
step: Long,
inputSize: EvalDataSize,
intermediateSize: EvalDataSize,
outputSize: EvalDataSize
) extends JsonSupport {
val `type`: String = "rate"
}

case class EvalDataSize(total: Int, details: Map[String, Int] = Map.empty[String, Int])
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ import com.netflix.atlas.core.model.DataExpr
* Values associated with this time.
*/
case class TimeGroup(timestamp: Long, step: Long, dataExprValues: Map[DataExpr, AggrValuesInfo])
case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Long)
case class AggrValuesInfo(values: List[AggrDatapoint], numRawDatapoints: Int)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2014-2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.atlas.eval.stream

import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.util.RefIntHashMap
import com.netflix.atlas.eval.model.EvalDataRate
import com.netflix.atlas.eval.model.EvalDataSize

import scala.collection.mutable

class EvalDataRateCollector(timestamp: Long, step: Long) {

private val inputCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
private val intermediateCounts = mutable.Map.empty[String, RefIntHashMap[DataExpr]]
private val outputCounts = new RefIntHashMap[String]

def incrementOutput(id: String, amount: Int): Unit = {
outputCounts.increment(id, amount)
}

def incrementIntermediate(id: String, dataExpr: DataExpr, amount: Int): Unit = {
increment(intermediateCounts, id, dataExpr, amount)
}

def incrementInput(id: String, dataExpr: DataExpr, amount: Int): Unit = {
increment(inputCounts, id, dataExpr, amount)
}

def getAll: Map[String, EvalDataRate] = {
inputCounts.map {
case (id, _) => {
id -> EvalDataRate(
timestamp,
step,
getDataRate(inputCounts, id),
getDataRate(intermediateCounts, id),
EvalDataSize(outputCounts.get(id, 0))
)
}
}.toMap
}

private def getDataRate(
counts: mutable.Map[String, RefIntHashMap[DataExpr]],
id: String
): EvalDataSize = {
counts.get(id) match {
case Some(v: RefIntHashMap[DataExpr]) =>
var total = 0
val builder = Map.newBuilder[String, Int]
v.foreach { (dataExpr, count) =>
builder += dataExpr.toString -> count
total += count
}
EvalDataSize(total, builder.result())
case None => EvalDataRateCollector.EmptyRate
}
}

private def increment(
counts: mutable.Map[String, RefIntHashMap[DataExpr]],
id: String,
dataExpr: DataExpr,
amount: Int
): Unit = {
counts.get(id) match {
case Some(v: RefIntHashMap[DataExpr]) => v.increment(dataExpr, amount)
case None =>
val temp = new RefIntHashMap[DataExpr]
temp.increment(dataExpr, amount)
counts += (id -> temp)
}
}
}

object EvalDataRateCollector {
val EmptyRate = EvalDataSize(0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.netflix.atlas.eval.stream

import java.time.Instant

import akka.NotUsed
import akka.http.scaladsl.model.Uri
import akka.stream.Attributes
Expand Down Expand Up @@ -171,13 +169,21 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
k -> AggrDatapoint.aggregate(vs.values).map(_.toTimeSeries)
}

val dataExprToDiagnostics = groupedDatapoints.map {
case (k, vs) =>
val t = Instant.ofEpochMilli(timestamp)
k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]")
// Collect input and intermediate data size per DataSource
val rateCollector = new EvalDataRateCollector(timestamp, step)
dataSourceIdToDataExprs.foreach {
case (id, dataExprSet) => {
dataExprSet.foreach(dataExpr => {
group.dataExprValues.get(dataExpr).foreach { info =>
{
rateCollector.incrementInput(id, dataExpr, info.numRawDatapoints)
rateCollector.incrementIntermediate(id, dataExpr, info.values.size)
}
}
})
}
}

val finalDataRateMap = mutable.Map.empty[String, Long]
// Generate the time series and diagnostic output
val output = recipients.flatMap {
case (styleExpr, ids) =>
Expand All @@ -193,18 +199,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
TimeSeriesMessage(styleExpr, context, t.withLabel(styleExpr.legend(t)))
}

val diagnostics = styleExpr.expr.dataExprs.flatMap(dataExprToDiagnostics.get)

// Collect final output data count
ids.foreach(id =>
finalDataRateMap.get(id) match {
case Some(count) => finalDataRateMap.update(id, count + data.length)
case None => finalDataRateMap += id -> data.length
}
)
// Collect final data size per DataSource
ids.foreach(rateCollector.incrementOutput(_, data.size))

ids.flatMap { id =>
(msgs ++ diagnostics).map { msg =>
msgs.map { msg =>
new MessageEnvelope(id, msg)
}
}
Expand All @@ -217,34 +216,11 @@ private[stream] class FinalExprEval(interpreter: ExprInterpreter)
}
}

// Raw data rate for each DataSource
val rawDataRateMessages = dataSourceIdToDataExprs.map(kv =>
new MessageEnvelope(
kv._1,
DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step)
)
)

// Final output data rate for each DataSource
val finalDataRateMessages = finalDataRateMap.map(kv => {
new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step))
})

push(out, Source(output ++ rawDataRateMessages ++ finalDataRateMessages))
}
val rateMessages = rateCollector.getAll.map {
case (id, rate) => new MessageEnvelope(id, rate)
}.toList

private def getNumRawDatapoints(
dataExprs: mutable.Set[DataExpr],
timeGroup: TimeGroup
): Long = {
dataExprs
.map(dataExpr => {
timeGroup.dataExprValues.get(dataExpr) match {
case Some(v) => v.numRawDatapoints
case None => 0
}
})
.sum
push(out, Source(output ++ rateMessages))
}

override def onPush(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import com.netflix.atlas.akka.DiagnosticMessage
import com.netflix.atlas.core.model.DataExpr
import com.netflix.atlas.core.model.MathExpr
import com.netflix.atlas.core.model.Query
Expand Down Expand Up @@ -93,7 +92,7 @@ class FinalExprEvalSuite extends AnyFunSuite {
TimeGroup(0L, step, Map.empty)
)
val output = run(input)
assert(output.size === 3)
assert(output.size === 1)

val tsMsgs = output.filter(isTimeSeries)
assert(tsMsgs.size === 1)
Expand All @@ -110,17 +109,6 @@ class FinalExprEvalSuite extends AnyFunSuite {
}
}

private def isDiagnosticMessageWith(
messageEnvelope: MessageEnvelope,
theType: String,
keyword: String
): Boolean = {
messageEnvelope.getMessage match {
case DiagnosticMessage(tp, msg, _) if tp == theType && msg.contains(keyword) => true
case _ => false
}
}

private def getValue(ts: TimeSeriesMessage): Double = {
ts.data match {
case ArrayData(vs) =>
Expand Down Expand Up @@ -161,18 +149,6 @@ class FinalExprEvalSuite extends AnyFunSuite {
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}

val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints for"))
assert(diagnostics.size === 3)
val expectedDiagnostics = List(
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr]"),
DiagnosticMessage.info(s"1970-01-01T00:03:00Z: 1 input datapoints for [$expr]")
)
diagnostics.zip(expectedDiagnostics).foreach {
case (actual, expected) =>
assert(actual.getMessage === expected)
}
}

test("aggregate with multiple datapoints per group") {
Expand Down Expand Up @@ -206,19 +182,6 @@ class FinalExprEvalSuite extends AnyFunSuite {
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage]
checkValue(ts, expectedValue)
}

val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints"))
assert(diagnostics.size === 3)
val expectedDiagnostics = List(
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 3 input datapoints for [$expr]"),
DiagnosticMessage.info(s"1970-01-01T00:03:00Z: 2 input datapoints for [$expr]")
)
diagnostics.zip(expectedDiagnostics).foreach {
case (actual, expected) =>
assert(actual.getMessage === expected)
}

}

test("aggregate with multiple expressions") {
Expand Down Expand Up @@ -259,26 +222,6 @@ class FinalExprEvalSuite extends AnyFunSuite {
else
checkValue(actual, expectedTimeseries2.dequeue())
}

val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints"))
assert(diagnostics.size === 3 + 2) // 3 for datasource a, 2 for datasource b

val expectedDiagnostics1 = scala.collection.mutable.Queue(
DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 1 input datapoints for [$expr1]"),
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr1]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr1]")
)
val expectedDiagnostics2 = scala.collection.mutable.Queue(
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 1 input datapoints for [$expr2]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 2 input datapoints for [$expr2]")
)
diagnostics.foreach { env =>
val actual = env.getMessage.asInstanceOf[DiagnosticMessage]
if (env.getId == "a")
assert(actual === expectedDiagnostics1.dequeue())
else
assert(actual === expectedDiagnostics2.dequeue())
}
}

// https://github.com/Netflix/atlas/issues/693
Expand Down Expand Up @@ -322,22 +265,6 @@ class FinalExprEvalSuite extends AnyFunSuite {
checkValue(ts, 21.0)
}
}

val diagnostics = output.filter(isDiagnosticMessageWith(_, "info", "input datapoints"))
assert(diagnostics.size === 6)

val expectedDiagnostics = List(
DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 1 input datapoints for [$expr1]"),
DiagnosticMessage.info(s"1970-01-01T00:00:00Z: 2 input datapoints for [$expr2]"),
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr1]"),
DiagnosticMessage.info(s"1970-01-01T00:01:00Z: 2 input datapoints for [$expr2]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 2 input datapoints for [$expr1]"),
DiagnosticMessage.info(s"1970-01-01T00:02:00Z: 1 input datapoints for [$expr2]")
)
diagnostics.zip(expectedDiagnostics).foreach {
case (actual, expected) =>
assert(actual.getMessage === expected)
}
}

// https://github.com/Netflix/atlas/issues/762
Expand Down

0 comments on commit daa0c51

Please sign in to comment.