Skip to content

Commit

Permalink
feat(sink): support columns subset for cassandra and doris sink (#16821)
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Jun 18, 2024
1 parent fbb597f commit ab1c71b
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,10 @@ private static int getCorrespondingCassandraType(DataType dataType) {
public static void checkSchema(
List<ColumnDesc> columnDescs,
Map<CqlIdentifier, ColumnMetadata> cassandraColumnDescMap) {
if (columnDescs.size() != cassandraColumnDescMap.size()) {
if (columnDescs.size() > cassandraColumnDescMap.size()) {
throw Status.FAILED_PRECONDITION
.withDescription("Don't match in the number of columns in the table")
.withDescription(
"The columns of the sink must be equal to or a superset of the target table's columns.")
.asRuntimeException();
}
for (ColumnDesc columnDesc : columnDescs) {
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/clickhouse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ impl ClickHouseSink {
.collect();

if rw_fields_name.len().gt(&clickhouse_columns_desc.len()) {
return Err(SinkError::ClickHouse("The nums of the RisingWave column must be greater than/equal to the length of the Clickhouse column".to_string()));
return Err(SinkError::ClickHouse("The columns of the sink must be equal to or a superset of the target table's columns.".to_string()));
}

for i in rw_fields_name {
Expand Down
10 changes: 8 additions & 2 deletions src/connector/src/sink/doris.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub struct DorisCommon {
pub database: String,
#[serde(rename = "doris.table")]
pub table: String,
#[serde(rename = "doris.partial_update")]
pub partial_update: Option<String>,
}

impl DorisCommon {
Expand Down Expand Up @@ -125,8 +127,11 @@ impl DorisSink {
.collect();

let rw_fields_name = self.schema.fields();
if rw_fields_name.len().ne(&doris_columns_desc.len()) {
return Err(SinkError::Doris("The length of the RisingWave column must be equal to the length of the doris column".to_string()));
if rw_fields_name.len() > doris_columns_desc.len() {
return Err(SinkError::Doris(
"The columns of the sink must be equal to or a superset of the target table's columns."
.to_string(),
));
}

for i in rw_fields_name {
Expand Down Expand Up @@ -273,6 +278,7 @@ impl DorisSinkWriter {
.add_common_header()
.set_user_password(config.common.user.clone(), config.common.password.clone())
.add_json_format()
.set_partial_columns(config.common.partial_update.clone())
.add_read_json_by_line();
let header = if !is_append_only {
header_builder.add_hidden_column().build()
Expand Down
10 changes: 10 additions & 0 deletions src/connector/src/sink/doris_starrocks_connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ impl HeaderBuilder {
self
}

/// Only use in Starrocks
pub fn set_partial_update(mut self, partial_update: Option<String>) -> Self {
self.header.insert(
"partial_update".to_string(),
Expand All @@ -150,6 +151,15 @@ impl HeaderBuilder {
self
}

/// Only use in Doris
pub fn set_partial_columns(mut self, partial_columns: Option<String>) -> Self {
self.header.insert(
"partial_columns".to_string(),
partial_columns.unwrap_or_else(|| "false".to_string()),
);
self
}

/// Only used in Starrocks Transaction API
pub fn set_db(mut self, db: String) -> Self {
self.header.insert("db".to_string(), db);
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/starrocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl StarrocksSink {
) -> Result<()> {
let rw_fields_name = self.schema.fields();
if rw_fields_name.len() > starrocks_columns_desc.len() {
return Err(SinkError::Starrocks("The length of the RisingWave column must be equal or less to the length of the starrocks column".to_string()));
return Err(SinkError::Starrocks("The columns of the sink must be equal to or a superset of the target table's columns.".to_string()));
}

for i in rw_fields_name {
Expand Down
3 changes: 3 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ DorisConfig:
- name: doris.table
field_type: String
required: true
- name: doris.partial_update
field_type: String
required: false
- name: r#type
field_type: String
required: true
Expand Down

0 comments on commit ab1c71b

Please sign in to comment.