[KafkaIO] Fix per-split metric updates for KafkaUnboundedReader and ReadFromKafkaDoFn #32921

Merged · 5 commits · Nov 13, 2024

Changes from all commits
KafkaUnboundedReader.java
@@ -23,7 +23,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -227,10 +226,6 @@ public boolean advance() throws IOException {
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + pState.topicPartition.toString());
rawSizes.update(recordSize);

for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
backlogBytesOfSplit.set(backlogSplit.getValue());
}

// Pass metrics to container.
kafkaResults.updateKafkaMetrics();
return true;
@@ -349,7 +344,6 @@ public long getSplitBacklogBytes() {
private final Counter bytesReadBySplit;
private final Gauge backlogBytesOfSplit;
private final Gauge backlogElementsOfSplit;
private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;
private final Counter checkpointMarkCommitsEnqueued =
Metrics.counter(METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_ENQUEUED_METRIC);
// Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
@@ -506,10 +500,6 @@ Instant updateAndGetWatermark() {
lastWatermark = timestampPolicy.getWatermark(mkTimestampPolicyContext());
return lastWatermark;
}

String name() {
return this.topicPartition.toString();
}
}

KafkaUnboundedReader(
@@ -554,16 +544,14 @@ String name() {
prevWatermark = Optional.of(new Instant(ckptMark.getWatermarkMillis()));
}

PartitionState<K, V> state =
new PartitionState<K, V>(
states.add(
new PartitionState<>(
tp,
nextOffset,
source
.getSpec()
.getTimestampPolicyFactory()
.createTimestampPolicy(tp, prevWatermark));
states.add(state);
perPartitionBacklogMetrics.put(state.name(), 0L);
.createTimestampPolicy(tp, prevWatermark)));
}

partitionStates = ImmutableList.copyOf(states);
@@ -680,6 +668,8 @@ private void nextBatch() throws IOException {

partitionStates.forEach(p -> p.recordIter = records.records(p.topicPartition).iterator());

reportBacklog();

// cycle through the partitions in order to interleave records from each.
curBatch = Iterators.cycle(new ArrayList<>(partitionStates));
}
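
The net effect in KafkaUnboundedReader: the shared perPartitionBacklogMetrics map is gone, and nextBatch() now calls reportBacklog() once per poll, so each reader instance publishes only the backlog of its own split instead of looping over a map that mixed in partitions owned by other splits. A minimal sketch of that per-split pattern with Beam's Metrics API (illustrative only, not the PR's exact code; the class and gauge name here are made up):

import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Metrics;

// Illustrative sketch: one gauge per reader, named after the split it owns,
// updated only from that reader's own backlog estimate.
class SplitBacklogReporter {
  private final Gauge backlogBytesOfSplit;

  SplitBacklogReporter(String namespace, String splitName) {
    this.backlogBytesOfSplit = Metrics.gauge(namespace, "backlogBytes_" + splitName);
  }

  void report(long splitBacklogBytes) {
    // Negative values (e.g. UnboundedReader.BACKLOG_UNKNOWN) pass through unchanged
    // so the runner can tell "unknown" apart from "zero".
    backlogBytesOfSplit.set(splitBacklogBytes);
  }
}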
@@ -758,7 +748,6 @@ private long getSplitBacklogMessageCount() {
if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
return UnboundedReader.BACKLOG_UNKNOWN;
}
perPartitionBacklogMetrics.put(p.name(), pBacklog);
backlogCount += pBacklog;
}

ReadFromKafkaDoFn.java
@@ -19,11 +19,14 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
@@ -222,13 +225,12 @@ private ReadFromKafkaDoFn(
// Valid between bundle start and bundle finish.
private transient @Nullable Deserializer<K> keyDeserializerInstance = null;
private transient @Nullable Deserializer<V> valueDeserializerInstance = null;
private transient @Nullable Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCache;
private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
offsetEstimatorCache;

private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize;
private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
avgRecordSizeCache;
private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;

private HashMap<String, Long> perPartitionBacklogMetrics = new HashMap<String, Long>();;

@VisibleForTesting final long consumerPollingTimeout;
@VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@VisibleForTesting final DeserializerProvider<V> valueDeserializerProvider;
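
Two field-level changes drive the rest of the diff: both caches are now keyed by KafkaSourceDescriptor instead of TopicPartition, and the shared perPartitionBacklogMetrics map is removed. Keying by the full descriptor matters because two descriptors can name the same topic-partition on different clusters (different bootstrap servers); a cache keyed only by TopicPartition would conflate them. A rough sketch of that idea, using a hypothetical SplitKey value class in place of KafkaSourceDescriptor:

import java.util.List;
import java.util.Objects;
import org.apache.kafka.common.TopicPartition;

// Hypothetical stand-in for KafkaSourceDescriptor: the cache key must include
// the bootstrap servers, not just the topic-partition, so the same partition
// on two clusters produces two distinct cache entries.
final class SplitKey {
  final TopicPartition topicPartition;
  final List<String> bootstrapServers;

  SplitKey(TopicPartition topicPartition, List<String> bootstrapServers) {
    this.topicPartition = topicPartition;
    this.bootstrapServers = bootstrapServers;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SplitKey)) {
      return false;
    }
    SplitKey other = (SplitKey) o;
    return topicPartition.equals(other.topicPartition)
        && bootstrapServers.equals(other.bootstrapServers);
  }

  @Override
  public int hashCode() {
    return Objects.hash(topicPartition, bootstrapServers);
  }
}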
@@ -290,7 +292,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
long startOffset;
@@ -337,63 +339,42 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
@GetSize
public double getSize(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
throws Exception {
throws ExecutionException {
// If present, estimates the record size to offset gap ratio. Compacted topics may hold fewer
// records than the estimated offset range due to record deletion within a partition.
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final @Nullable AverageRecordSize avgRecordSize =
avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
// The tracker estimates the offset range by subtracting the last claimed position from the
// currently observed end offset for the partition belonging to this split.
double estimatedOffsetRange =
restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
// Before processing elements, we don't have a good estimated size of records and offset gap.
// Return the estimated offset range without scaling by a size to gap ratio.
if (!avgRecordSize.asMap().containsKey(kafkaSourceDescriptor.getTopicPartition())) {
if (avgRecordSize == null) {
return estimatedOffsetRange;
}
if (offsetEstimatorCache != null) {
for (Map.Entry<TopicPartition, KafkaLatestOffsetEstimator> tp :
offsetEstimatorCache.entrySet()) {
perPartitionBacklogMetrics.put(tp.getKey().toString(), tp.getValue().estimate());
}
}

// When processing elements, a moving average estimates the size of records and offset gap.
// Return the estimated offset range scaled by the estimated size to gap ratio.
return estimatedOffsetRange
* avgRecordSize
.get(kafkaSourceDescriptor.getTopicPartition())
.estimateRecordByteSizeToOffsetCountRatio();
return estimatedOffsetRange * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio();
}

@NewTracker
public OffsetRangeTracker restrictionTracker(
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction) {
@Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange restriction)
throws ExecutionException {
if (restriction.getTo() < Long.MAX_VALUE) {
return new OffsetRangeTracker(restriction);
}

// OffsetEstimators are cached for each topic-partition because they hold a stateful connection,
// so we want to minimize the number of connections that we start and track with Kafka. Another
// benefit is that the estimator memoizes the backlog, which makes its estimations more reusable.
final Map<TopicPartition, KafkaLatestOffsetEstimator> offsetEstimatorCacheInstance =
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
KafkaLatestOffsetEstimator offsetEstimator = offsetEstimatorCacheInstance.get(topicPartition);
if (offsetEstimator == null) {
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);

LOG.info("Creating Kafka consumer for offset estimation for {}", topicPartition);

Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition, offsetConsumerConfig, updatedConsumerConfig));
offsetEstimator = new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
offsetEstimatorCacheInstance.put(topicPartition, offsetEstimator);
}
final KafkaLatestOffsetEstimator offsetEstimator =
offsetEstimatorCache.get(kafkaSourceDescriptor);

return new GrowableOffsetRangeTracker(restriction.getFrom(), offsetEstimator);
}
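
In getSize(), the cache is now consulted with getIfPresent: before any record has been processed for this descriptor there is no observed size-to-offset ratio, so the raw offset range is returned, and afterwards the range is scaled by the observed bytes-per-offset ratio; the loop that wrote per-partition backlog gauges from inside getSize() is gone. In restrictionTracker(), the estimator now comes from a LoadingCache, so the lookup can throw ExecutionException, which is why both signatures change. A simplified sketch of the size estimate, with hypothetical names:

// Simplified sketch of the size estimate (class and parameter names are made up):
//   workRemaining  - offsets left in this split, as reported by the tracker
//   bytesPerOffset - moving average of record bytes per offset gap, or null
//                    when nothing has been processed for this descriptor yet
final class SizeEstimateSketch {
  static double estimateSizeBytes(double workRemaining, Double bytesPerOffset) {
    if (bytesPerOffset == null) {
      // Nothing observed yet: fall back to the raw offset count.
      return workRemaining;
    }
    // The ratio is bytes per offset (not per record), so offset gaps left by
    // compaction are already reflected in it.
    return workRemaining * bytesPerOffset;
  }
}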
@@ -405,22 +386,22 @@ public ProcessContinuation processElement(
WatermarkEstimator<Instant> watermarkEstimator,
MultiOutputReceiver receiver)
throws Exception {
final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSize =
Preconditions.checkStateNotNull(this.avgRecordSize);
final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
Preconditions.checkStateNotNull(this.avgRecordSizeCache);
final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
Preconditions.checkStateNotNull(this.offsetEstimatorCache);
final Deserializer<K> keyDeserializerInstance =
Preconditions.checkStateNotNull(this.keyDeserializerInstance);
final Deserializer<V> valueDeserializerInstance =
Preconditions.checkStateNotNull(this.valueDeserializerInstance);
final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
// TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
final Distribution rawSizes =
Metrics.distribution(
METRIC_NAMESPACE,
RAW_SIZE_METRIC_PREFIX + kafkaSourceDescriptor.getTopicPartition().toString());
for (Map.Entry<String, Long> backlogSplit : perPartitionBacklogMetrics.entrySet()) {
Gauge backlog =
Metrics.gauge(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + backlogSplit.getKey());
backlog.set(backlogSplit.getValue());
}
Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
final Gauge backlogBytes =
Metrics.gauge(
METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + "backlogBytes_" + topicPartition.toString());

// Stop processing current TopicPartition when it's time to stop.
if (checkStopReadingFn != null
@@ -438,13 +419,10 @@ public ProcessContinuation processElement(
if (timestampPolicyFactory != null) {
timestampPolicy =
timestampPolicyFactory.createTimestampPolicy(
kafkaSourceDescriptor.getTopicPartition(),
Optional.ofNullable(watermarkEstimator.currentWatermark()));
topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
}

LOG.info(
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
@@ -518,8 +496,8 @@ public ProcessContinuation processElement(
int recordSize =
(rawRecord.key() == null ? 0 : rawRecord.key().length)
+ (rawRecord.value() == null ? 0 : rawRecord.value().length);
avgRecordSize
.getUnchecked(kafkaSourceDescriptor.getTopicPartition())
avgRecordSizeCache
.getUnchecked(kafkaSourceDescriptor)
.update(recordSize, rawRecord.offset() - expectedOffset);
rawSizes.update(recordSize);
expectedOffset = rawRecord.offset() + 1;
@@ -551,6 +529,15 @@ public ProcessContinuation processElement(
}
}
}

backlogBytes.set(
(long)
(BigDecimal.valueOf(
Preconditions.checkStateNotNull(
offsetEstimatorCache.get(kafkaSourceDescriptor).estimate()))
.subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
.doubleValue()
* avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
}
}
}
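
With the per-partition gauge loop removed from the top of processElement, the backlogBytes gauge for this split is now set once the poll loop finishes: the estimated latest offset minus the next expected offset, scaled by the same bytes-per-offset ratio used in getSize(). The subtraction goes through BigDecimal so an extreme end-offset estimate cannot overflow before the value is converted to double. A standalone sketch of that arithmetic (class and method names are made up):

import java.math.BigDecimal;
import java.math.MathContext;

// Standalone sketch of the value written to the backlog gauge.
final class BacklogSketch {
  static long backlogBytes(long estimatedEndOffset, long nextExpectedOffset, double bytesPerOffset) {
    double remainingOffsets =
        BigDecimal.valueOf(estimatedEndOffset)
            .subtract(BigDecimal.valueOf(nextExpectedOffset), MathContext.DECIMAL128)
            .doubleValue();
    return (long) (remainingOffsets * bytesPerOffset);
  }
  // Example: backlogBytes(1_000_000L, 999_000L, 512.0) == 512_000L
}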
@@ -611,19 +598,44 @@ public Coder<OffsetRange> restrictionCoder() {
@Setup
public void setup() throws Exception {
// Start to track record size and offset gap per bundle.
avgRecordSize =
avgRecordSizeCache =
CacheBuilder.newBuilder()
.maximumSize(1000L)
.build(
new CacheLoader<TopicPartition, AverageRecordSize>() {
new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
@Override
public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
throws Exception {
return new AverageRecordSize();
}
});
keyDeserializerInstance = keyDeserializerProvider.getDeserializer(consumerConfig, true);
valueDeserializerInstance = valueDeserializerProvider.getDeserializer(consumerConfig, false);
offsetEstimatorCache = new HashMap<>();
offsetEstimatorCache =
CacheBuilder.newBuilder()
.weakValues()
.expireAfterAccess(1, TimeUnit.MINUTES)
.build(
new CacheLoader<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>() {
@Override
public KafkaLatestOffsetEstimator load(
KafkaSourceDescriptor kafkaSourceDescriptor) throws Exception {
LOG.info(
"Creating Kafka consumer for offset estimation for {}",
kafkaSourceDescriptor);

TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
Map<String, Object> updatedConsumerConfig =
overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
Consumer<byte[], byte[]> offsetConsumer =
consumerFactoryFn.apply(
KafkaIOUtils.getOffsetConsumerConfig(
"tracker-" + topicPartition,
offsetConsumerConfig,
updatedConsumerConfig));
return new KafkaLatestOffsetEstimator(offsetConsumer, topicPartition);
}
});
if (checkStopReadingFn != null) {
checkStopReadingFn.setup();
}
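
setup() now builds the offset-estimator cache with Guava's CacheBuilder instead of a plain HashMap: weakValues() lets an estimator (and the consumer it wraps) be reclaimed once no tracker references it, and expireAfterAccess(1, MINUTES) drops estimators for splits this worker has stopped touching, rather than keeping one consumer per partition alive for the life of the DoFn; teardown() correspondingly switches from clear() to invalidateAll(). A generic sketch of the caching pattern, with a placeholder resource standing in for the Kafka offset consumer (plain Guava here; Beam uses its vendored copy):

import java.util.concurrent.TimeUnit;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

// Placeholder for an expensive, connection-holding value such as an offset estimator.
class ExpensiveEstimator {
  ExpensiveEstimator(String key) {}
}

class EstimatorCacheSketch {
  // Values are created lazily, kept only while referenced (weakValues) and
  // while recently used (expireAfterAccess), and can all be dropped at
  // teardown with invalidateAll().
  final LoadingCache<String, ExpensiveEstimator> cache =
      CacheBuilder.newBuilder()
          .weakValues()
          .expireAfterAccess(1, TimeUnit.MINUTES)
          .build(
              new CacheLoader<String, ExpensiveEstimator>() {
                @Override
                public ExpensiveEstimator load(String key) {
                  return new ExpensiveEstimator(key);
                }
              });
}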
@@ -645,7 +657,7 @@ public void teardown() throws Exception {
}

if (offsetEstimatorCache != null) {
offsetEstimatorCache.clear();
offsetEstimatorCache.invalidateAll();
}
if (checkStopReadingFn != null) {
checkStopReadingFn.teardown();