Skip to content

Commit

Permalink
feat(sink): reject invalid options when creating sink (risingwavelabs…
Browse files Browse the repository at this point in the history
…#8757)

Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
  • Loading branch information
xx01cyx and github-actions[bot] authored Apr 3, 2023
1 parent 36c4605 commit 806c623
Show file tree
Hide file tree
Showing 30 changed files with 538 additions and 229 deletions.
10 changes: 5 additions & 5 deletions e2e_test/sink/append_only_sink.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ statement ok
create table t (v1 int, v2 int);

statement ok
create sink s1 from t with (connector = 'console');
create sink s1 from t with (connector = 'blackhole');

statement ok
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'console');
create sink s2 as select avg(v1), v2 from t group by v2 with (connector = 'blackhole');

statement error The sink cannot be append-only
create sink s3 from t with (connector = 'console', type = 'append-only');
create sink s3 from t with (connector = 'blackhole', type = 'append-only');

statement ok
create sink s3 from t with (connector = 'console', type = 'append-only', force_append_only = 'true');
create sink s3 from t with (connector = 'blackhole', type = 'append-only', force_append_only = 'true');

statement error Cannot force the sink to be append-only
create sink s4 from t with (connector = 'console', type = 'upsert', force_append_only = 'true');
create sink s4 from t with (connector = 'blackhole', type = 'upsert', force_append_only = 'true');

statement ok
drop sink s1
Expand Down
6 changes: 2 additions & 4 deletions e2e_test/sink/create_sink_as.slt
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
# the sink requires a primary index for efficient execution
# which will be enforced by schema precheck in the future

statement ok
CREATE TABLE t4 (v1 int primary key, v2 int);

Expand All @@ -11,7 +8,8 @@ statement ok
CREATE SINK s4 AS select mv4.v1 as v1, mv4.v2 as v2 from mv4 WITH (
connector = 'jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name = 't4'
table.name = 't4',
type = 'upsert'
);

statement ok
Expand Down
6 changes: 4 additions & 2 deletions e2e_test/sink/remote/jdbc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@ statement ok
CREATE SINK s_postgres FROM mv_remote WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db:5432/test?user=test&password=connector',
table.name='t_remote'
table.name='t_remote',
type='upsert'
);

statement ok
CREATE SINK s_mysql FROM mv_remote WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://mysql:3306/test?user=mysqluser&password=mysqlpw',
table.name='t_remote'
table.name='t_remote',
type='upsert'
);

statement ok
Expand Down
3 changes: 2 additions & 1 deletion integration_tests/mysql-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ FROM
target_count WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:mysql://mysql:3306/mydb?user=root&password=123456',
table.name = 'target_count'
table.name = 'target_count',
type = 'upsert'
);
3 changes: 2 additions & 1 deletion integration_tests/postgres-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ FROM
target_count WITH (
connector = 'jdbc',
jdbc.url = 'jdbc:postgresql://postgres:5432/mydb?user=myuser&password=123456',
table.name = 'target_count'
table.name = 'target_count',
type = 'upsert'
);
3 changes: 2 additions & 1 deletion integration_tests/tidb-cdc-sink/create_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@ CREATE SINK hot_hashtags_sink FROM hot_hashtags
WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://tidb:4000/test?user=root&password=',
table.name='hot_hashtags'
table.name='hot_hashtags',
type='upsert'
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector.api.sink;

public class CommonSinkConfig {
private String connector;

public String getConnector() {
return connector;
}

public void setConnector(String connector) {
this.connector = connector;
}
}
5 changes: 1 addition & 4 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,10 +185,7 @@ def test_upsert_iceberg_sink(input_file):
def test_deltalake_sink(input_file):
test_sink("deltalake",
{"location":"minio://minioadmin:[email protected]:9000/bucket/delta",
"location.type":"minio",
"storage_options.s3a_endpoint":"http://127.0.0.1:9000",
"storage_options.s3a_access_key":"minioadmin",
"storage_options.s3a_secret_key":"minioadmin"},
"location.type":"minio"},
input_file)

