Skip to content

Commit

Permalink
Stop paying the iterator object creation tax in MultiplexingMetricTra…
Browse files Browse the repository at this point in the history
…ckingFnDataReceiver (#25540)

* Stop paying the iterator object creation tax in MultiplexingMetricTrackingFnDataReceiver

This removes the overhead shown in https://user-images.githubusercontent.com/10078956/219762523-1e76e849-18b9-4c40-a513-000364baea52.png

This is for #21250
  • Loading branch information
lukecwik authored Feb 18, 2023
1 parent 9075332 commit de7eb2d
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder;
import org.apache.beam.sdk.util.common.ElementByteSizeObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;

/**
* The {@code PCollectionConsumerRegistry} is used to maintain a collection of consuming
Expand Down Expand Up @@ -209,8 +208,7 @@ public FnDataReceiver<WindowedValue<?>> getMultiplexingConsumer(String pCollecti
come up in the existing SDF expansion, but might be useful to support fused SDF nodes.
This would require dedicated delivery of the split results to each of the consumers
separately. */
return new MultiplexingMetricTrackingFnDataReceiver(
pcId, coder, ImmutableList.copyOf(consumerAndMetadatas));
return new MultiplexingMetricTrackingFnDataReceiver(pcId, coder, consumerAndMetadatas);
}
});
}
Expand Down Expand Up @@ -351,8 +349,10 @@ public void accept(WindowedValue<T> input) throws Exception {

// Use the ExecutionStateTracker and enter an appropriate state to track the
// Process Bundle Execution time metric and also ensure user counters can get an appropriate
// metrics container.
for (ConsumerAndMetadata consumerAndMetadata : consumerAndMetadatas) {
// metrics container. We specifically don't use a for-each loop since it creates an iterator
// on a hot path.
for (int size = consumerAndMetadatas.size(), i = 0; i < size; ++i) {
ConsumerAndMetadata consumerAndMetadata = consumerAndMetadatas.get(i);
ExecutionState state = consumerAndMetadata.getExecutionState();
state.activate();
try {
Expand Down

0 comments on commit de7eb2d

Please sign in to comment.