diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java index fe0b35a15b048..e1ed443c67fc6 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/PostgresSourceTest.java @@ -17,12 +17,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import com.risingwave.proto.ConnectorServiceProto; -import io.grpc.Grpc; -import io.grpc.InsecureChannelCredentials; -import io.grpc.Server; -import io.grpc.ServerBuilder; +import com.risingwave.proto.Data; +import io.grpc.*; import java.io.IOException; import java.sql.Connection; import java.sql.ResultSet; @@ -41,7 +40,7 @@ public class PostgresSourceTest { private static final Logger LOG = LoggerFactory.getLogger(PostgresSourceTest.class.getName()); private static final PostgreSQLContainer pg = - new PostgreSQLContainer<>("postgres:12.3-alpine") + new PostgreSQLContainer<>("postgres:15-alpine") .withDatabaseName("test") .withUsername("postgres") .withCommand("postgres -c wal_level=logical -c max_wal_senders=10"); @@ -145,6 +144,69 @@ public void testLines() throws InterruptedException, SQLException { connection.close(); } + // test whether validation catches permission errors + @Test + public void testPermissionCheck() { + Connection connection = SourceTestClient.connect(pgDataSource); + String query = + "CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))"; + SourceTestClient.performQuery(connection, query); + // create a partial publication, check whether error is reported + query = "CREATE PUBLICATION dbz_publication FOR TABLE orders (o_key)"; + SourceTestClient.performQuery(connection, query); + ConnectorServiceProto.TableSchema tableSchema = + ConnectorServiceProto.TableSchema.newBuilder() + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("o_key") + .setDataType(Data.DataType.TypeName.INT64) + .build()) + .addColumns( + ConnectorServiceProto.TableSchema.Column.newBuilder() + .setName("o_val") + .setDataType(Data.DataType.TypeName.INT32) + .build()) + .addPkIndices(0) + .build(); + Iterator eventStream1 = + testClient.getEventStreamValidate( + pg, + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + StatusRuntimeException exception1 = + assertThrows( + StatusRuntimeException.class, + () -> { + eventStream1.hasNext(); + }); + assertEquals( + exception1.getMessage(), + "INVALID_ARGUMENT: INTERNAL: The publication 'dbz_publication' does not cover all necessary columns in table orders"); + query = "DROP PUBLICATION dbz_publication"; + SourceTestClient.performQuery(connection, query); + // revoke superuser and replication, check if reports error + query = "ALTER USER " + pg.getUsername() + " nosuperuser noreplication"; + SourceTestClient.performQuery(connection, query); + Iterator eventStream2 = + testClient.getEventStreamValidate( + pg, + ConnectorServiceProto.SourceType.POSTGRES, + tableSchema, + "test", + "orders"); + StatusRuntimeException exception2 = + assertThrows( + StatusRuntimeException.class, + () -> { + eventStream2.hasNext(); + }); + assertEquals( + exception2.getMessage(), + "INVALID_ARGUMENT: INTERNAL: Postgres user must be superuser or replication role to start walsender."); + } + // generates test cases for the risingwave debezium parser @Ignore @Test diff --git a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java index ea0a601fbf699..950e847f6d65e 100644 --- a/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java +++ b/java/connector-node/risingwave-source-test/src/test/java/com/risingwave/connector/SourceTestClient.java @@ -88,6 +88,40 @@ protected static DataSource getDataSource(JdbcDatabaseContainer container) { return new HikariDataSource(hikariConfig); } + protected Iterator getEventStreamValidate( + JdbcDatabaseContainer container, + ConnectorServiceProto.SourceType sourceType, + ConnectorServiceProto.TableSchema tableSchema, + String databaseName, + String tableName) { + String port = String.valueOf(URI.create(container.getJdbcUrl().substring(5)).getPort()); + ConnectorServiceProto.GetEventStreamRequest req = + ConnectorServiceProto.GetEventStreamRequest.newBuilder() + .setValidate( + ConnectorServiceProto.GetEventStreamRequest.ValidateProperties + .newBuilder() + .setSourceId(0) + .setSourceType(sourceType) + .setTableSchema(tableSchema) + .putProperties("hostname", container.getHost()) + .putProperties("port", port) + .putProperties("username", container.getUsername()) + .putProperties("password", container.getPassword()) + .putProperties("database.name", databaseName) + .putProperties("table.name", tableName) + .putProperties("schema.name", "public") // pg only + .putProperties("slot.name", "orders") // pg only + .putProperties("server.id", "1")) // mysql only + .build(); + Iterator responses = null; + try { + responses = blockingStub.getEventStream(req); + } catch (StatusRuntimeException e) { + fail("RPC failed: {}", e.getStatus()); + } + return responses; + } + protected Iterator getEventStreamStart( JdbcDatabaseContainer container, ConnectorServiceProto.SourceType sourceType,