-
Notifications
You must be signed in to change notification settings - Fork 305
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
atlas-eval: expose data rate per datasource via diagnostic messages #1192
Conversation
// Generate the time series and diagnostic output | ||
val output = recipients.flatMap { | ||
case (expr, ids) => | ||
case (styleExpr, ids) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renamed for easier to follow, I was a little confused when first saw expr.expr.dataExprs
@@ -101,9 +103,11 @@ object AggrDatapoint { | |||
extends Aggregator { | |||
|
|||
private var value = init.value | |||
rawDatapointCounter += 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need increase here because the constructor take 1 Datapoint
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did an initial pass, will take another look tomorrow. Once we have ironed out the message format, can we update the README for the eval project to include details on the messages?
@@ -43,6 +44,14 @@ object DiagnosticMessage { | |||
error(s"${t.getClass.getSimpleName}: ${t.getMessage}") | |||
} | |||
|
|||
def rate(dataType: String, count: Long, timestamp: Long, step: Long): DiagnosticMessage = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems pretty specific to the eval library. Could we just use an info message with the right information encoded?
case (k, vs) => | ||
val t = Instant.ofEpochMilli(timestamp) | ||
k -> DiagnosticMessage.info(s"$t: ${vs.length} input datapoints for [$k]") | ||
k -> DiagnosticMessage.info(s"$t: ${vs.values.length} input datapoints for [$k]") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this is still needed. I think we can refactor it to be consistent with the new messages or remove if it is redundant.
val rawDataRateMessages = dataSourceIdToDataExprs.map(kv => | ||
new MessageEnvelope( | ||
kv._1, | ||
DiagnosticMessage.rate("raw", getNumRawDatapoints(kv._2, group), group.timestamp, step) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to output for each data expr? If it is a complex expression with many parts, then that would allow them to see which sub-expressions are contributing the most to the cost.
|
||
// Collect final output data count | ||
ids.foreach(id => | ||
finalDataRateMap.get(id) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using RefIntHashMap
would be more efficient here. For our current uses it is unlikely we would overflow the int, though we could create a long specialization if needed.
|
||
// Final output data rate for each DataSource | ||
val finalDataRateMessages = finalDataRateMap.map(kv => { | ||
new MessageEnvelope(kv._1, DiagnosticMessage.rate("final", kv._2, group.timestamp, step)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could already be computed by the user with counting the datapoints. I think we are missing the intermediate size that the user couldn't compute. So we would have:
- Input size: number of data points coming in as raw input.
- Intermediate size: number of aggregate data points that are used in the eval step. This is not currently reported by this change.
- Output size: number of data points for the final result.
The first two could potentially be reported per data expression. Since we expect the user to pay attention to these data points and process them, we might want to make them a standard JSON payload rather than comma separated string so they are easy to consume.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"intermediate size" looks like internal implementation detail, I wonder how is "intermediate size" useful to users, is it actionable without changing raw input size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As an example consider this expression:
nf.app,www,:eq,
name,foo,:eq,:and,
(,id,),:by,
nf.app,www,:eq,
(,id,),:by,
1,:gt,
:mul
This is similar to a pattern we saw recently. In this case the second clause that only restricts by app had a high input size and intermediate size. A slight variation would be:
nf.app,www,:eq,
name,foo,:eq,:and,
(,id,),:by,
nf.app,www,:eq,
name,bar,:eq,:and,
(,id,),:by,
1,:gt,
:mul
Lets suppose that the input size is the same for both foo
and bar
. However, bar
has many distinct values for id
, but foo
only has a few. The output size will only indicate the set that are in common between both. The difference in potential processing cost and memory usage, something we would want to support limits on, would be based on the intermediate size for the group by. Also if there is a concern about the cost, this is the difference that would tell you which clause is leading to higher cost as the input size will be the same.
@@ -186,19 +182,6 @@ class FinalExprEvalSuite extends AnyFunSuite { | |||
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] | |||
checkValue(ts, expectedValue) | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TBD - Will add some validation for the new data rate messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is still pending
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added validations
627c8c3
to
cef982c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Mostly looks good. Would be nice to improve the test coverage for the new messages and update the readme to give an overview.
@@ -26,6 +26,7 @@ object DiagnosticMessage { | |||
final val Warning: String = "warn" | |||
final val Error: String = "error" | |||
final val Close: String = "close" | |||
final val Rate: String = "rate" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doesn't look like this is used anymore, it can be removed.
dataExpr: DataExpr, | ||
amount: Int | ||
): Unit = { | ||
counts.get(id) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be simplified to:
counts.getOrElseUpdate(id, new RefIntHashMap[DataExpr]).increment(dataExpr, amount)
|
||
import scala.collection.mutable | ||
|
||
class EvalDataRateCollector(timestamp: Long, step: Long) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to have some tests for this class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be covered by FinalExprEvalSuite
.toList | ||
|
||
// Compute the map from DataSource id to DataExprs | ||
dataSourceIdToDataExprs = mutable.Map.empty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be refactored to keep it immutable since it is never modified later, something like:
dataSourceIdToDataExprs = recipients
.flatMap {
case (styleExpr, ids) => ids.map(_ -> styleExpr.expr.dataExprs.toSet)
}
.foldLeft(Map.empty[String, Set[DataExpr]]) {
case (acc, (id, dataExprs)) =>
acc + acc.get(id).fold(id -> dataExprs) { vs => id -> (dataExprs ++ vs) }
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds good, made the change with one difference: made the init value of foldLeft mutable map to avoid creating new Map on every update
case (id, dataExprSet) => { | ||
dataExprSet.foreach(dataExpr => { | ||
group.dataExprValues.get(dataExpr).foreach { info => | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This additional level of braces is unnecessary.
@@ -186,19 +182,6 @@ class FinalExprEvalSuite extends AnyFunSuite { | |||
val ts = env.getMessage.asInstanceOf[TimeSeriesMessage] | |||
checkValue(ts, expectedValue) | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this is still pending
37f93d2
to
5118510
Compare
Expose raw data rate (from LWC) and final data out put rate for each DataSource.
Raw data rate is collected at TimedGroup for each DataExpr and carry it to down stream, and summarized in FinalExprEval and emit.