Skip to content

Commit

Permalink
fixing a bug related to collection-unaware/topic-agnostic CDC handler…
Browse files Browse the repository at this point in the history
… config (#83)
  • Loading branch information
hpgrahsl authored May 31, 2019
1 parent aea167d commit 66c3405
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>at.grahsl.kafka.connect</groupId>
<artifactId>kafka-connect-mongodb</artifactId>
<version>1.3.1</version>
<version>1.3.2-SNAPSHOT</version>
<packaging>jar</packaging>

<name>kafka-connect-mongodb</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ public Map<String,CdcHandler> getCdcHandlers() {

Map<String, CdcHandler> cdcHandlers = new HashMap<>();

if(isUsingCdcHandler("")) {
cdcHandlers.put(TOPIC_AGNOSTIC_KEY_NAME,getCdcHandler(""));
}

splitAndTrimAndRemoveConfigListEntries(getString(MONGODB_COLLECTIONS_CONF))
.forEach(collection -> {
CdcHandler candidate = cdcHandlers.put(collection,getCdcHandler(collection));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ Map<String, MongoDbSinkRecordBatches> createSinkRecordBatchesPerTopic(Collection
LOGGER.debug("building CDC write model for {} record(s) into collection {}", records.size(), collectionName);
return records.stream()
.map(sinkConverter::convert)
.map(cdcHandlers.get(collectionName)::handle)
.map(cdcHandlers.getOrDefault(collectionName,
cdcHandlers.get(MongoDbSinkConnectorConfig.TOPIC_AGNOSTIC_KEY_NAME))::handle)
.flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty))
.collect(Collectors.toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import at.grahsl.kafka.connect.mongodb.cdc.CdcHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.mongodb.MongoDbHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.RdbmsHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.mysql.MysqlHandler;
import at.grahsl.kafka.connect.mongodb.cdc.debezium.rdbms.postgres.PostgresHandler;
import at.grahsl.kafka.connect.mongodb.processor.*;
Expand Down Expand Up @@ -615,7 +616,7 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS

List<DynamicTest> tests = new ArrayList<>();

Map<String, Class> canditates = new HashMap<String, Class>() {{
Map<String, Class> candidates = new HashMap<String, Class>() {{
put("", ReplaceOneDefaultStrategy.class);
put("collection-1", ReplaceOneDefaultStrategy.class);
put("collection-2", ReplaceOneBusinessKeyStrategy.class);
Expand All @@ -624,10 +625,10 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS
}};

HashMap<String,String> map = new HashMap<>();
canditates.entrySet().forEach(entry ->
candidates.entrySet().forEach(entry ->
map.put(MONGODB_WRITEMODEL_STRATEGY+"."+entry.getKey(),entry.getValue().getName())
);
map.put(MONGODB_COLLECTIONS_CONF, canditates.keySet().stream()
map.put(MONGODB_COLLECTIONS_CONF, candidates.keySet().stream()
.collect(Collectors.joining(FIELD_LIST_SPLIT_CHAR)));

MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Expand All @@ -640,9 +641,9 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidWriteModelS
: "check write model strategy for config "+MONGODB_WRITEMODEL_STRATEGY +"."+entry.getKey(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(entry.getValue(), "write model strategy was null"),
() -> assertTrue(canditates.get(TOPIC_AGNOSTIC_KEY_NAME.equals(entry.getKey())
() -> assertTrue(candidates.get(TOPIC_AGNOSTIC_KEY_NAME.equals(entry.getKey())
? "" : entry.getKey()).isInstance(entry.getValue()),
"write model strategy NOT of type " + canditates.get(entry.getKey()))
"write model strategy NOT of type " + candidates.get(entry.getKey()))
))
)
);
Expand All @@ -658,6 +659,7 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {

HashMap<String,Class> candidates = new HashMap<String,Class>() {{
put(MongoDbHandler.class.getName(),MongoDbHandler.class);
put(RdbmsHandler.class.getName(), RdbmsHandler.class);
put(MysqlHandler.class.getName(),MysqlHandler.class);
put(PostgresHandler.class.getName(),PostgresHandler.class);
}};
Expand All @@ -666,8 +668,8 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {
HashMap<String,String> map = new HashMap<>();
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER,entry.getKey());
MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
CdcHandler cdc = cfg.getCdcHandler();
tests.add(dynamicTest("check cdc handler for config"
CdcHandler cdc = cfg.getCdcHandler("");
tests.add(dynamicTest("check cdc handler for config "
+ MONGODB_CHANGE_DATA_CAPTURE_HANDLER + "="+entry.getKey(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(cdc, "cdc handler was null"),
Expand All @@ -677,9 +679,9 @@ public Collection<DynamicTest> testGetSingleValidCdcHandler() {
));
});

tests.add(dynamicTest("check cdc handler for config"
tests.add(dynamicTest("check cdc handler for config "
+ MONGODB_CHANGE_DATA_CAPTURE_HANDLER + "=",
() -> assertNull(new MongoDbSinkConnectorConfig(new HashMap<>()).getCdcHandler(),
() -> assertNull(new MongoDbSinkConnectorConfig(new HashMap<>()).getCdcHandler(""),
"cdc handler was not null")
)
);
Expand All @@ -693,34 +695,70 @@ public Collection<DynamicTest> testGetMultipleCollectionSpecificValidCdcHandlers

List<DynamicTest> tests = new ArrayList<>();

Map<String, Class> canditates = new HashMap<String, Class>() {{
Map<String, Class> candidates = new HashMap<String, Class>() {{
put("collection-1", MongoDbHandler.class);
put("collection-2", MysqlHandler.class);
put("collection-3", PostgresHandler.class);
put("collection-2", RdbmsHandler.class);
put("collection-3", MysqlHandler.class);
put("collection-4", PostgresHandler.class);
}};

HashMap<String,String> map = new HashMap<>();
canditates.entrySet().forEach(entry ->
candidates.entrySet().forEach(entry ->
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER+"."+entry.getKey(),entry.getValue().getName())
);
map.put(MONGODB_COLLECTIONS_CONF, canditates.keySet().stream()
map.put(MONGODB_COLLECTIONS_CONF, candidates.keySet().stream()
.collect(Collectors.joining(FIELD_LIST_SPLIT_CHAR)));

MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Map<String, CdcHandler> cdc = cfg.getCdcHandlers();

cdc.entrySet().forEach(entry ->
tests.add(dynamicTest("check cdc handler for config " +
MONGODB_CHANGE_DATA_CAPTURE_HANDLER +"."+entry.getKey(),
MONGODB_CHANGE_DATA_CAPTURE_HANDLER +"."+entry.getKey()
+"="+entry.getValue().getClass().getName(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(entry.getValue(), "cdc handler was null"),
() -> assertTrue(canditates.get(entry.getKey()).isInstance(entry.getValue()),
"cdc handler NOT of type " + canditates.get(entry.getKey()))
() -> assertTrue(candidates.get(entry.getKey()).isInstance(entry.getValue()),
"cdc handler NOT of type " + candidates.get(entry.getKey()))
))
)
);

return tests;
}

@TestFactory
@DisplayName("test collection unspecific CDC handlers")
public Collection<DynamicTest> testCollectionUnspecificCdcHandlers() {

List<DynamicTest> tests = new ArrayList<>();

Set<Class> candidates = new HashSet<Class>() {{
add(MongoDbHandler.class);
add(RdbmsHandler.class);
add(MysqlHandler.class);
add(PostgresHandler.class);
}};

candidates.forEach(entry -> {

HashMap<String,String> map = new HashMap<>();
map.put(MONGODB_CHANGE_DATA_CAPTURE_HANDLER,entry.getName());
map.put(MONGODB_COLLECTION_CONF,"whatever");
MongoDbSinkConnectorConfig cfg = new MongoDbSinkConnectorConfig(map);
Map<String, CdcHandler> cdc = cfg.getCdcHandlers();
CdcHandler which = cdc.getOrDefault(cfg.getString(MONGODB_COLLECTION_CONF),cdc.get(TOPIC_AGNOSTIC_KEY_NAME));
tests.add(dynamicTest("check cdc handler for config " +
MONGODB_CHANGE_DATA_CAPTURE_HANDLER+"="+entry.getName(),
() -> assertAll("check for non-null and correct type",
() -> assertNotNull(which, "cdc handler was null"),
() -> assertTrue(entry.isInstance(which),
"cdc handler NOT of type " + entry)
)));
});

return tests;

}

}

0 comments on commit 66c3405

Please sign in to comment.