feat(source): support private link for kafka connector #8247

Merged Mar 17, 2023 · 32 commits (diff below shows changes from 14 commits)

Commits (32):
72e9100  patch rdkafka for rewriting broker addresses (wangrunji0408, Feb 15, 2023)
9f597c6  kafka private link examples (StrikeW, Feb 20, 2023)
d44119a  private link demo (StrikeW, Feb 21, 2023)
5fb1945  connection catalog and DDL rpc (StrikeW, Feb 22, 2023)
9de0357  persist Connection to catalog (StrikeW, Feb 25, 2023)
f24c196  add Connection rpc to risectl (StrikeW, Feb 26, 2023)
cf89158  fix create_connection bug (StrikeW, Feb 26, 2023)
6e8a993  save private link dns names into with properties (StrikeW, Feb 26, 2023)
2b1ded9  rewrite kafka broker addresses with custom client context (StrikeW, Feb 26, 2023)
1d9199d  fix clippy (StrikeW, Feb 27, 2023)
0128ce9  source private link support (StrikeW, Feb 28, 2023)
a562826  add security group id (StrikeW, Mar 1, 2023)
14719fb  Merge remote-tracking branch 'origin/main' into siyuan/aws-private-links (StrikeW, Mar 1, 2023)
2fc0c61  update (StrikeW, Mar 1, 2023)
313a9ff  fix (StrikeW, Mar 1, 2023)
756d4ea  fix clippy (StrikeW, Mar 1, 2023)
0e4a845  update dashboard (StrikeW, Mar 1, 2023)
133e43f  fix madsim-rdkafka (wangrunji0408, Mar 1, 2023)
8a84a21  fix comments (StrikeW, Mar 3, 2023)
0b5171d  update (StrikeW, Mar 3, 2023)
b628da8  minor (StrikeW, Mar 7, 2023)
7c82192  fix comments (StrikeW, Mar 7, 2023)
dd4c5cd  minor (StrikeW, Mar 7, 2023)
d50414a  dedup subnet by availability_zone (StrikeW, Mar 14, 2023)
1135acf  refine error message (StrikeW, Mar 14, 2023)
3175b16  Merge remote-tracking branch 'origin/main' into siyuan/aws-private-links (StrikeW, Mar 14, 2023)
e5616b2  use az_id as the key to target dns name (StrikeW, Mar 16, 2023)
27aa398  fix serde (StrikeW, Mar 16, 2023)
af3ed91  error if no available target dns (StrikeW, Mar 16, 2023)
b826fbc  minor (StrikeW, Mar 17, 2023)
f1b6393  fix dashboard (StrikeW, Mar 17, 2023)
0628311  Merge remote-tracking branch 'origin/main' into siyuan/aws-private-links (StrikeW, Mar 17, 2023)
33 changes: 29 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -58,6 +58,7 @@ repository = "https://github.com/risingwavelabs/risingwave"
aws-config = { version = "0.51", default-features = false, features = ["rt-tokio", "native-tls"] }
aws-sdk-kinesis = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] }
aws-sdk-s3 = { version = "0.21", default-features = false, features = ["rt-tokio","native-tls"] }
aws-sdk-ec2 = { version = "0.21", default-features = false, features = ["rt-tokio","native-tls"] }
aws-sdk-sqs = { version = "0.21", default-features = false, features = ["rt-tokio", "native-tls"] }
aws-smithy-http = "0.51"
aws-smithy-types = "0.51"
@@ -123,3 +124,4 @@ tokio-stream = { git = "https://github.com/madsim-rs/tokio.git", rev = "0c25710"
tokio-retry = { git = "https://github.com/madsim-rs/rust-tokio-retry.git", rev = "95e2fd3" }
tokio-postgres = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
postgres-types = { git = "https://github.com/madsim-rs/rust-postgres.git", rev = "87ca1dc" }
madsim-rdkafka = { git = "https://github.com/madsim-rs/madsim.git", rev = "06a7af8" }
15 changes: 15 additions & 0 deletions proto/catalog.proto
@@ -90,6 +90,21 @@ message Sink {
string definition = 13;
}


message Connection {
message PrivateLinkService {
string endpoint_id = 1;
map<string, string> dns_entries = 2;
}

uint32 id = 1;
string name = 2;
oneof info {
PrivateLinkService private_link_service = 3;
}

}

message Index {
uint32 id = 1;
uint32 schema_id = 2;
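
For illustration, here is a minimal sketch of how the new catalog object could be built from Rust, assuming prost-generated types under risingwave_pb::catalog (the module path and all field values are assumptions for this sketch, not taken from the PR):

use std::collections::HashMap;

use risingwave_pb::catalog::connection::{Info, PrivateLinkService};
use risingwave_pb::catalog::Connection;

fn example_connection() -> Connection {
    // Per commit e5616b2, dns_entries is keyed by availability-zone id,
    // mapping each AZ to the endpoint DNS name to dial in that zone.
    // The AZ ids and DNS names below are invented.
    let mut dns_entries = HashMap::new();
    dns_entries.insert("use1-az1".to_string(), "vpce-0f00-az1.example.com".to_string());
    dns_entries.insert("use1-az2".to_string(), "vpce-0f00-az2.example.com".to_string());

    Connection {
        id: 1,
        name: "my_private_link".to_string(),
        info: Some(Info::PrivateLinkService(PrivateLinkService {
            endpoint_id: "vpce-0f00".to_string(),
            dns_entries,
        })),
    }
}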
30 changes: 30 additions & 0 deletions proto/ddl_service.proto
@@ -237,6 +237,34 @@ message GetDdlProgressResponse {
repeated DdlProgress ddl_progress = 1;
}


message CreateConnectionRequest {
message PrivateLink {
string provider = 1;
string service_name = 2;
repeated string availability_zones = 3;
}

oneof payload {
PrivateLink private_link = 1;
}
}

message CreateConnectionResponse {
uint32 connection_id = 1;
// global catalog version
uint64 version = 2;
}

message ListConnectionRequest {
}

message ListConnectionResponse {
repeated catalog.Connection connections = 1;
// global catalog version
uint64 version = 2;
}

service DdlService {
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse);
@@ -260,4 +288,6 @@ service DdlService {
rpc ReplaceTablePlan(ReplaceTablePlanRequest) returns (ReplaceTablePlanResponse);
rpc GetTable(GetTableRequest) returns (GetTableResponse);
rpc GetDdlProgress(GetDdlProgressRequest) returns (GetDdlProgressResponse);
rpc CreateConnection(CreateConnectionRequest) returns (CreateConnectionResponse);
rpc ListConnections(ListConnectionRequest) returns (ListConnectionResponse);
}
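
As a rough usage sketch, the new RPCs could be driven from a tonic-generated client as below. The client module path follows standard tonic codegen conventions, and the meta endpoint, service name, and availability zones are placeholders, not values from this PR:

use risingwave_pb::ddl_service::create_connection_request::{Payload, PrivateLink};
use risingwave_pb::ddl_service::ddl_service_client::DdlServiceClient;
use risingwave_pb::ddl_service::{CreateConnectionRequest, ListConnectionRequest};

async fn create_private_link_connection() -> Result<(), Box<dyn std::error::Error>> {
    // Meta-service address is a placeholder.
    let mut client = DdlServiceClient::connect("http://127.0.0.1:5690").await?;

    // Register a private-link connection; values are invented.
    let resp = client
        .create_connection(CreateConnectionRequest {
            payload: Some(Payload::PrivateLink(PrivateLink {
                provider: "aws".to_string(),
                service_name: "com.amazonaws.vpce.us-east-1.vpce-svc-0f00".to_string(),
                availability_zones: vec!["use1-az1".to_string(), "use1-az2".to_string()],
            })),
        })
        .await?
        .into_inner();
    println!("connection_id = {}, catalog version = {}", resp.connection_id, resp.version);

    // Listing returns the persisted catalog.Connection entries.
    let conns = client.list_connections(ListConnectionRequest {}).await?.into_inner();
    println!("{} connections", conns.connections.len());
    Ok(())
}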
3 changes: 2 additions & 1 deletion src/connector/Cargo.toml
@@ -18,8 +18,9 @@ anyhow = "1"
apache-avro = { git = "https://github.com/risingwavelabs/avro", branch = "waruto/modify-decimal", features = ["snappy", "zstandard", "bzip", "xz"] }
async-trait = "0.1"
aws-config = { workspace = true }
aws-sdk-ec2 = { workspace = true }
aws-sdk-kinesis = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-sdk-s3 = { workspace = true }
aws-smithy-http = { workspace = true }
aws-types = { workspace = true }
bincode = "1"
16 changes: 16 additions & 0 deletions src/connector/src/common.rs
@@ -24,11 +24,27 @@ use crate::source::kinesis::config::AwsConfigInfo;
// The file describes the common abstractions for each connector and can be used in both source and
// sink.

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsPrivateLinkItem {
pub service_name: String,
pub availability_zones: Vec<String>,
pub port: u16,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsPrivateLinks {
pub provider: String,
pub infos: Vec<AwsPrivateLinkItem>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,

#[serde(rename = "private.links.dns.names")]
pub private_link_dns_names: Option<String>,

#[serde(rename = "topic", alias = "kafka.topic")]
pub topic: String,

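
To show how these serde attributes map WITH-style properties onto the struct, here is a self-contained sketch using a trimmed stand-in for KafkaCommon (only the fields shown above; all property values are invented):

use serde::Deserialize;

// Trimmed stand-in for KafkaCommon, reproducing only the renames shown above.
#[derive(Debug, Deserialize)]
struct KafkaCommonSketch {
    #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
    brokers: String,

    #[serde(rename = "private.links.dns.names")]
    private_link_dns_names: Option<String>,

    #[serde(rename = "topic", alias = "kafka.topic")]
    topic: String,
}

fn main() {
    // WITH (...) options arrive as a string map; these values are made up.
    let with_options = serde_json::json!({
        "properties.bootstrap.server": "b-1.msk:9092,b-2.msk:9092",
        "private.links.dns.names": "vpce-0f00-az1.example.com:9001,vpce-0f00-az2.example.com:9002",
        "topic": "orders",
    });
    let common: KafkaCommonSketch = serde_json::from_value(with_options).unwrap();
    assert_eq!(common.brokers, "b-1.msk:9092,b-2.msk:9092");
    println!("{:?} {:?}", common.topic, common.private_link_dns_names);
}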
12 changes: 8 additions & 4 deletions src/connector/src/source/kafka/enumerator/client.rs
@@ -16,13 +16,13 @@ use std::collections::HashMap;

use anyhow::anyhow;
use async_trait::async_trait;
use rdkafka::consumer::{BaseConsumer, Consumer, DefaultConsumerContext};
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{Offset, TopicPartitionList};

use crate::source::base::SplitEnumerator;
use crate::source::kafka::split::KafkaSplit;
use crate::source::kafka::{KafkaProperties, KAFKA_SYNC_CALL_TIMEOUT};
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext, KAFKA_SYNC_CALL_TIMEOUT};

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum KafkaEnumeratorOffset {
@@ -35,7 +35,7 @@ pub enum KafkaEnumeratorOffset {
pub struct KafkaSplitEnumerator {
broker_address: String,
topic: String,
client: BaseConsumer,
client: BaseConsumer<PrivateLinkConsumerContext>,
start_offset: KafkaEnumeratorOffset,

// maybe used in the future for batch processing
@@ -54,10 +54,12 @@ impl SplitEnumerator for KafkaSplitEnumerator {
let common_props = &properties.common;

let broker_address = common_props.brokers.clone();
let private_links = common_props.private_link_dns_names.clone();
let topic = common_props.topic.clone();
config.set("bootstrap.servers", &broker_address);
common_props.set_security_properties(&mut config);

tracing::info!("private_links: {:?}", private_links);
let mut scan_start_offset = match properties
.scan_startup_mode
.as_ref()
@@ -79,7 +81,9 @@
scan_start_offset = KafkaEnumeratorOffset::Timestamp(time_offset)
}

let client: BaseConsumer = config.create_with_context(DefaultConsumerContext).await?;
let client_ctx = PrivateLinkConsumerContext::new(&broker_address, &private_links);
let client: BaseConsumer<PrivateLinkConsumerContext> =
config.create_with_context(client_ctx).await?;

Ok(Self {
broker_address,
5 changes: 4 additions & 1 deletion src/connector/src/source/kafka/mod.rs
@@ -17,16 +17,19 @@ use std::time::Duration;
use serde::Deserialize;

pub mod enumerator;
pub mod private_link;
pub mod source;
pub mod split;

pub use enumerator::*;
pub use private_link::*;
pub use source::*;
pub use split::*;

use crate::common::KafkaCommon;

pub const KAFKA_CONNECTOR: &str = "kafka";
pub const KAFKA_PROPS_BROKER_KEY: &str = "properties.bootstrap.server";
pub const KAFKA_PROPS_BROKER_KEY_ALIAS: &str = "kafka.brokers";

#[derive(Clone, Debug, Deserialize)]
pub struct KafkaProperties {
69 changes: 69 additions & 0 deletions src/connector/src/source/kafka/private_link.rs
@@ -0,0 +1,69 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::BTreeMap;

use itertools::Itertools;
use rdkafka::client::BrokerAddr;
use rdkafka::consumer::ConsumerContext;
use rdkafka::ClientContext;
use risingwave_common::util::iter_util::ZipEqFast;

pub struct PrivateLinkConsumerContext {
rewrite_map: BTreeMap<BrokerAddr, BrokerAddr>,
}

impl PrivateLinkConsumerContext {
pub fn new(brokers: &str, private_links: &Option<String>) -> Self {
let mut rewrite_map = BTreeMap::new();
if let Some(private_links) = private_links {
let dns_names = private_links.split(',').collect_vec();
let broker_adds = brokers.split(',').collect_vec();

broker_adds
.into_iter()
.zip_eq_fast(dns_names.into_iter())
.for_each(|(broker_addr, dns_name)| {
let broker_addr = broker_addr.split(':').collect_vec();
let dns_name = dns_name.split(':').collect_vec();
let old_addr = BrokerAddr {
host: broker_addr[0].to_string(),
port: broker_addr[1].to_string(),
};
let new_addr = BrokerAddr {
host: dns_name[0].to_string(),
port: dns_name[1].to_string(),
};
rewrite_map.insert(old_addr, new_addr);
});
}
tracing::info!("broker addr rewrite map {:?}", rewrite_map);
Self { rewrite_map }
}
}

impl ClientContext for PrivateLinkConsumerContext {
fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
match self.rewrite_map.get(&addr) {
None => addr,
Some(new_addr) => {
tracing::debug!("broker addr {:?} rewrote to {:?}", addr, new_addr);
new_addr.clone()
}
}
}
}

// required by the trait bound of BaseConsumer
impl ConsumerContext for PrivateLinkConsumerContext {}
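
A usage sketch of the context above (broker and endpoint addresses are invented, and the import path for PrivateLinkConsumerContext is an assumption). Note that the two comma-separated lists are zipped positionally by zip_eq_fast, so they must have equal length and every entry must be of the form host:port:

use rdkafka::client::BrokerAddr;
use rdkafka::ClientContext; // brings rewrite_broker_addr into scope

// Assumed re-export path, per the `pub use private_link::*;` in mod.rs above.
use risingwave_connector::source::kafka::PrivateLinkConsumerContext;

fn main() {
    // One private-link DNS name per broker, in the same order as the brokers.
    let brokers = "b-1.msk.internal:9092,b-2.msk.internal:9092";
    let links = Some("vpce-0f00-az1.example.com:9001,vpce-0f00-az2.example.com:9002".to_string());
    let ctx = PrivateLinkConsumerContext::new(brokers, &links);

    let rewritten = ctx.rewrite_broker_addr(BrokerAddr {
        host: "b-1.msk.internal".to_string(),
        port: "9092".to_string(),
    });
    // The client now dials the VPC endpoint instead of the advertised broker.
    assert_eq!(rewritten.host, "vpce-0f00-az1.example.com");
    assert_eq!(rewritten.port, "9001");
}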
12 changes: 7 additions & 5 deletions src/connector/src/source/kafka/source/reader.rs
@@ -20,13 +20,13 @@ use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message, Offset, TopicPartitionList};

use crate::impl_common_split_reader_logic;
use crate::parser::ParserConfig;
use crate::source::base::{SourceMessage, MAX_CHUNK_SIZE};
use crate::source::kafka::KafkaProperties;
use crate::source::kafka::{KafkaProperties, PrivateLinkConsumerContext};
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SplitId, SplitImpl, SplitMetaData,
SplitReader,
@@ -35,7 +35,7 @@
impl_common_split_reader_logic!(KafkaSplitReader, KafkaProperties);

pub struct KafkaSplitReader {
consumer: StreamConsumer<DefaultConsumerContext>,
consumer: StreamConsumer<PrivateLinkConsumerContext>,
start_offset: Option<i64>,
stop_offset: Option<i64>,
bytes_per_second: usize,
@@ -61,6 +61,7 @@
let mut config = ClientConfig::new();

let bootstrap_servers = &properties.common.brokers;
let private_links = properties.common.private_link_dns_names.clone();

// disable partition eof
config.set("enable.partition.eof", "false");
@@ -83,9 +84,10 @@
);
}

let consumer: StreamConsumer = config
let client_ctx = PrivateLinkConsumerContext::new(bootstrap_servers, &private_links);
let consumer: StreamConsumer<PrivateLinkConsumerContext> = config
.set_log_level(RDKafkaLogLevel::Info)
.create_with_context(DefaultConsumerContext)
.create_with_context(client_ctx)
.await
.map_err(|e| anyhow!("failed to create kafka consumer: {}", e))?;

2 changes: 2 additions & 0 deletions src/ctl/src/cmd_impl/meta.rs
@@ -14,10 +14,12 @@

mod backup_meta;
mod cluster_info;
mod connection;
mod pause_resume;
mod reschedule;

pub use backup_meta::*;
pub use cluster_info::*;
pub use connection::*;
pub use pause_resume::*;
pub use reschedule::*;