Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support columns subset for cassandra and doris sink #16821

Merged
merged 7 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ private static int getCorrespondingCassandraType(DataType dataType) {
public static void checkSchema(
List<ColumnDesc> columnDescs,
Map<CqlIdentifier, ColumnMetadata> cassandraColumnDescMap) {
if (columnDescs.size() != cassandraColumnDescMap.size()) {
if (columnDescs.size() > cassandraColumnDescMap.size()) {
throw Status.FAILED_PRECONDITION
.withDescription("Don't match in the number of columns in the table")
.withDescription(
"The columns of the sink must be equal to or a superset of the target table's columns.")
.asRuntimeException();
}
for (ColumnDesc columnDesc : columnDescs) {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl ClickHouseSink {
.collect();

if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
return Err(SinkError::ClickHouse("The nums of the RisingWave column must be greater than/equal to the length of the Clickhouse column".to_string()));
return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_string()));
}

for i in rw_fields_name {
Expand Down
10 changes: 8 additions & 2 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct DorisCommon {
pub database: String,
#[serde(rename = "doris.table")]
pub table: String,
#[serde(rename = "doris.partial_update")]
pub partial_update: Option<String>,
}

impl DorisCommon {
Expand Down Expand Up @@ -125,8 +127,11 @@ impl DorisSink {
.collect();

let rw_fields_name = self.schema.fields();
if rw_fields_name.len().ne(&doris_columns_desc.len()) {
return Err(SinkError::Doris("The length of the RisingWave column must be equal to the length of the doris column".to_string()));
if rw_fields_name.len() > doris_columns_desc.len() {
return Err(SinkError::Doris(
"The columns of the sink must be equal to or a superset of the target table's columns."
.to_string(),
));
}

for i in rw_fields_name {
Expand Down Expand Up @@ -273,6 +278,7 @@ impl DorisSinkWriter {
.add_common_header()
.set_user_password(config.common.user.clone(), config.common.password.clone())
.add_json_format()
.set_partial_columns(config.common.partial_update.clone())
.add_read_json_by_line();
let header = if !is_append_only {
header_builder.add_hidden_column().build()
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/sink/doris_starrocks_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl HeaderBuilder {
self
}

/// Only use in Starrocks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does user to enable partial_update on starrocks sink? I think they can reuse the same with options on user side, but just set different header for different implementations.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like create sink with(partial_update = 'true');

pub fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
self.header.insert(
"partial_update".to_string(),
Expand All @@ -150,6 +151,15 @@ impl HeaderBuilder {
self
}

/// Only use in Doris
pub fn set_partial_columns(mut self, partial_columns: Option<String>) -> Self {
self.header.insert(
"partial_columns".to_string(),
partial_columns.unwrap_or_else(|| "false".to_string()),
);
self
}

/// Only used in Starrocks Transaction API
pub fn set_db(mut self, db: String) -> Self {
self.header.insert("db".to_string(), db);
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl StarrocksSink {
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if rw_fields_name.len() > starrocks_columns_desc.len() {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string()));
return Err(SinkError::Starrocks("The columns of the sink must be equal to or a superset of the target table's columns.".to_string()));
}

for i in rw_fields_name {
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ DorisConfig:
- name: doris.table
field_type: String
required: true
- name: doris.partial_update
field_type: String
required: false
- name: r#type
field_type: String
required: true
Expand Down
Loading