Commit
Do not throw exceptions if commitSync fails in KafkaUnboundedReader. (#33402)

Committing consumer offsets to Kafka is not critical for KafkaIO because it relies on the offsets stored in KafkaCheckpointMark, but throwing an exception makes Dataflow retry the same work item unnecessarily.
an2x authored Dec 17, 2024
1 parent 0e37501 commit e27eced
Showing 1 changed file with 14 additions and 7 deletions.
@@ -606,13 +606,20 @@ private void commitCheckpointMark() {
     LOG.debug("{}: Committing finalized checkpoint {}", this, checkpointMark);
     Consumer<byte[], byte[]> consumer = Preconditions.checkStateNotNull(this.consumer);
 
-    consumer.commitSync(
-        checkpointMark.getPartitions().stream()
-            .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
-            .collect(
-                Collectors.toMap(
-                    p -> new TopicPartition(p.getTopic(), p.getPartition()),
-                    p -> new OffsetAndMetadata(p.getNextOffset()))));
+    try {
+      consumer.commitSync(
+          checkpointMark.getPartitions().stream()
+              .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+              .collect(
+                  Collectors.toMap(
+                      p -> new TopicPartition(p.getTopic(), p.getPartition()),
+                      p -> new OffsetAndMetadata(p.getNextOffset()))));
+    } catch (Exception e) {
+      // Log but ignore the exception. Committing consumer offsets to Kafka is not critical for
+      // KafkaIO because it relies on the offsets stored in KafkaCheckpointMark.
+      LOG.warn(
+          String.format("%s: Could not commit finalized checkpoint %s", this, checkpointMark), e);
+    }
   }
 }
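For context, a minimal usage sketch (not part of this commit; the topic, bootstrap servers, group id, and class name are placeholders): the path changed above is exercised when a pipeline enables commitOffsetsInFinalize() on KafkaIO.read(), which asks KafkaIO to commit consumer offsets back to Kafka whenever a checkpoint is finalized. With this change, a failed commitSync in that path only logs a warning, and resume positions continue to come from KafkaCheckpointMark.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaCommitOffsetsExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092") // placeholder
            .withTopic("my-topic") // placeholder
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // A consumer group id is needed so Kafka has somewhere to record the offsets.
            .withConsumerConfigUpdates(
                Collections.<String, Object>singletonMap(
                    ConsumerConfig.GROUP_ID_CONFIG, "my-group")) // placeholder
            // Commit offsets to Kafka when a checkpoint is finalized, i.e. via the
            // commitCheckpointMark() method shown in the diff. After this change a
            // failed commit is logged as a warning rather than thrown; restart
            // positions still come from the KafkaCheckpointMark.
            .commitOffsetsInFinalize());

    pipeline.run();
  }
}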

