Skip to content

Commit

Permalink
fix: specify isolation.level in kafka source (#9033)
Browse files Browse the repository at this point in the history
Signed-off-by: tabVersion <[email protected]>
  • Loading branch information
tabVersion authored Apr 11, 2023
1 parent bcc00c0 commit 351b1bc
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 2 deletions.
5 changes: 4 additions & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use rdkafka::{Offset, TopicPartitionList};

use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_SYNC_CALL_TIMEOUT};
use crate::source::kafka::{
KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL, KAFKA_SYNC_CALL_TIMEOUT,
};

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
Expand Down Expand Up @@ -57,6 +59,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let broker_rewrite_map = common_props.broker_rewrite_map.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
common_props.set_security_properties(&mut config);
let mut scan_start_offset = match properties
.scan_startup_mode
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ pub struct KafkaProperties {
}

const KAFKA_SYNC_CALL_TIMEOUT: Duration = Duration::from_secs(1);
const KAFKA_ISOLATION_LEVEL: &str = "read_committed";
3 changes: 2 additions & 1 deletion src/connector/src/source/kafka/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};
use crate::impl_common_split_reader_logic;
use crate::parser::ParserConfig;
use crate::source::base::{SourceMessage, MAX_CHUNK_SIZE};
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext};
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_ISOLATION_LEVEL};
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData,
SplitReader,
Expand Down Expand Up @@ -67,6 +67,7 @@ impl SplitReader for KafkaSplitReader {
config.set("enable.partition.eof", "false");
config.set("enable.auto.commit", "false");
config.set("auto.offset.reset", "smallest");
config.set("isolation.level", KAFKA_ISOLATION_LEVEL);
config.set("bootstrap.servers", bootstrap_servers);

properties.common.set_security_properties(&mut config);
Expand Down

0 comments on commit 351b1bc

Please sign in to comment.