Skip to content

Commit

Permalink
fix(connector): add postgres permission checks in validation phase (r…
Browse files Browse the repository at this point in the history
  • Loading branch information
WillyKidd authored Mar 20, 2023
1 parent ce9e519 commit 32f4925
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,159 @@ private void validateDbProperties(
}
}
}
// check whether user is superuser or replication role
try (var stmt =
conn.prepareStatement(sqlStmts.getProperty("postgres.role.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must be superuser or replication role to start walsender."));
}
}
}
// check whether user has select privilege on table for initial snapshot
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.table_privilege.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.TABLE_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must have select privilege on table "
+ props.get(
DbzConnectorConfig.TABLE_NAME)));
}
}
}
// check whether publication exists
boolean publicationExists = false;
boolean partialPublication = false;
try (var stmt = conn.createStatement()) {
var res =
stmt.executeQuery(
sqlStmts.getProperty("postgres.publication_att_exists"));
while (res.next()) {
partialPublication = res.getBoolean(1);
}
}
// pg 15 and up supports partial publication of table
// check whether publication covers all columns
if (partialPublication) {
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.publication_att"))) {
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
String[] columnsPub =
(String[]) res.getArray("attnames").getArray();
var sourceSchema = validate.getTableSchema();
for (int i = 0; i < sourceSchema.getColumnsCount(); i++) {
String columnName = sourceSchema.getColumns(i).getName();
if (Arrays.stream(columnsPub).noneMatch(columnName::equals)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"The publication 'dbz_publication' does not cover all necessary columns in table "
+ props.get(
DbzConnectorConfig
.TABLE_NAME)));
}
if (i == sourceSchema.getColumnsCount() - 1) {
publicationExists = true;
}
}
if (publicationExists) {
LOG.info("publication exists");
break;
}
}
}
} else { // check directly whether publication exists
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.publication_cnt"))) {
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
if (res.getInt("count") > 0) {
publicationExists = true;
LOG.info("publication exists");
break;
}
}
}
}
// if publication does not exist, check permission to create publication
if (!publicationExists) {
// check create privilege on database
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty(
"postgres.database_privilege.check"))) {
stmt.setString(1, props.get(DbzConnectorConfig.USER));
stmt.setString(2, props.get(DbzConnectorConfig.DB_NAME));
stmt.setString(3, props.get(DbzConnectorConfig.USER));
var res = stmt.executeQuery();
while (res.next()) {
if (!res.getBoolean(1)) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must have create privilege on database"
+ props.get(
DbzConnectorConfig.DB_NAME)));
}
}
}
// check ownership on table
boolean isTableOwner = false;
String owner = null;
// check if user is owner
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.table_owner"))) {
stmt.setString(1, props.get(DbzConnectorConfig.PG_SCHEMA_NAME));
stmt.setString(2, props.get(DbzConnectorConfig.TABLE_NAME));
var res = stmt.executeQuery();
while (res.next()) {
owner = res.getString("tableowner");
if (owner.equals(props.get(DbzConnectorConfig.USER))) {
isTableOwner = true;
break;
}
}
}
// if user is not owner, check if user belongs to owner group
if (!isTableOwner && !owner.isEmpty()) {
try (var stmt =
conn.prepareStatement(
sqlStmts.getProperty("postgres.users_of_group"))) {
stmt.setString(1, owner);
var res = stmt.executeQuery();
while (res.next()) {
String[] users = (String[]) res.getArray("members").getArray();
if (Arrays.stream(users)
.anyMatch(props.get(DbzConnectorConfig.USER)::equals)) {
isTableOwner = true;
break;
}
}
}
}
if (!isTableOwner) {
throw new StatusException(
Status.INTERNAL.withDescription(
"Postgres user must be owner of table "
+ props.get(DbzConnectorConfig.TABLE_NAME)));
}
}
break;
default:
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,28 @@ postgres.table=SELECT EXISTS ( SELECT FROM pg_tables WHERE schemaname = ? AND ta
postgres.pk=SELECT a.attname, format_type(a.atttypid, a.atttypmod) AS data_type FROM pg_index i JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey) WHERE i.indrelid = ?::regclass AND i.indisprimary
postgres.table_schema=SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ? ORDER BY ordinal_position
postgres.slot.check=SELECT slot_name FROM pg_replication_slots WHERE slot_name = ?
postgres.role.check=SELECT rolreplication OR rolsuper FROM pg_roles WHERE rolname = ?
postgres.database_privilege.check=SELECT has_database_privilege(?, ?, 'create') FROM pg_roles WHERE rolname = ?
postgres.table_privilege.check=SELECT (COUNT(*) = 1) FROM information_schema.role_table_grants WHERE table_name = ? AND grantee = ? and privilege_type = 'SELECT'
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_att=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication'
postgres.publication_cnt=SELECT COUNT(*) AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = 'dbz_publication'
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
INNER JOIN pg_roles r2 ON r2.oid = am.member \
WHERE r1.rolname = ? \
GROUP BY r1.rolname \
) \
UNION ALL ( \
WITH groups AS (SELECT DISTINCT(UNNEST(m)) AS g FROM base) \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
INNER JOIN pg_roles r2 ON r2.oid = am.member \
INNER JOIN groups ON r1.rolname = groups.g \
GROUP BY r1.rolname \
) \
), \
tmp AS (SELECT DISTINCT(UNNEST(m)) AS members FROM base) \
SELECT ARRAY_AGG(members) AS members FROM tmp
12 changes: 12 additions & 0 deletions java/connector-node/risingwave-source-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@
</properties>

<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down

0 comments on commit 32f4925

Please sign in to comment.