-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Samza udf metrics #25068
Samza udf metrics #25068
Conversation
- Implement SamzaMetricsBundleProgressHandler - Modify SamzaDoFnRunners to use SamzaMetricsBundleProgressHandler, instead of BundleProgressHandler.ignored() - Add unit tests - Refactor stepName member field in SamzaDoFnRunners
...a/src/main/java/org/apache/beam/runners/samza/runtime/SamzaMetricsBundleProgressHandler.java
Show resolved
Hide resolved
@ryucc this branch has some conflicts, please solve it. |
@@ -388,6 +407,7 @@ private void emitMetrics() { | |||
final long finishBundleTime = System.nanoTime(); | |||
final long averageProcessTime = (finishBundleTime - startBundleTime) / count; | |||
|
|||
String metricName = "ExecutableStage-" + stepName + "-process-ns"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be extracted to an instance variable since this doesn't seem to change after construction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer not to use instance variables. We already have a large pool of it, and would like to keep only primitive data in there.
On the same note, I want to keep data as local as possible. This metricName
seems only useful for the emitMetrics method at the moment (it's specific to Executable stage time) , and may conflict with other metric names if the future.
I think putting the computation here improves the code cleanness, and out weights the cost to compute it every time.
Map<String, String> transformFullNameToUniqueName = | ||
pTransformNodes.stream() | ||
.collect( | ||
Collectors.toMap( | ||
pTransformNode -> pTransformNode.getId(), | ||
pTransformNode -> pTransformNode.getTransform().getUniqueName())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant computation. Why not store this map instead of the Collection<PipelineNode.PTransformNode>
and compute this mapping once during construction?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Think it's okay. If we didn't want redundant computation then the above lines
OutputReceiverFactory receiverFactory =
new OutputReceiverFactory() {
@Override
public FnDataReceiver<FnOutT> create(String pCollectionId) {
return (receivedElement) -> {
// handover to queue, do not block the grpc thread
outputQueue.put(KV.of(pCollectionId, receivedElement));
};
}
};
final Coder<BoundedWindow> windowCoder = windowingStrategy.getWindowFn().windowCoder();
final TimerReceiverFactory timerReceiverFactory =
new TimerReceiverFactory(stageBundleFactory, this::timerDataConsumer, windowCoder);
Should all be in the constructor?
I think the philosophy here is to keep the constructor assign only, and not contain any computation.
Code in the constructor should only look like
this.a = a;
this.b = b;
...
etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Its good practice to keep constructor light weight that said, it doesn't have to always have assignments within it. Some initialization logic is okay. Other benefits doing so,
- it helps building invariants that this field isn't going to change upon construction
- handle to pTransformNode is unnecessary and can potentially be meddled with which will make this piece of non deterministic breaking the assumption that
transformFullNameToUniqueName
changes. - Ideally, you would want only the names to be dependency injected if you don't need the
pTransformNode
at all.
The code you pasted above is mostly blueprint and not performing any computation per-say. local variables are introduced for readability and might as well replace them with the getters of the already existing instance variables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By computation I mean lines that are not pure assignment.
The constructor is usually not unit tested. If we add computation logic on to it, we would want to add a unit test onto it. Ending in something like
class SdkHarnessDoFnRunner {
@VisibleForTesting
public Map<String, String> getTransformFullNameToUniqueName(){...}
}
class SdkHarnessDoFnRunnerTest {
@Test
public void testConstructor() {
SdkHarnessDoFnRunner runner = new SdkHarnessDoFnRunner(...);
assertEquals(runner.getTransformFullNameToUniqueName, expected);
}
}
I'd rather not have these non-assignment lines inside the constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by "pTransformNode can be meddled with"? In what cases will that happen?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the class holds a reference of Collection<pTransformNode>
and this reference can change over the lifecycle of this class and the transform name map that you produce changes everytime when something changes to this reference and thus causing the invariant of passing potentially different metric names upon the callsite.
By enforcing the computation to occur only once at least guarantees we always pass the same transform name maps and hence easy to reason about what the code does and how it behaves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Collection<pTransformNode>
is extracted from the executableStage
, which is parsed from RunnerApi.ExecutableStagePayload
.
It should be a specification of the pipeline, and not change while we run it.
If this assumption is wrong, I'll change this mapping assignment.
* @param samzaMetricsContainer The destination for publishing the metrics. | ||
* @param transformIdToUniqueName A mapping from transformId to uniqueName for pTransforms. | ||
*/ | ||
public SamzaMetricsBundleProgressHandler( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggest removing Metrics from the name since this isn't tied to metrics alone rather an implementation of BundleProgressHandler
which can expand beyond metrics handling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding Metrics in the class name is pretty intentional. The name keeps the responsibility smaller. If we want a GeneralBundleProgressHandler, we can chain these smaller ones.
I would prefer not to expand this class beyond metrics handling in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think BundleFactory takes multiple BundleProgressHandler
s. The responsibility of BundleProgressHandler
to listen to the signal of progress and take actions. It is agnostic to what actions are taken within and hence metric isn't part of the implementation.
If you want to take multiple actions within the handler, it could be delegated to multiple components but the one front facing component that bridges the actions and listening is done by this implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can write something like
private List<BundleProgressHandler> handlers;
//Constructor
public GeneralBundleProgressHandler(List<BundleProgressHandler> handlers){
this.handlers = handlers;
}
public void handle(ProcessBundleProgressResponse progress) {
this.handlers.forEach(handler -> handler.handle(progress))
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I like delegating to multiple components. It's also a possible solution in the future. It's an overkill now since we only have 1 component.
I think the naming with Metric will prevent us from writing all code in 1 handler in the future, and make the developer think about either chaining or using components, whichever way we go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The chained bundle handlers pattern is a bit hard to read and follow because you can have multiple of these handlers.
As opposed to having one handler which delegates to appropriate components as part of handling.
I feel removing metrics from the name doesn't take away any of what its doing and keeps the door open regardless of both of the approaches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Delegating handler pattern has the same problem. You will still inject the components in the constructor, unless you hard code them, which makes it less flexible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think keeping Metric in the class name is a stronger name to push for refactoring, if we ever want to handler to do more stuff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@xinyuiscool @alnzng Please help make a final call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's go with the Metric in the name for now. I agree more with Bharath on this one: having a single general handler here might be easier to understand. Chaining might be an overkill. We can discuss this further and refactor in the future.
* @see | ||
* org.apache.beam.runners.core.metrics.MonitoringInfoMetricName#of(MetricsApi.MonitoringInfo) | ||
*/ | ||
public void onCompleted(BeamFnApi.ProcessBundleResponse response) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are the guarantees on when onCompleted
will be invoked? Are we guaranteed to be invoked regardless of bundle success/failure?
What happens if the bundle is stuck? Should we detect them in which case onProgress
needs to implemented as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch!
public void onCompleted(BeamFnApi.ProcessBundleResponse response) { | ||
for (MetricsApi.MonitoringInfo monitoringInfo : response.getMonitoringInfosList()) { | ||
if (monitoringInfo.getPayload().isEmpty()) { | ||
return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not invert the condition? intermittent control flow makes code hard to read.
even otherwise, i think this should be continue
instead of return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will move the code a bit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inverted condition, fixed the return bug, and added unit test.
MonitoringInfoConstants.Labels.NAMESPACE, monitoringInfo.getUrn()); | ||
String userMetricName = | ||
monitoringInfo.getLabelsOrDefault( | ||
MonitoringInfoConstants.Labels.NAME, monitoringInfo.getLabelsMap().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
monitoringInfo.getLabelsMap().toString()
doesn't seem like useful default. It depends on toString()
implementation which may potentially change result in change in metric name.
Consider having deterministic way of chaining the labels.
Also, what happens if new labels get added. Would that void the old metric?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This default behavior is copying the Flink implementation. The monitoring infos look like this, and we don't have much to work with. Do you have any suggestions on constructing a metric name, when the NAME
label doesn't exist?
monitoring_infos {
urn: "beam:metric:user:sum_int64:v1"
type: "beam:metrics:sum_int64:v1"
payload: "\n"
labels {
key: "NAME"
value: "count101"
}
labels {
key: "NAMESPACE"
value: "org.apache.beam.runners.samza.portable.SamzaPortableTest"
}
labels {
key: "PTRANSFORM"
value: "Kati-Step-2-ParMultiDo-Anonymous-"
}
}
MetricName metricName = MetricName.named(className, userMetricName); | ||
|
||
switch (monitoringInfo.getType()) { | ||
case SUM_INT64_TYPE: | ||
Counter counter = metricsContainer.getCounter(metricName); | ||
counter.inc(decodeInt64Counter(monitoringInfo.getPayload())); | ||
break; | ||
|
||
case DISTRIBUTION_INT64_TYPE: | ||
Distribution distribution = metricsContainer.getDistribution(metricName); | ||
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload()); | ||
distribution.update(data.sum(), data.count(), data.min(), data.max()); | ||
break; | ||
|
||
case LATEST_INT64_TYPE: | ||
Gauge gauge = metricsContainer.getGauge(metricName); | ||
// Gauge doesn't expose update as public. This will reset the timestamp. | ||
|
||
gauge.set(decodeInt64Gauge(monitoringInfo.getPayload()).value()); | ||
break; | ||
|
||
default: | ||
LOG.warn("Unsupported metric type {}", monitoringInfo.getType()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks like a responsibility of MetricsContainerImpl
. I'd move this within SamzaMetricsContainer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MonitoringInfo
is the data object used between the sdk worker and the runner; DistributionData/integer metric values are the data objected used within the runner.
I think it is the BundleProgressHandler
to translate the external data type to internal data types, and pass it to the internal services.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't agree. MonitoringInfo
is an API defined as part of portability and isn't tied to worker/runner.
the object mutated is not an object belonging to this class (BundleProgressHandler
). The handle is fetched from MetricsContainer
which can change its own code and that percolates all the way to this class.
If you were to have a contract of updateMetrics(monitoringInfo)
exposed by MetricsContainer
this class doesn't have to deal with the evolution of this logic and stays core to its purpose of just signaling components to do the right thing instead of owning what that right thing is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dropping the assumption of MonitoringInfo
is the contract between worker/runner.
I'm okay with putting the code in SamzaMetricsContainer
, but not MetricsContainerImpl
.
Flink put some updateMetrics(monitoringInfo)
code in MetricsContainerImpl
, but as part of the translation, a MetricName has to be defined. The Flink formatting is different from Samza, making it not directly usable for us.
This is a violation of the dependency inversion principle, since it is no longer an abstraction, and doesn't work regardless who the client is. What would be better is MetricsContainerImpl
provides methods usable by everyone, or just rename as FlinkMetricsContainerImpl
.
After encountering this problem, I wanted to pull the translation to as close to the BundleProgressHandler
as possible. Maybe I moved it too far up.
One concern for putting it in SamzaMetricsContainer
though, is SamzaMetricsContainer
serves both portability mode and classic mode. Why should we put a method only used by portable mode in the common space?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
portability or not, one can argue the responsibility is metrics handling and hence the logic should stay within SamzaMetricsContainer. Especially most of what the code does is pertained to metrics container and its internals.
IMO, the above code violates closed principle and leads to interactions with the component internals that ProgressHandler
depends on it.
The suggestion to keep this outside this class isn't motivated by how things are implemented in flink. In fact, I haven't looked at it how it does. My rationale is purely from how responsibilities are structured and the impact evolution of these responsibilities.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"interactions with the component internals" is true, but it was a pattern already existing in SamzaMetricsContainer
.
I can change it from
Distribution distribution = metricsContainer.getDistribution(metricName);
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
distribution.update(data.sum(), data.count(), data.min(), data.max());
to
DistributionData data = decodeInt64Distribution(monitoringInfo.getPayload());
Distribution distribution = metricsContainer.setDistribution(metricName, data);
but I think the parsing stays here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
Support DoFn metrics in portable Samza Runner (apache#25068)
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.