diff --git a/src/connector/src/source/kafka/enumerator/client.rs b/src/connector/src/source/kafka/enumerator/client.rs index b86aa4fd0a4d2..cb2d8fa705b53 100644 --- a/src/connector/src/source/kafka/enumerator/client.rs +++ b/src/connector/src/source/kafka/enumerator/client.rs @@ -22,7 +22,9 @@ use rdkafka::{Offset, TopicPartitionList}; use crate::source::base::SplitEnumerator; use crate::source::kafka::split::KafkaSplit; -use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_SYNC_CALL_TIMEOUT}; +use crate::source::kafka::{ + KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, KAFKA_SYNC_CALL_TIMEOUT, +}; #[derive(Debug, Copy, Clone, Eq, PartialEq)] pub enum KafkaEnumeratorOffset { @@ -57,6 +59,7 @@ impl SplitEnumerator for KafkaSplitEnumerator { let broker_rewrite_map = common_props.broker_rewrite_map.clone(); let topic = common_props.topic.clone(); config.set("bootstrap.servers", &broker_address); + config.set("isolation.level", KAFKA_ISOLATION_LEVEL); common_props.set_security_properties(&mut config); let mut scan_start_offset = match properties .scan_startup_mode diff --git a/src/connector/src/source/kafka/mod.rs b/src/connector/src/source/kafka/mod.rs index afd039e922160..9bb55bca5128a 100644 --- a/src/connector/src/source/kafka/mod.rs +++ b/src/connector/src/source/kafka/mod.rs @@ -68,3 +68,4 @@ pub struct KafkaProperties { } const KAFKA_SYNC_CALL_TIMEOUT: Duration = Duration::from_secs(1); +const KAFKA_ISOLATION_LEVEL: &str = "read_committed"; diff --git a/src/connector/src/source/kafka/source/reader.rs b/src/connector/src/source/kafka/source/reader.rs index 63380043632e8..d9e109279b6b4 100644 --- a/src/connector/src/source/kafka/source/reader.rs +++ b/src/connector/src/source/kafka/source/reader.rs @@ -26,7 +26,7 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList}; use crate::impl_common_split_reader_logic; use crate::parser::ParserConfig; use crate::source::base::{SourceMessage, MAX_CHUNK_SIZE}; -use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext}; +use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL}; use crate::source::{ BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData, SplitReader, @@ -67,6 +67,7 @@ impl SplitReader for KafkaSplitReader { config.set("enable.partition.eof", "false"); config.set("enable.auto.commit", "false"); config.set("auto.offset.reset", "smallest"); + config.set("isolation.level", KAFKA_ISOLATION_LEVEL); config.set("bootstrap.servers", bootstrap_servers); properties.common.set_security_properties(&mut config);