Use KafkaSourceDescriptor as cache key and log entry
sjvanrossum committed Oct 24, 2024
1 parent 49f48e8 commit 7d1a399
Showing 1 changed file with 12 additions and 12 deletions.
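
This commit switches the key of the average-record-size cache (and the argument printed by the consumer-creation log statements) from TopicPartition to the full KafkaSourceDescriptor, matching the offset-estimator cache declared just above it. A descriptor carries more than a topic and partition: as the overrideBootstrapServersConfig call and the "add bootstrap server hash?" TODO in the diff suggest, two descriptors can name the same TopicPartition on different clusters, and a partition-keyed cache would conflate their statistics. A minimal sketch of that collision, using hypothetical stand-in types rather than Beam's or Kafka's actual classes:

import java.util.HashMap;
import java.util.Map;

public class CacheKeyDemo {
  // Hypothetical stand-ins; records supply the value-based equals/hashCode
  // that any cache key needs.
  record TopicPartition(String topic, int partition) {}

  record SourceDescriptor(String bootstrapServers, TopicPartition topicPartition) {}

  public static void main(String[] args) {
    // Two descriptors read the same topic-partition from different clusters.
    SourceDescriptor a = new SourceDescriptor("cluster-a:9092", new TopicPartition("events", 0));
    SourceDescriptor b = new SourceDescriptor("cluster-b:9092", new TopicPartition("events", 0));

    // Keyed by TopicPartition alone, the second source overwrites the first.
    Map<TopicPartition, String> byPartition = new HashMap<>();
    byPartition.put(a.topicPartition(), "stats for a");
    byPartition.put(b.topicPartition(), "stats for b");
    System.out.println(byPartition.size()); // 1

    // Keyed by the full descriptor, each source keeps its own entry.
    Map<SourceDescriptor, String> byDescriptor = new HashMap<>();
    byDescriptor.put(a, "stats for a");
    byDescriptor.put(b, "stats for b");
    System.out.println(byDescriptor.size()); // 2
  }
}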
@@ -227,7 +227,8 @@ private ReadFromKafkaDoFn(
   private transient @Nullable LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator>
       offsetEstimatorCache;
 
-  private transient @Nullable LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache;
+  private transient @Nullable LoadingCache<KafkaSourceDescriptor, AverageRecordSize>
+      avgRecordSizeCache;
   private static final long DEFAULT_KAFKA_POLL_TIMEOUT = 2L;
   @VisibleForTesting final long consumerPollingTimeout;
   @VisibleForTesting final DeserializerProvider<K> keyDeserializerProvider;
@@ -293,7 +294,7 @@ public OffsetRange initialRestriction(@Element KafkaSourceDescriptor kafkaSource
     Map<String, Object> updatedConsumerConfig =
         overrideBootstrapServersConfig(consumerConfig, kafkaSourceDescriptor);
     TopicPartition partition = kafkaSourceDescriptor.getTopicPartition();
-    LOG.info("Creating Kafka consumer for initial restriction for {}", partition);
+    LOG.info("Creating Kafka consumer for initial restriction for {}", kafkaSourceDescriptor);
     try (Consumer<byte[], byte[]> offsetConsumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
       ConsumerSpEL.evaluateAssign(offsetConsumer, ImmutableList.of(partition));
       long startOffset;
@@ -341,10 +342,10 @@ public WatermarkEstimator<Instant> newWatermarkEstimator(
   public double getSize(
       @Element KafkaSourceDescriptor kafkaSourceDescriptor, @Restriction OffsetRange offsetRange)
       throws ExecutionException {
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final @Nullable AverageRecordSize avgRecordSize =
-        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor.getTopicPartition());
+        avgRecordSizeCache.getIfPresent(kafkaSourceDescriptor);
     double numRecords =
         restrictionTracker(kafkaSourceDescriptor, offsetRange).getProgress().getWorkRemaining();
     // Before processing elements, we don't have a good estimated size of records and offset gap.
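
A note on the access patterns visible in this file: getSize reads the cache with getIfPresent, which never invokes the loader, so it returns null for a descriptor whose records haven't been read yet and the estimate falls back to a default; processElement uses get and getUnchecked, which create the per-descriptor AverageRecordSize on first access. A self-contained sketch of those Guava LoadingCache semantics, with String keys standing in for KafkaSourceDescriptor:

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

public class CacheAccessDemo {
  public static void main(String[] args) throws Exception {
    LoadingCache<String, StringBuilder> cache =
        CacheBuilder.newBuilder()
            .maximumSize(1000L)
            .build(
                new CacheLoader<String, StringBuilder>() {
                  @Override
                  public StringBuilder load(String key) {
                    return new StringBuilder(); // fresh accumulator per key
                  }
                });

    // getIfPresent never loads; absent keys simply yield null.
    System.out.println(cache.getIfPresent("descriptor-1")); // null

    // get loads on first access (and declares ExecutionException);
    // getUnchecked is equivalent but wraps loader failures unchecked.
    cache.get("descriptor-1").append("sample");
    System.out.println(cache.getIfPresent("descriptor-1")); // sample
  }
}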
@@ -380,7 +381,7 @@ public ProcessContinuation processElement(
       WatermarkEstimator<Instant> watermarkEstimator,
       MultiOutputReceiver receiver)
       throws Exception {
-    final LoadingCache<TopicPartition, AverageRecordSize> avgRecordSizeCache =
+    final LoadingCache<KafkaSourceDescriptor, AverageRecordSize> avgRecordSizeCache =
         Preconditions.checkStateNotNull(this.avgRecordSizeCache);
     final LoadingCache<KafkaSourceDescriptor, KafkaLatestOffsetEstimator> offsetEstimatorCache =
         Preconditions.checkStateNotNull(this.offsetEstimatorCache);
@@ -389,7 +390,7 @@ public ProcessContinuation processElement(
     final Deserializer<V> valueDeserializerInstance =
         Preconditions.checkStateNotNull(this.valueDeserializerInstance);
     final TopicPartition topicPartition = kafkaSourceDescriptor.getTopicPartition();
-    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(topicPartition);
+    final AverageRecordSize avgRecordSize = avgRecordSizeCache.get(kafkaSourceDescriptor);
     // TODO: Metrics should be reported per split instead of partition, add bootstrap server hash?
     final Distribution rawSizes =
         Metrics.distribution(METRIC_NAMESPACE, RAW_SIZE_METRIC_PREFIX + topicPartition.toString());
Expand All @@ -416,9 +417,7 @@ public ProcessContinuation processElement(
topicPartition, Optional.ofNullable(watermarkEstimator.currentWatermark()));
}

LOG.info(
"Creating Kafka consumer for process continuation for {}",
kafkaSourceDescriptor.getTopicPartition());
LOG.info("Creating Kafka consumer for process continuation for {}", kafkaSourceDescriptor);
try (Consumer<byte[], byte[]> consumer = consumerFactoryFn.apply(updatedConsumerConfig)) {
ConsumerSpEL.evaluateAssign(
consumer, ImmutableList.of(kafkaSourceDescriptor.getTopicPartition()));
@@ -492,7 +491,7 @@ public ProcessContinuation processElement(
                   (rawRecord.key() == null ? 0 : rawRecord.key().length)
                       + (rawRecord.value() == null ? 0 : rawRecord.value().length);
               avgRecordSizeCache
-                  .getUnchecked(kafkaSourceDescriptor.getTopicPartition())
+                  .getUnchecked(kafkaSourceDescriptor)
                   .update(recordSize, rawRecord.offset() - expectedOffset);
               rawSizes.update(recordSize);
               expectedOffset = rawRecord.offset() + 1;
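
The update call above feeds the per-descriptor accumulator with the record's byte size (key plus value) and the gap between the record's offset and the expected one. AverageRecordSize's implementation isn't part of this diff; a hypothetical accumulator along these lines illustrates what such an estimator tracks:

// Hypothetical illustration only; Beam's actual AverageRecordSize may differ.
class AverageRecordSize {
  private long totalSize;
  private long totalGap;
  private long count;

  // Record one fetched record's byte size and the offset gap that preceded it.
  synchronized void update(int recordSize, long offsetGap) {
    totalSize += recordSize;
    totalGap += offsetGap;
    count++;
  }

  // Rough bytes estimate for a span of offsets: average record size,
  // discounted by the average gap between consecutive records.
  synchronized double estimatedBytes(double numOffsets) {
    if (count == 0) {
      return 0.0;
    }
    double avgSize = (double) totalSize / count;
    double avgGap = (double) totalGap / count;
    return numOffsets * avgSize / (1.0 + avgGap);
  }
}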
@@ -607,9 +606,10 @@ public void setup() throws Exception {
         CacheBuilder.newBuilder()
             .maximumSize(1000L)
             .build(
-                new CacheLoader<TopicPartition, AverageRecordSize>() {
+                new CacheLoader<KafkaSourceDescriptor, AverageRecordSize>() {
                   @Override
-                  public AverageRecordSize load(TopicPartition topicPartition) throws Exception {
+                  public AverageRecordSize load(KafkaSourceDescriptor kafkaSourceDescriptor)
+                      throws Exception {
                     return new AverageRecordSize();
                   }
                 });