if __name__ == "__main__":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@
public class FileSink extends SinkBase {
private final FileWriter sinkWriter;

private String sinkPath;
private FileSinkConfig config;

private boolean closed = false;

public FileSink(String sinkPath, TableSchema tableSchema) {
public FileSink(FileSinkConfig config, TableSchema tableSchema) {
super(tableSchema);
this.sinkPath = sinkPath;

String sinkPath = config.getSinkPath();
try {
new File(sinkPath).mkdirs();
Path path = Paths.get(sinkPath, UUID.randomUUID() + ".dat");
Expand All @@ -48,10 +49,12 @@ public FileSink(String sinkPath, TableSchema tableSchema) {
throw INTERNAL.withDescription("failed to create file: " + path)
.asRuntimeException();
}
this.sinkPath = path.toString();
config.setSinkPath(path.toString());
} catch (IOException e) {
throw INTERNAL.withCause(e).asRuntimeException();
}

this.config = config;
}

@Override
Expand Down Expand Up @@ -103,7 +106,7 @@ public void drop() {
}

public String getSinkPath() {
return sinkPath;
return config.getSinkPath();
}

public boolean isClosed() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class FileSinkConfig extends CommonSinkConfig {
private String sinkPath;

@JsonCreator
public FileSinkConfig(@JsonProperty(value = "output.path") String sinkPath) {
this.sinkPath = sinkPath;
}

public String getSinkPath() {
return sinkPath;
}

public void setSinkPath(String sinkPath) {
this.sinkPath = sinkPath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,27 @@

package com.risingwave.connector;

import static io.grpc.Status.*;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.SinkBase;
import com.risingwave.connector.api.sink.SinkFactory;
import com.risingwave.proto.Catalog.SinkType;
import java.util.Map;

public class FileSinkFactory implements SinkFactory {
public static final String OUTPUT_PATH_PROP = "output.path";

@Override
public SinkBase create(TableSchema tableSchema, Map<String, String> tableProperties) {
String sinkPath = tableProperties.get(OUTPUT_PATH_PROP);
return new FileSink(sinkPath, tableSchema);
ObjectMapper mapper = new ObjectMapper();
FileSinkConfig config = mapper.convertValue(tableProperties, FileSinkConfig.class);
return new FileSink(config, tableSchema);
}

@Override
public void validate(
TableSchema tableSchema, Map<String, String> tableProperties, SinkType sinkType) {
if (!tableProperties.containsKey(OUTPUT_PATH_PROP)) {
throw INVALID_ARGUMENT
.withDescription(String.format("%s is not specified", OUTPUT_PATH_PROP))
.asRuntimeException();
}
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_MISSING_CREATOR_PROPERTIES, true);
mapper.convertValue(tableProperties, FileSinkConfig.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.risingwave.proto.ConnectorServiceProto;
import com.risingwave.proto.ConnectorServiceProto.SinkConfig;
import io.grpc.stub.StreamObserver;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +39,30 @@ public void handle(ConnectorServiceProto.ValidateSinkRequest request) {
TableSchema tableSchema = TableSchema.fromProto(sinkConfig.getTableSchema());
SinkFactory sinkFactory = SinkUtils.getSinkFactory(sinkConfig.getConnectorType());
sinkFactory.validate(tableSchema, sinkConfig.getPropertiesMap(), request.getSinkType());

} catch (IllegalArgumentException e) {
LOG.error("sink validation failed", e);
// Extract useful information from the error thrown by Jackson and convert it into a
// more concise message.
String errorMessage = e.getLocalizedMessage();
Pattern missingFieldPattern = Pattern.compile("Missing creator property '([^']*)'");
Pattern unrecognizedFieldPattern = Pattern.compile("Unrecognized field \"([^\"]*)\"");
Matcher missingFieldMatcher = missingFieldPattern.matcher(errorMessage);
Matcher unrecognizedFieldMatcher = unrecognizedFieldPattern.matcher(errorMessage);
if (missingFieldMatcher.find()) {
errorMessage = "missing field `" + missingFieldMatcher.group(1) + "`";
} else if (unrecognizedFieldMatcher.find()) {
errorMessage = "unknown field `" + unrecognizedFieldMatcher.group(1) + "`";
}
responseObserver.onNext(
ConnectorServiceProto.ValidateSinkResponse.newBuilder()
.setError(
ConnectorServiceProto.ValidationError.newBuilder()
.setErrorMessage(errorMessage)
.build())
.build());
responseObserver.onCompleted();

} catch (Exception e) {
LOG.error("sink validation failed", e);
responseObserver.onNext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ public void testSync() throws IOException {
Files.createDirectories(Paths.get(path));
}

FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());
String filePath = sink.getSinkPath();

Path file = Paths.get(filePath);
Expand Down Expand Up @@ -76,7 +77,8 @@ public void testWrite() throws IOException {
if (!Paths.get(path).toFile().isDirectory()) {
Files.createDirectories(Paths.get(path));
}
FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());

String filePath = sink.getSinkPath();
try {
Expand Down Expand Up @@ -104,7 +106,8 @@ public void testDrop() throws IOException {
if (!Paths.get(path).toFile().isDirectory()) {
Files.createDirectories(Paths.get(path));
}
FileSink sink = new FileSink(path, TableSchema.getMockTableSchema());
FileSinkConfig config = new FileSinkConfig(path);
FileSink sink = new FileSink(config, TableSchema.getMockTableSchema());

sink.drop();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2023 RisingWave Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.risingwave.connector;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.risingwave.connector.api.sink.CommonSinkConfig;

public class DeltaLakeSinkConfig extends CommonSinkConfig {
private String location;

private String locationType;

private String sinkType;

@JsonProperty(value = "force_append_only")
private Boolean forceAppendOnly;

@JsonCreator
public DeltaLakeSinkConfig(
@JsonProperty(value = "location") String location,
@JsonProperty(value = "location.type") String locationType,
@JsonProperty(value = "type") String sinkType) {
this.location = location;
this.locationType = locationType;
this.sinkType = sinkType;
}

public String getLocation() {
return location;
}

public void setLocation(String location) {
this.location = location;
}

public String getLocationType() {
return locationType;
}

public void setLocationType(String locationType) {
this.locationType = locationType;
}
}
Loading

0 comments on commit 806c623

Please sign in to comment.