Skip to content

Commit

Permalink
[pulsar-io] pass client builder if no service url provided to debeziu…
Browse files Browse the repository at this point in the history
…m connector (#12145) (#14040)

# Conflicts:
#	pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
#	pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
#	tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
#	tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java

### Motivation

#11293 allows to passing client builder to debezium database history, but it still requires passing `database.history.pulsar.service.url` as well. With client builder, the `database.history.pulsar.service.url` is not been used anymore. 
This PR fixes the logic and only pass client builder with no `database.history.pulsar.service.url` provided. 

Cherry-pick #12145 into branch-2.8
  • Loading branch information
freeznet authored Feb 11, 2022
1 parent 0a635c5 commit 5f9fd68
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.relational.history.DatabaseHistory;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.io.core.SourceContext;
Expand Down Expand Up @@ -50,10 +51,7 @@ public static void throwExceptionIfConfigNotMatch(Map<String, Object> config,
}

public static void setConfigIfNull(Map<String, Object> config, String key, String value) {
Object orig = config.get(key);
if (orig == null) {
config.put(key, value);
}
config.putIfAbsent(key, value);
}

// namespace for output topics, default value is "tenant/namespace"
Expand All @@ -79,12 +77,8 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
// database.history : implementation class for database history.
setConfigIfNull(config, HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY);

// database.history.pulsar.service.url, this is set as the value of pulsar.service.url if null.
String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG);
if (serviceUrl == null) {
throw new IllegalArgumentException("Pulsar service URL not provided.");
}
setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl);
// database.history.pulsar.service.url
String pulsarUrl = (String) config.get(PulsarDatabaseHistory.SERVICE_URL.name());

String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
Expand All @@ -98,8 +92,12 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);

config.put(DatabaseHistory.CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.client.builder",
SerDeUtils.serialize(sourceContext.getPulsarClientBuilder()));
// pass pulsar.client.builder if database.history.pulsar.service.url is not provided
if (StringUtils.isEmpty(pulsarUrl)) {
String pulsarClientBuilder = SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), pulsarClientBuilder);
}

super.open(config, sourceContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@ public void configure(
+ getClass().getSimpleName() + "; check the logs for details");
}
this.topicName = config.getString(TOPIC);
if (config.getString(CLIENT_BUILDER) == null && config.getString(SERVICE_URL) == null) {

String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
if (isBlank(clientBuilderBase64Encoded) && isBlank(config.getString(SERVICE_URL))) {
throw new IllegalArgumentException("Neither Pulsar Service URL nor ClientBuilder provided.");
}
String clientBuilderBase64Encoded = config.getString(CLIENT_BUILDER);
this.clientBuilder = PulsarClient.builder();
if (null != clientBuilderBase64Encoded) {
if (!isBlank(clientBuilderBase64Encoded)) {
// deserialize the client builder to the same classloader
this.clientBuilder = (ClientBuilder) SerDeUtils.deserialize(clientBuilderBase64Encoded, this.clientBuilder.getClass().getClassLoader());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public class DebeziumMySqlSourceTester extends SourceTester<DebeziumMySQLContain

private final PulsarCluster pulsarCluster;

public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName) {
public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassName,
boolean testWithClientBuilder) {
super(NAME);
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT;
Expand All @@ -61,11 +62,13 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster, String converterClassNam
sourceConfig.put("database.server.id", "184054");
sourceConfig.put("database.server.name", "dbserver1");
sourceConfig.put("database.whitelist", "inventory");
sourceConfig.put("pulsar.service.url", pulsarServiceUrl);
if (!testWithClientBuilder) {
sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl);
}
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-" +
(converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
sourceConfig.put("topic.namespace", "debezium/mysql-"
+ (converterClassName.endsWith("AvroConverter") ? "avro" : "json"));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,26 +42,33 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {

@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, false);
}

@Test(groups = "source")
public void testDebeziumMySqlSourceJsonWithClientBuilder() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, true);
}

@Test(groups = "source")
public void testDebeziumMySqlSourceAvro() throws Exception {
testDebeziumMySqlConnect(
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false);
"org.apache.pulsar.kafka.shade.io.confluent.connect.avro.AvroConverter", false, false);
}

@Test(groups = "source")
public void testDebeziumPostgreSqlSource() throws Exception {
testDebeziumPostgreSqlConnect("org.apache.kafka.connect.json.JsonConverter", true);
}


@Test(groups = "source")
public void testDebeziumMongoDbSource() throws Exception{
testDebeziumMongoDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
}

private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope) throws Exception {
private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWithEnvelope,
boolean testWithClientBuilder) throws Exception {

final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
Expand Down Expand Up @@ -96,7 +103,7 @@ private void testDebeziumMySqlConnect(String converterClassName, boolean jsonWit
admin.topics().createNonPartitionedTopic(outputTopicName);

@Cleanup
DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName);
DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster, converterClassName, testWithClientBuilder);
sourceTester.getSourceConfig().put("json-with-envelope", jsonWithEnvelope);

// setup debezium mysql server
Expand Down

0 comments on commit 5f9fd68

Please sign in to comment.