Skip to content

Commit

Permalink
[pulsar-io] pass the pulsar service url to debezium source for histor…
Browse files Browse the repository at this point in the history
…y database (apache#11251)

### Motivation

The Debezium requires pulsar a service URL for history database usage. 

In apache#11056 , the `service.url` field from `PulsarKafkaWorkerConfig` is no longer available. And the value is also deleted from multiple yaml config files in this [commit](apache@3ce24c9). This causes the integration test for Debezium connector to fail.

Based on the Debezium [paradigm](https://debezium.io/documentation/reference/1.5/connectors/mysql.html#debezium-mysql-connector-database-history-configuration-properties), all configurations should be passed as strings. There's no easy way to inject a PulsarClient via configuration.

We need to ask user to provide the pulsar url explicitly and probably auth info also.


### Modifications

1. Make the `database.history.pulsar.service.url` field required
2. Add the config value back to example yaml files
3. Update the integration test config

### Verifying this change

- [ ] Make sure that the change passes the CI checks.
  • Loading branch information
nlu90 authored and ciaocloud committed Oct 16, 2021
1 parent 006d932 commit 2846456
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());
if (StringUtils.isEmpty(pulsarUrl)) {
throw new IllegalArgumentException("Pulsar service URL for History Database not provided.");
}

String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ configs:
mongodb.password: "dbz"
mongodb.task.id: "1"
database.whitelist: "inventory"

database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ configs:
database.whitelist: "inventory"

database.history.pulsar.topic: "mysql-history-topic"
database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
offset.storage.topic: "mysql-offset-topic"


Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,4 @@ configs:
database.server.name: "dbserver1"
schema.whitelist: "inventory"



database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) {
sourceConfig.put("mongodb.password", "dbz");
sourceConfig.put("mongodb.task.id","1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mongodb");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) {
sourceConfig.put("database.dbname", "postgres");
sourceConfig.put("schema.whitelist", "inventory");
sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom");
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/postgresql");
}

Expand Down

0 comments on commit 2846456

Please sign in to comment.