diff --git a/e2e_test/sink/append_only_sink.slt b/e2e_test/sink/append_only_sink.slt index 57f194f033f4f..b656b1dfdc914 100644 --- a/e2e_test/sink/append_only_sink.slt +++ b/e2e_test/sink/append_only_sink.slt @@ -2,19 +2,19 @@ statement ok create table t (v1 int, v2 int); statement ok -create sink s1 from t with (connector = 'console'); +create sink s1 from t with (connector = 'blackhole'); statement ok -create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console'); +create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'blackhole'); statement error The sink cannot be append-only -create sink s3 from t with (connector = 'console', type = 'append-only'); +create sink s3 from t with (connector = 'blackhole', type = 'append-only'); statement ok -create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true'); +create sink s3 from t with (connector = 'blackhole', type = 'append-only', force_append_only = 'true'); statement error Cannot force the sink to be append-only -create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true'); +create sink s4 from t with (connector = 'blackhole', type = 'upsert', force_append_only = 'true'); statement ok drop sink s1 diff --git a/e2e_test/sink/create_sink_as.slt b/e2e_test/sink/create_sink_as.slt index 9968512f7d7ba..7f1cd19731a98 100644 --- a/e2e_test/sink/create_sink_as.slt +++ b/e2e_test/sink/create_sink_as.slt @@ -1,6 +1,3 @@ -# the sink requires a primary index for efficient execution -# which will be enforced by schema precheck in the future - statement ok CREATE TABLE t4 (v1 int primary key, v2 int); @@ -11,7 +8,8 @@ statement ok CREATE SINK s4 AS select mv4.v1 as v1, mv4.v2 as v2 from mv4 WITH ( connector = 'jdbc', jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', - table.name = 't4' + table.name = 't4', + type = 'upsert' ); statement ok diff --git a/e2e_test/sink/remote/jdbc.load.slt b/e2e_test/sink/remote/jdbc.load.slt index eb4cd70806f8d..d75a6922a0c98 100644 --- a/e2e_test/sink/remote/jdbc.load.slt +++ b/e2e_test/sink/remote/jdbc.load.slt @@ -18,14 +18,16 @@ statement ok CREATE SINK s_postgres FROM mv_remote WITH ( connector='jdbc', jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector', - table.name='t_remote' + table.name='t_remote', + type='upsert' ); statement ok CREATE SINK s_mysql FROM mv_remote WITH ( connector='jdbc', jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw', - table.name='t_remote' + table.name='t_remote', + type='upsert' ); statement ok diff --git a/integration_tests/mysql-sink/create_mv.sql b/integration_tests/mysql-sink/create_mv.sql index ea3ab9d71272a..5196258a669c9 100644 --- a/integration_tests/mysql-sink/create_mv.sql +++ b/integration_tests/mysql-sink/create_mv.sql @@ -12,5 +12,6 @@ FROM target_count WITH ( connector = 'jdbc', jdbc.url = 'jdbc:mysql://mysql:3306/mydb?user=root&password=123456', - table.name = 'target_count' + table.name = 'target_count', + type = 'upsert' ); \ No newline at end of file diff --git a/integration_tests/postgres-sink/create_mv.sql b/integration_tests/postgres-sink/create_mv.sql index e9bdce80d7749..9870d62c45a1c 100644 --- a/integration_tests/postgres-sink/create_mv.sql +++ b/integration_tests/postgres-sink/create_mv.sql @@ -12,5 +12,6 @@ FROM target_count WITH ( connector = 'jdbc', jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456', - table.name = 'target_count' + table.name = 'target_count', + type = 'upsert' ); \ No newline at end of file diff --git a/integration_tests/tidb-cdc-sink/create_mv.sql b/integration_tests/tidb-cdc-sink/create_mv.sql index 042da6cf585b3..b826c626df6e7 100644 --- a/integration_tests/tidb-cdc-sink/create_mv.sql +++ b/integration_tests/tidb-cdc-sink/create_mv.sql @@ -27,5 +27,6 @@ CREATE SINK hot_hashtags_sink FROM hot_hashtags WITH ( connector='jdbc', jdbc.url='jdbc:mysql://tidb:4000/test?user=root&password=', - table.name='hot_hashtags' + table.name='hot_hashtags', + type='upsert' ); diff --git a/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CommonSinkConfig.java b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CommonSinkConfig.java new file mode 100644 index 0000000000000..de8ebc0a32dd9 --- /dev/null +++ b/java/connector-node/connector-api/src/main/java/com/risingwave/connector/api/sink/CommonSinkConfig.java @@ -0,0 +1,29 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector.api.sink; + +public class CommonSinkConfig { + private String connector; + + public String getConnector() { + return connector; + } + + public void setConnector(String connector) { + this.connector = connector; + } +} diff --git a/java/connector-node/python-client/integration_tests.py b/java/connector-node/python-client/integration_tests.py index 7480902519e61..de3ff36190a1b 100644 --- a/java/connector-node/python-client/integration_tests.py +++ b/java/connector-node/python-client/integration_tests.py @@ -185,10 +185,7 @@ def test_upsert_iceberg_sink(input_file): def test_deltalake_sink(input_file): test_sink("deltalake", {"location":"minio://minioadmin:minioadmin@127.0.0.1:9000/bucket/delta", - "location.type":"minio", - "storage_options.s3a_endpoint":"http://127.0.0.1:9000", - "storage_options.s3a_access_key":"minioadmin", - "storage_options.s3a_secret_key":"minioadmin"}, + "location.type":"minio"}, input_file) if __name__ == "__main__": diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java index 085ae800bdb0f..24575b54a4262 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSink.java @@ -32,13 +32,14 @@ public class FileSink extends SinkBase { private final FileWriter sinkWriter; - private String sinkPath; + private FileSinkConfig config; private boolean closed = false; - public FileSink(String sinkPath, TableSchema tableSchema) { + public FileSink(FileSinkConfig config, TableSchema tableSchema) { super(tableSchema); - this.sinkPath = sinkPath; + + String sinkPath = config.getSinkPath(); try { new File(sinkPath).mkdirs(); Path path = Paths.get(sinkPath, UUID.randomUUID() + ".dat"); @@ -48,10 +49,12 @@ public FileSink(String sinkPath, TableSchema tableSchema) { throw INTERNAL.withDescription("failed to create file: " + path) .asRuntimeException(); } - this.sinkPath = path.toString(); + config.setSinkPath(path.toString()); } catch (IOException e) { throw INTERNAL.withCause(e).asRuntimeException(); } + + this.config = config; } @Override @@ -103,7 +106,7 @@ public void drop() { } public String getSinkPath() { - return sinkPath; + return config.getSinkPath(); } public boolean isClosed() { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkConfig.java new file mode 100644 index 0000000000000..7bd8d7c870369 --- /dev/null +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkConfig.java @@ -0,0 +1,38 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.risingwave.connector.api.sink.CommonSinkConfig; + +public class FileSinkConfig extends CommonSinkConfig { + private String sinkPath; + + @JsonCreator + public FileSinkConfig(@JsonProperty(value = "output.path") String sinkPath) { + this.sinkPath = sinkPath; + } + + public String getSinkPath() { + return sinkPath; + } + + public void setSinkPath(String sinkPath) { + this.sinkPath = sinkPath; + } +} diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkFactory.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkFactory.java index 52cc1f125a48c..92046ec64d276 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkFactory.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/FileSinkFactory.java @@ -14,8 +14,8 @@ package com.risingwave.connector; -import static io.grpc.Status.*; - +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkBase; import com.risingwave.connector.api.sink.SinkFactory; @@ -23,21 +23,18 @@ import java.util.Map; public class FileSinkFactory implements SinkFactory { - public static final String OUTPUT_PATH_PROP = "output.path"; - @Override public SinkBase create(TableSchema tableSchema, Map tableProperties) { - String sinkPath = tableProperties.get(OUTPUT_PATH_PROP); - return new FileSink(sinkPath, tableSchema); + ObjectMapper mapper = new ObjectMapper(); + FileSinkConfig config = mapper.convertValue(tableProperties, FileSinkConfig.class); + return new FileSink(config, tableSchema); } @Override public void validate( TableSchema tableSchema, Map tableProperties, SinkType sinkType) { - if (!tableProperties.containsKey(OUTPUT_PATH_PROP)) { - throw INVALID_ARGUMENT - .withDescription(String.format("%s is not specified", OUTPUT_PATH_PROP)) - .asRuntimeException(); - } + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); + mapper.convertValue(tableProperties, FileSinkConfig.class); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java index 9c6cbfcfa5a39..8bff6482b6e0e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/SinkValidationHandler.java @@ -19,6 +19,8 @@ import com.risingwave.proto.ConnectorServiceProto; import com.risingwave.proto.ConnectorServiceProto.SinkConfig; import io.grpc.stub.StreamObserver; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +39,30 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) { TableSchema tableSchema = TableSchema.fromProto(sinkConfig.getTableSchema()); SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getConnectorType()); sinkFactory.validate(tableSchema, sinkConfig.getPropertiesMap(), request.getSinkType()); + + } catch (IllegalArgumentException e) { + LOG.error("sink validation failed", e); + // Extract useful information from the error thrown by Jackson and convert it into a + // more concise message. + String errorMessage = e.getLocalizedMessage(); + Pattern missingFieldPattern = Pattern.compile("Missing creator property '([^']*)'"); + Pattern unrecognizedFieldPattern = Pattern.compile("Unrecognized field \"([^\"]*)\""); + Matcher missingFieldMatcher = missingFieldPattern.matcher(errorMessage); + Matcher unrecognizedFieldMatcher = unrecognizedFieldPattern.matcher(errorMessage); + if (missingFieldMatcher.find()) { + errorMessage = "missing field `" + missingFieldMatcher.group(1) + "`"; + } else if (unrecognizedFieldMatcher.find()) { + errorMessage = "unknown field `" + unrecognizedFieldMatcher.group(1) + "`"; + } + responseObserver.onNext( + ConnectorServiceProto.ValidateSinkResponse.newBuilder() + .setError( + ConnectorServiceProto.ValidationError.newBuilder() + .setErrorMessage(errorMessage) + .build()) + .build()); + responseObserver.onCompleted(); + } catch (Exception e) { LOG.error("sink validation failed", e); responseObserver.onNext( diff --git a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java index 1ec9eda586c9f..0dacfe758a539 100644 --- a/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java +++ b/java/connector-node/risingwave-connector-service/src/test/java/com/risingwave/connector/FileSinkTest.java @@ -35,7 +35,8 @@ public void testSync() throws IOException { Files.createDirectories(Paths.get(path)); } - FileSink sink = new FileSink(path, TableSchema.getMockTableSchema()); + FileSinkConfig config = new FileSinkConfig(path); + FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); String filePath = sink.getSinkPath(); Path file = Paths.get(filePath); @@ -76,7 +77,8 @@ public void testWrite() throws IOException { if (!Paths.get(path).toFile().isDirectory()) { Files.createDirectories(Paths.get(path)); } - FileSink sink = new FileSink(path, TableSchema.getMockTableSchema()); + FileSinkConfig config = new FileSinkConfig(path); + FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); String filePath = sink.getSinkPath(); try { @@ -104,7 +106,8 @@ public void testDrop() throws IOException { if (!Paths.get(path).toFile().isDirectory()) { Files.createDirectories(Paths.get(path)); } - FileSink sink = new FileSink(path, TableSchema.getMockTableSchema()); + FileSinkConfig config = new FileSinkConfig(path); + FileSink sink = new FileSink(config, TableSchema.getMockTableSchema()); sink.drop(); diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkConfig.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkConfig.java new file mode 100644 index 0000000000000..b171141e3f5b9 --- /dev/null +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.risingwave.connector.api.sink.CommonSinkConfig; + +public class DeltaLakeSinkConfig extends CommonSinkConfig { + private String location; + + private String locationType; + + private String sinkType; + + @JsonProperty(value = "force_append_only") + private Boolean forceAppendOnly; + + @JsonCreator + public DeltaLakeSinkConfig( + @JsonProperty(value = "location") String location, + @JsonProperty(value = "location.type") String locationType, + @JsonProperty(value = "type") String sinkType) { + this.location = location; + this.locationType = locationType; + this.sinkType = sinkType; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public String getLocationType() { + return locationType; + } + + public void setLocationType(String locationType) { + this.locationType = locationType; + } +} diff --git a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkFactory.java b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkFactory.java index 57b83bf8d0ca2..e2a754dfea034 100644 --- a/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkFactory.java +++ b/java/connector-node/risingwave-sink-deltalake/src/main/java/com/risingwave/connector/DeltaLakeSinkFactory.java @@ -16,6 +16,8 @@ import static io.grpc.Status.*; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkBase; import com.risingwave.connector.api.sink.SinkFactory; @@ -30,19 +32,18 @@ public class DeltaLakeSinkFactory implements SinkFactory { - private static final String LOCATION_PROP = "location"; - private static final String LOCATION_TYPE_PROP = "location.type"; private static final String confEndpoint = "fs.s3a.endpoint"; private static final String confKey = "fs.s3a.access.key"; private static final String confSecret = "fs.s3a.secret.key"; @Override public SinkBase create(TableSchema tableSchema, Map tableProperties) { - String location = tableProperties.get(LOCATION_PROP); - String locationType = tableProperties.get(LOCATION_TYPE_PROP); + ObjectMapper mapper = new ObjectMapper(); + DeltaLakeSinkConfig config = + mapper.convertValue(tableProperties, DeltaLakeSinkConfig.class); Configuration hadoopConf = new Configuration(); - location = getConfig(location, locationType, hadoopConf); + String location = getConfig(config.getLocation(), config.getLocationType(), hadoopConf); DeltaLog log = DeltaLog.forTable(hadoopConf, location); StructType schema = log.snapshot().getMetadata().getSchema(); @@ -59,17 +60,13 @@ public void validate( .asRuntimeException(); } - if (!tableProperties.containsKey(LOCATION_PROP) - || !tableProperties.containsKey(LOCATION_TYPE_PROP)) { - throw INVALID_ARGUMENT - .withDescription( - String.format( - "%s or %s is not specified", LOCATION_PROP, LOCATION_TYPE_PROP)) - .asRuntimeException(); - } + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); + DeltaLakeSinkConfig config = + mapper.convertValue(tableProperties, DeltaLakeSinkConfig.class); - String location = tableProperties.get(LOCATION_PROP); - String locationType = tableProperties.get(LOCATION_TYPE_PROP); + String location = config.getLocation(); + String locationType = config.getLocationType(); Configuration hadoopConf = new Configuration(); location = getConfig(location, locationType, hadoopConf); diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java new file mode 100644 index 0000000000000..03b4d867644e3 --- /dev/null +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkConfig.java @@ -0,0 +1,126 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.risingwave.connector.api.sink.CommonSinkConfig; + +public class IcebergSinkConfig extends CommonSinkConfig { + private String sinkType; + + private String warehousePath; + + private String databaseName; + + private String tableName; + + @JsonProperty(value = "s3.access.key") + private String s3AccessKey; + + @JsonProperty(value = "s3.secret.key") + private String s3SecretKey; + + @JsonProperty(value = "s3.endpoint") + private String s3Endpoint; + + @JsonProperty(value = "force_append_only") + private Boolean forceAppendOnly; + + @JsonProperty(value = "primary_key") + private String primaryKey; + + @JsonCreator + public IcebergSinkConfig( + @JsonProperty(value = "type") String sinkType, + @JsonProperty(value = "warehouse.path") String warehousePath, + @JsonProperty(value = "database.name") String databaseName, + @JsonProperty(value = "table.name") String tableName) { + this.sinkType = sinkType; + this.warehousePath = warehousePath; + this.databaseName = databaseName; + this.tableName = tableName; + } + + public String getSinkType() { + return sinkType; + } + + public void setSinkType(String sinkType) { + this.sinkType = sinkType; + } + + public String getWarehousePath() { + return warehousePath; + } + + public void setWarehousePath(String warehousePath) { + this.warehousePath = warehousePath; + } + + public String getDatabaseName() { + return databaseName; + } + + public void setDatabaseName(String databaseName) { + this.databaseName = databaseName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String getS3AccessKey() { + return s3AccessKey; + } + + public void setS3AccessKey(String s3AccessKey) { + this.s3AccessKey = s3AccessKey; + } + + public String getS3SecretKey() { + return s3SecretKey; + } + + public void setS3SecretKey(String s3SecretKey) { + this.s3SecretKey = s3SecretKey; + } + + public String getS3Endpoint() { + return s3Endpoint; + } + + public void setS3Endpoint(String s3Endpoint) { + this.s3Endpoint = s3Endpoint; + } + + public boolean hasS3Endpoint() { + return s3Endpoint != null; + } + + public boolean hasS3AccessKey() { + return s3AccessKey != null; + } + + public boolean hasS3SecretKey() { + return s3SecretKey != null; + } +} diff --git a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java index 1cdb1edf0e468..1e6f8ba43a000 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java +++ b/java/connector-node/risingwave-sink-iceberg/src/main/java/com/risingwave/connector/IcebergSinkFactory.java @@ -17,6 +17,8 @@ import static io.grpc.Status.INVALID_ARGUMENT; import static io.grpc.Status.UNIMPLEMENTED; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkBase; import com.risingwave.connector.api.sink.SinkFactory; @@ -39,13 +41,6 @@ public class IcebergSinkFactory implements SinkFactory { private static final Logger LOG = LoggerFactory.getLogger(IcebergSinkFactory.class); - public static final String SINK_TYPE_PROP = "type"; - public static final String WAREHOUSE_PATH_PROP = "warehouse.path"; - public static final String DATABASE_NAME_PROP = "database.name"; - public static final String TABLE_NAME_PROP = "table.name"; - public static final String S3_ACCESS_KEY_PROP = "s3.access.key"; - public static final String S3_SECRET_KEY_PROP = "s3.secret.key"; - public static final String S3_ENDPOINT_PROP = "s3.endpoint"; public static final FileFormat FILE_FORMAT = FileFormat.PARQUET; // hadoop catalog config @@ -58,20 +53,28 @@ public class IcebergSinkFactory implements SinkFactory { @Override public SinkBase create(TableSchema tableSchema, Map tableProperties) { - String mode = tableProperties.get(SINK_TYPE_PROP); - String warehousePath = getWarehousePath(tableProperties); - String databaseName = tableProperties.get(DATABASE_NAME_PROP); - String tableName = tableProperties.get(TABLE_NAME_PROP); + ObjectMapper mapper = new ObjectMapper(); + IcebergSinkConfig config = mapper.convertValue(tableProperties, IcebergSinkConfig.class); + String warehousePath = getWarehousePath(config); + config.setWarehousePath(warehousePath); String scheme = parseWarehousePathScheme(warehousePath); + TableIdentifier tableIdentifier = + TableIdentifier.of(config.getDatabaseName(), config.getTableName()); + Configuration hadoopConf = createHadoopConf(scheme, config); + SinkBase sink = null; - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Configuration hadoopConf = createHadoopConf(scheme, tableProperties); - HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); - Table icebergTable; - try { - icebergTable = hadoopCatalog.loadTable(tableIdentifier); - hadoopCatalog.close(); + try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) { + Table icebergTable = hadoopCatalog.loadTable(tableIdentifier); + String sinkType = config.getSinkType(); + if (sinkType.equals("append-only")) { + sink = new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT); + } else if (sinkType.equals("upsert")) { + sink = + new UpsertIcebergSink( + tableSchema, hadoopCatalog, + icebergTable, FILE_FORMAT); + } } catch (Exception e) { throw Status.FAILED_PRECONDITION .withDescription( @@ -80,43 +83,26 @@ public SinkBase create(TableSchema tableSchema, Map tablePropert .asRuntimeException(); } - if (mode.equals("append-only")) { - return new IcebergSink(tableSchema, hadoopCatalog, icebergTable, FILE_FORMAT); - } else if (mode.equals("upsert")) { - return new UpsertIcebergSink( - tableSchema, hadoopCatalog, - icebergTable, FILE_FORMAT); + if (sink == null) { + throw UNIMPLEMENTED + .withDescription("unsupported mode: " + config.getSinkType()) + .asRuntimeException(); } - throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException(); + return sink; } @Override public void validate( TableSchema tableSchema, Map tableProperties, SinkType sinkType) { - if (!tableProperties.containsKey(SINK_TYPE_PROP) // only append-only, upsert - || !tableProperties.containsKey(WAREHOUSE_PATH_PROP) - || !tableProperties.containsKey(DATABASE_NAME_PROP) - || !tableProperties.containsKey(TABLE_NAME_PROP)) { - throw INVALID_ARGUMENT - .withDescription( - String.format( - "%s, %s, %s or %s is not specified", - SINK_TYPE_PROP, - WAREHOUSE_PATH_PROP, - DATABASE_NAME_PROP, - TABLE_NAME_PROP)) - .asRuntimeException(); - } - - String mode = tableProperties.get(SINK_TYPE_PROP); - String databaseName = tableProperties.get(DATABASE_NAME_PROP); - String tableName = tableProperties.get(TABLE_NAME_PROP); - String warehousePath = getWarehousePath(tableProperties); + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); + IcebergSinkConfig config = mapper.convertValue(tableProperties, IcebergSinkConfig.class); - String schema = parseWarehousePathScheme(warehousePath); - - TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, tableName); - Configuration hadoopConf = createHadoopConf(schema, tableProperties); + String warehousePath = getWarehousePath(config); + String scheme = parseWarehousePathScheme(warehousePath); + TableIdentifier tableIdentifier = + TableIdentifier.of(config.getDatabaseName(), config.getTableName()); + Configuration hadoopConf = createHadoopConf(scheme, config); try (HadoopCatalog hadoopCatalog = new HadoopCatalog(hadoopConf, warehousePath); ) { @@ -153,8 +139,10 @@ public void validate( .asRuntimeException(); } - if (!mode.equals("append-only") && !mode.equals("upsert")) { - throw UNIMPLEMENTED.withDescription("unsupported mode: " + mode).asRuntimeException(); + if (!config.getSinkType().equals("append-only") && !config.getSinkType().equals("upsert")) { + throw UNIMPLEMENTED + .withDescription("unsupported mode: " + config.getSinkType()) + .asRuntimeException(); } switch (sinkType) { @@ -174,8 +162,8 @@ public void validate( } } - private static String getWarehousePath(Map tableProperties) { - String warehousePath = tableProperties.get(WAREHOUSE_PATH_PROP); + private static String getWarehousePath(IcebergSinkConfig config) { + String warehousePath = config.getWarehousePath(); // unify s3 and s3a if (warehousePath.startsWith("s3://")) { return warehousePath.replace("s3://", "s3a://"); @@ -202,7 +190,7 @@ private static String parseWarehousePathScheme(String warehousePath) { } } - private Configuration createHadoopConf(String scheme, Map tableProperties) { + private Configuration createHadoopConf(String scheme, IcebergSinkConfig config) { switch (scheme) { case "file": return new Configuration(); @@ -210,20 +198,20 @@ private Configuration createHadoopConf(String scheme, Map tableP Configuration hadoopConf = new Configuration(); hadoopConf.set(confIoImpl, s3FileIOImpl); hadoopConf.setBoolean(confPathStyleAccess, true); - if (!tableProperties.containsKey(S3_ENDPOINT_PROP)) { + if (!config.hasS3Endpoint()) { throw INVALID_ARGUMENT .withDescription( String.format( - "Should set %s for warehouse with scheme %s", - S3_ENDPOINT_PROP, scheme)) + "Should set `s3.endpoint` for warehouse with scheme %s", + scheme)) .asRuntimeException(); } - hadoopConf.set(confEndpoint, tableProperties.get(S3_ENDPOINT_PROP)); - if (tableProperties.containsKey(S3_ACCESS_KEY_PROP)) { - hadoopConf.set(confKey, tableProperties.get(S3_ACCESS_KEY_PROP)); + hadoopConf.set(confEndpoint, config.getS3Endpoint()); + if (config.hasS3AccessKey()) { + hadoopConf.set(confKey, config.getS3AccessKey()); } - if (tableProperties.containsKey(S3_SECRET_KEY_PROP)) { - hadoopConf.set(confSecret, tableProperties.get(S3_SECRET_KEY_PROP)); + if (config.hasS3SecretKey()) { + hadoopConf.set(confSecret, config.getS3SecretKey()); } return hadoopConf; default: diff --git a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java index 832ac2746f973..5961c78de4caf 100644 --- a/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java +++ b/java/connector-node/risingwave-sink-iceberg/src/test/java/com/risingwave/connector/IcebergSinkFactoryTest.java @@ -64,13 +64,13 @@ public void testCreate() throws IOException { sinkFactory.create( TableSchema.getMockTableSchema(), Map.of( - IcebergSinkFactory.SINK_TYPE_PROP, + "type", sinkMode, - IcebergSinkFactory.WAREHOUSE_PATH_PROP, + "warehouse.path", warehousePath, - IcebergSinkFactory.DATABASE_NAME_PROP, + "database.name", databaseName, - IcebergSinkFactory.TABLE_NAME_PROP, + "table.name", tableName)); try { assertTrue( diff --git a/java/connector-node/risingwave-sink-jdbc/pom.xml b/java/connector-node/risingwave-sink-jdbc/pom.xml index 65c59aa7c3b5e..52e85d07a4b1c 100644 --- a/java/connector-node/risingwave-sink-jdbc/pom.xml +++ b/java/connector-node/risingwave-sink-jdbc/pom.xml @@ -35,6 +35,17 @@ log4j-core + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + + com.fasterxml.jackson.core + jackson-core + ${jackson.version} + + junit junit diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java index 4119aa7256a94..9c1f95a178db6 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java @@ -34,9 +34,8 @@ public class JDBCSink extends SinkBase { private static final String UPDATE_TEMPLATE = "UPDATE %s SET %s WHERE %s"; private static final String ERROR_REPORT_TEMPLATE = "Error when exec %s, message %s"; - private final String tableName; + private final JDBCSinkConfig config; private final Connection conn; - private final String jdbcUrl; private final List pkColumnNames; public static final String JDBC_COLUMN_NAME_KEY = "COLUMN_NAME"; @@ -45,15 +44,14 @@ public class JDBCSink extends SinkBase { private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class); - public JDBCSink(String tableName, String jdbcUrl, TableSchema tableSchema) { + public JDBCSink(JDBCSinkConfig config, TableSchema tableSchema) { super(tableSchema); - this.tableName = tableName; - this.jdbcUrl = jdbcUrl; + this.config = config; try { - this.conn = DriverManager.getConnection(jdbcUrl); + this.conn = DriverManager.getConnection(config.getJdbcUrl()); this.conn.setAutoCommit(false); - this.pkColumnNames = getPkColumnNames(conn, tableName); + this.pkColumnNames = getPkColumnNames(conn, config.getTableName()); } catch (SQLException e) { throw Status.INTERNAL .withDescription( @@ -79,10 +77,9 @@ private static List getPkColumnNames(Connection conn, String tableName) return pkColumnNames; } - public JDBCSink(Connection conn, TableSchema tableSchema, String tableName) { + public JDBCSink(Connection conn, TableSchema tableSchema, String tableName, String sinkType) { super(tableSchema); - this.tableName = tableName; - this.jdbcUrl = null; + this.config = new JDBCSinkConfig(null, tableName, sinkType); this.conn = conn; this.pkColumnNames = getPkColumnNames(conn, tableName); } @@ -97,7 +94,8 @@ private PreparedStatement prepareStatement(SinkRow row) { .map((Object o) -> "?") .collect(Collectors.joining(",")); String insertStmt = - String.format(INSERT_TEMPLATE, tableName, columnsRepr, valuesRepr); + String.format( + INSERT_TEMPLATE, config.getTableName(), columnsRepr, valuesRepr); try { PreparedStatement stmt = conn.prepareStatement(insertStmt, Statement.RETURN_GENERATED_KEYS); @@ -128,7 +126,8 @@ private PreparedStatement prepareStatement(SinkRow row) { .map(key -> key + " = ?") .collect(Collectors.joining(" AND ")); } - String deleteStmt = String.format(DELETE_TEMPLATE, tableName, deleteCondition); + String deleteStmt = + String.format(DELETE_TEMPLATE, config.getTableName(), deleteCondition); try { int placeholderIdx = 1; PreparedStatement stmt = @@ -194,7 +193,7 @@ private PreparedStatement prepareStatement(SinkRow row) { String updateStmt = String.format( UPDATE_TEMPLATE, - tableName, + config.getTableName(), updateColumns, updateDeleteConditionBuffer); try { @@ -287,6 +286,6 @@ public void drop() { } public String getTableName() { - return tableName; + return config.getTableName(); } } diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java new file mode 100644 index 0000000000000..1c37e2a66986a --- /dev/null +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java @@ -0,0 +1,58 @@ +/* + * Copyright 2023 RisingWave Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.risingwave.connector; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.risingwave.connector.api.sink.CommonSinkConfig; + +public class JDBCSinkConfig extends CommonSinkConfig { + private String jdbcUrl; + + private String tableName; + + private String sinkType; + + @JsonProperty(value = "force_append_only") + private Boolean forceAppendOnly; + + @JsonCreator + public JDBCSinkConfig( + @JsonProperty(value = "jdbc.url") String jdbcUrl, + @JsonProperty(value = "table.name") String tableName, + @JsonProperty(value = "type") String sinkType) { + this.jdbcUrl = jdbcUrl; + this.tableName = tableName; + this.sinkType = sinkType; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } +} diff --git a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java index 8e03db0032432..885549e5b07c2 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java +++ b/java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkFactory.java @@ -14,6 +14,8 @@ package com.risingwave.connector; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; import com.risingwave.connector.api.TableSchema; import com.risingwave.connector.api.sink.SinkBase; import com.risingwave.connector.api.sink.SinkFactory; @@ -35,25 +37,20 @@ public class JDBCSinkFactory implements SinkFactory { @Override public SinkBase create(TableSchema tableSchema, Map tableProperties) { - String tableName = tableProperties.get(TABLE_NAME_PROP); - String jdbcUrl = tableProperties.get(JDBC_URL_PROP); - return new JDBCSink(tableName, jdbcUrl, tableSchema); + ObjectMapper mapper = new ObjectMapper(); + JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class); + return new JDBCSink(config, tableSchema); } @Override public void validate( TableSchema tableSchema, Map tableProperties, SinkType sinkType) { - if (!tableProperties.containsKey(JDBC_URL_PROP) - || !tableProperties.containsKey(TABLE_NAME_PROP)) { - throw Status.INVALID_ARGUMENT - .withDescription( - String.format( - "%s or %s is not specified", JDBC_URL_PROP, TABLE_NAME_PROP)) - .asRuntimeException(); - } + ObjectMapper mapper = new ObjectMapper(); + mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true); + JDBCSinkConfig config = mapper.convertValue(tableProperties, JDBCSinkConfig.class); - String jdbcUrl = tableProperties.get(JDBC_URL_PROP); - String tableName = tableProperties.get(TABLE_NAME_PROP); + String jdbcUrl = config.getJdbcUrl(); + String tableName = config.getTableName(); Set jdbcColumns = new HashSet<>(); Set jdbcPk = new HashSet<>(); Set jdbcTableNames = new HashSet<>(); diff --git a/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java b/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java index 62a8f63c207ab..cd9b3e803a021 100644 --- a/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java +++ b/java/connector-node/risingwave-sink-jdbc/src/test/java/com/risingwave/connector/JDBCSinkTest.java @@ -30,7 +30,7 @@ public class JDBCSinkTest { @Test public void testJDBCSync() throws SQLException { Connection conn = DriverManager.getConnection(connectionURL); - JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test"); + JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test", "upsert"); createMockTable(conn, sink.getTableName()); sink.write(Iterators.forArray(new ArraySinkRow(Op.INSERT, 1, "Alice"))); @@ -66,7 +66,7 @@ private void createMockTable(Connection conn, String tableName) throws SQLExcept } try { - stmt.execute("create table " + tableName + " (id int, name varchar(255))"); + stmt.execute("create table " + tableName + " (id int primary key, name varchar(255))"); conn.commit(); } catch (SQLException e) { throw io.grpc.Status.INTERNAL.withCause(e).asRuntimeException(); @@ -78,7 +78,7 @@ private void createMockTable(Connection conn, String tableName) throws SQLExcept @Test public void testJDBCWrite() throws SQLException { Connection conn = DriverManager.getConnection(connectionURL); - JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test"); + JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test", "upsert"); createMockTable(conn, sink.getTableName()); sink.write( @@ -106,7 +106,7 @@ public void testJDBCWrite() throws SQLException { @Test public void testJDBCDrop() throws SQLException { Connection conn = DriverManager.getConnection(connectionURL); - JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test"); + JDBCSink sink = new JDBCSink(conn, TableSchema.getMockTableSchema(), "test", "upsert"); sink.drop(); try { assertTrue(conn.isClosed()); diff --git a/src/connector/src/sink/console.rs b/src/connector/src/sink/console.rs index a8985dbb83929..0d3d762c56376 100644 --- a/src/connector/src/sink/console.rs +++ b/src/connector/src/sink/console.rs @@ -14,29 +14,38 @@ use std::collections::HashMap; +use anyhow::anyhow; use async_trait::async_trait; use itertools::join; use risingwave_common::array::{Op, StreamChunk}; use risingwave_common::catalog::Schema; use risingwave_common::row::Row; use risingwave_common::types::{DatumRef, ScalarRefImpl}; +use serde::Deserialize; +use super::SinkError; use crate::sink::{Result, Sink}; pub const CONSOLE_SINK: &str = "console"; -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Deserialize)] +#[serde(deny_unknown_fields)] pub struct ConsoleConfig { + #[serde(skip_serializing)] + pub connector: String, + + #[serde(default)] pub prefix: Option, + + #[serde(default)] pub suffix: Option, } impl ConsoleConfig { pub fn from_hashmap(values: HashMap) -> Result { - Ok(ConsoleConfig { - prefix: values.get("prefix").cloned(), - suffix: values.get("suffix").cloned(), - }) + let config = serde_json::from_value::(serde_json::to_value(values).unwrap()) + .map_err(|e| SinkError::Config(anyhow!(e)))?; + Ok(config) } } @@ -178,6 +187,7 @@ mod test { let mut console_sink = ConsoleSink::new( ConsoleConfig { + connector: "console".to_string(), prefix: Option::from("[CONSOLE] ".to_string()), suffix: Option::from(";".to_string()), }, diff --git a/src/connector/src/sink/kafka.rs b/src/connector/src/sink/kafka.rs index 36e3ba985e7ea..5e44e8c0bea8c 100644 --- a/src/connector/src/sink/kafka.rs +++ b/src/connector/src/sink/kafka.rs @@ -56,12 +56,19 @@ const fn _default_use_transaction() -> bool { } #[derive(Debug, Clone, Deserialize)] +#[serde(deny_unknown_fields)] pub struct KafkaConfig { + #[serde(skip_serializing)] + pub connector: String, // Must be "kafka" here. + #[serde(flatten)] pub common: KafkaCommon, pub r#type: String, // accept "append-only", "debezium", or "upsert" + #[serde(default)] + pub force_append_only: Option, + pub identifier: String, #[serde( @@ -554,14 +561,15 @@ mod test { #[test] fn parse_kafka_config() { let properties: HashMap = hashmap! { + "connector".to_string() => "kafka".to_string(), "properties.bootstrap.server".to_string() => "localhost:9092".to_string(), "topic".to_string() => "test".to_string(), "type".to_string() => "append-only".to_string(), "use_transaction".to_string() => "False".to_string(), - "security_protocol".to_string() => "SASL".to_string(), - "sasl_mechanism".to_string() => "SASL".to_string(), - "sasl_username".to_string() => "test".to_string(), - "sasl_password".to_string() => "test".to_string(), + "properties.security.protocol".to_string() => "SASL".to_string(), + "properties.sasl.mechanism".to_string() => "SASL".to_string(), + "properties.sasl.username".to_string() => "test".to_string(), + "properties.sasl.password".to_string() => "test".to_string(), "identifier".to_string() => "test_sink_1".to_string(), "properties.timeout".to_string() => "5s".to_string(), }; @@ -574,10 +582,10 @@ mod test { #[tokio::test] async fn test_kafka_producer() -> Result<()> { let properties = hashmap! { - "kafka.brokers".to_string() => "localhost:29092".to_string(), + "properties.bootstrap.server".to_string() => "localhost:29092".to_string(), "identifier".to_string() => "test_sink_1".to_string(), "type".to_string() => "append-only".to_string(), - "kafka.topic".to_string() => "test_topic".to_string(), + "topic".to_string() => "test_topic".to_string(), }; let schema = Schema::new(vec![ Field { diff --git a/src/connector/src/sink/mod.rs b/src/connector/src/sink/mod.rs index 3552290c245d4..694e8a727d53f 100644 --- a/src/connector/src/sink/mod.rs +++ b/src/connector/src/sink/mod.rs @@ -43,6 +43,8 @@ use crate::sink::kafka::{KafkaConfig, KafkaSink, KAFKA_SINK}; use crate::sink::redis::{RedisConfig, RedisSink}; use crate::sink::remote::{RemoteConfig, RemoteSink}; use crate::ConnectorParams; + +pub const DOWNSTREAM_SINK_KEY: &str = "connector"; pub const SINK_TYPE_OPTION: &str = "type"; pub const SINK_TYPE_APPEND_ONLY: &str = "append-only"; pub const SINK_TYPE_DEBEZIUM: &str = "debezium"; diff --git a/src/frontend/src/optimizer/plan_node/stream_sink.rs b/src/frontend/src/optimizer/plan_node/stream_sink.rs index 8139991de51c0..1442a85db5e75 100644 --- a/src/frontend/src/optimizer/plan_node/stream_sink.rs +++ b/src/frontend/src/optimizer/plan_node/stream_sink.rs @@ -154,6 +154,20 @@ impl StreamSink { } } + if properties.contains_key(SINK_USER_FORCE_APPEND_ONLY_OPTION) + && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "true") + && !properties.value_eq_ignore_case(SINK_USER_FORCE_APPEND_ONLY_OPTION, "false") + { + return Err(ErrorCode::SinkError(Box::new(Error::new( + ErrorKind::InvalidInput, + format!( + "`{}` must be true or false", + SINK_USER_FORCE_APPEND_ONLY_OPTION + ), + ))) + .into()); + } + let frontend_derived_append_only = input_append_only; let user_defined_append_only = properties.value_eq_ignore_case(SINK_TYPE_OPTION, SINK_TYPE_APPEND_ONLY); diff --git a/src/meta/src/stream/sink.rs b/src/meta/src/stream/sink.rs index 5fd707941e277..f75ffb9a925fc 100644 --- a/src/meta/src/stream/sink.rs +++ b/src/meta/src/stream/sink.rs @@ -14,7 +14,8 @@ use anyhow::anyhow; use risingwave_connector::sink::catalog::SinkCatalog; -use risingwave_connector::sink::{SinkConfig, SinkImpl}; +use risingwave_connector::sink::kafka::KAFKA_SINK; +use risingwave_connector::sink::{SinkConfig, SinkImpl, DOWNSTREAM_SINK_KEY}; use risingwave_pb::catalog::PbSink; use crate::{MetaError, MetaResult}; @@ -26,7 +27,9 @@ pub async fn validate_sink( let sink_catalog = SinkCatalog::from(prost_sink_catalog); let mut properties = sink_catalog.properties.clone(); // Insert a value as the `identifier` field to get parsed by serde. - properties.insert("identifier".to_string(), u64::MAX.to_string()); + if let Some(connector) = properties.get(DOWNSTREAM_SINK_KEY) && connector == KAFKA_SINK { + properties.insert("identifier".to_string(), u64::MAX.to_string()); + } let sink_config = SinkConfig::from_hashmap(properties) .map_err(|err| MetaError::from(anyhow!(err.to_string())))?; diff --git a/src/stream/src/executor/sink.rs b/src/stream/src/executor/sink.rs index 434fbe8c10ed2..c42a0f56af1d9 100644 --- a/src/stream/src/executor/sink.rs +++ b/src/stream/src/executor/sink.rs @@ -202,67 +202,6 @@ mod test { use super::*; use crate::executor::test_utils::*; - #[ignore] - #[tokio::test] - async fn test_mysqlsink() { - use risingwave_common::array::stream_chunk::StreamChunk; - use risingwave_common::array::StreamChunkTestExt; - use risingwave_common::catalog::Field; - use risingwave_common::types::DataType; - - use crate::executor::Barrier; - - let properties = maplit::hashmap! { - "connector".into() => "mysql".into(), - "endpoint".into() => "127.0.0.1:3306".into(), - "database".into() => "db".into(), - "table".into() => "t".into(), - "user".into() => "root".into() - }; - let schema = Schema::new(vec![ - Field::with_name(DataType::Int32, "v1"), - Field::with_name(DataType::Int32, "v2"), - Field::with_name(DataType::Int32, "v3"), - ]); - let pk = vec![]; - - // Mock `child` - let mock = MockSource::with_messages( - schema.clone(), - pk.clone(), - vec![ - Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( - " I I I - + 3 2 1", - ))), - Message::Barrier(Barrier::new_test_barrier(1)), - Message::Chunk(std::mem::take(&mut StreamChunk::from_pretty( - " I I I - + 6 5 4", - ))), - ], - ); - - let config = SinkConfig::from_hashmap(properties).unwrap(); - let sink_executor = SinkExecutor::new( - Box::new(mock), - Arc::new(StreamingMetrics::unused()), - config, - 0, - Default::default(), - schema.clone(), - pk.clone(), - SinkType::AppendOnly, - ); - - let mut executor = SinkExecutor::execute(Box::new(sink_executor)); - - executor.next().await.unwrap().unwrap(); - executor.next().await.unwrap().unwrap(); - executor.next().await.unwrap().unwrap(); - executor.next().await.unwrap().unwrap(); - } - #[tokio::test] async fn test_force_append_only_sink() { use risingwave_common::array::stream_chunk::StreamChunk; @@ -273,8 +212,8 @@ mod test { use crate::executor::Barrier; let properties = maplit::hashmap! { - "connector".into() => "console".into(), - "format".into() => "append_only".into(), + "connector".into() => "blackhole".into(), + "type".into() => "append-only".into(), "force_append_only".into() => "true".into() }; let schema = Schema::new(vec![ diff --git a/src/stream/src/from_proto/sink.rs b/src/stream/src/from_proto/sink.rs index f200d7211dc1d..a35580999c9be 100644 --- a/src/stream/src/from_proto/sink.rs +++ b/src/stream/src/from_proto/sink.rs @@ -13,7 +13,8 @@ // limitations under the License. use risingwave_connector::sink::catalog::SinkType; -use risingwave_connector::sink::SinkConfig; +use risingwave_connector::sink::kafka::KAFKA_SINK; +use risingwave_connector::sink::{SinkConfig, DOWNSTREAM_SINK_KEY}; use risingwave_pb::stream_plan::SinkNode; use super::*; @@ -44,10 +45,12 @@ impl ExecutorBuilder for SinkExecutorBuilder { let schema = sink_desc.columns.iter().map(Into::into).collect(); // This field can be used to distinguish a specific actor in parallelism to prevent // transaction execution errors - properties.insert( - "identifier".to_string(), - format!("sink-{:?}", params.executor_id), - ); + if let Some(connector) = properties.get(DOWNSTREAM_SINK_KEY) && connector == KAFKA_SINK { + properties.insert( + "identifier".to_string(), + format!("sink-{:?}", params.executor_id), + ); + } let config = SinkConfig::from_hashmap(properties).map_err(StreamExecutorError::from)?; Ok(Box::new(SinkExecutor::new(