diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java index 4f7cc8592d9bf5..54b02280571997 100644 --- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java +++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java @@ -78,6 +78,12 @@ public void open(Map config, SourceContext sourceContext) throws // database.history : implementation class for database history. setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); + // database.history.pulsar.service.url + String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name()); + if (StringUtils.isEmpty(pulsarUrl)) { + throw new IllegalArgumentException("Pulsar service URL for History Database not provided."); + } + String topicNamespace = topicNamespace(sourceContext); // topic.namespace setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace); diff --git a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml index 6350e202fd3895..8c1564bf4e63a6 100644 --- a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml +++ b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml @@ -33,3 +33,5 @@ configs: mongodb.password: "dbz" mongodb.task.id: "1" database.whitelist: "inventory" + + database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" diff --git a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml index f581700fef4c37..7a51b0092ac22a 100644 --- a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml +++ b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml @@ -36,6 +36,7 @@ configs: database.whitelist: "inventory" database.history.pulsar.topic: "mysql-history-topic" + database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" offset.storage.topic: "mysql-offset-topic" diff --git a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml index 151b409951422f..3f6b7eaaba21c6 100644 --- a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml +++ b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml @@ -35,5 +35,4 @@ configs: database.server.name: "dbserver1" schema.whitelist: "inventory" - - + database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java index 6eb4f06777ea94..110ff11c00d070 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java @@ -50,7 +50,7 @@ public DebeziumMongoDbSourceTester(PulsarCluster cluster) { sourceConfig.put("mongodb.password", "dbz"); sourceConfig.put("mongodb.task.id","1"); sourceConfig.put("database.whitelist", "inventory"); - sourceConfig.put("pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/mongodb"); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java index ec7e07ccf0be2d..3cb64db8a7de04 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java @@ -61,7 +61,7 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam sourceConfig.put("database.server.id", "184054"); sourceConfig.put("database.server.name", "dbserver1"); sourceConfig.put("database.whitelist", "inventory"); - sourceConfig.put("pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("key.converter", converterClassName); sourceConfig.put("value.converter", converterClassName); sourceConfig.put("topic.namespace", "debezium/mysql-" + diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java index 192597e7827180..4b66d00709d23e 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java @@ -63,7 +63,7 @@ public DebeziumPostgreSqlSourceTester(PulsarCluster cluster) { sourceConfig.put("database.dbname", "postgres"); sourceConfig.put("schema.whitelist", "inventory"); sourceConfig.put("table.blacklist", "inventory.spatial_ref_sys,inventory.geom"); - sourceConfig.put("pulsar.service.url", pulsarServiceUrl); + sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); sourceConfig.put("topic.namespace", "debezium/postgresql"); }