From facbab272fb411ec3f056720eaab1df05ccefb9c Mon Sep 17 00:00:00 2001
From: Chris Egerton
Date: Mon, 10 Jun 2024 23:02:08 +0200
Subject: [PATCH] KAFKA-9228: Restart tasks on runtime-only connector config changes (#16053)
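
A connector reconfiguration only results in new task configs being
written (and tasks being restarted) when the reconfigured connector
generates task configs that differ from the existing ones. Properties
that are consumed by the runtime rather than the connector (converter
overrides, for example) are not necessarily reflected in the task
configs a connector generates, so changes to them could silently fail
to take effect on already-running tasks.

Track the connector config that was most recently used to generate
task configs (the "applied" config) in the ClusterConfigState snapshot,
using the new AppliedConnectorConfig wrapper class, which lazily
transforms and caches the applied config. AbstractHerder.taskConfigsChanged
now also reports a change whenever a connector's current config differs
from its applied config, forcing a round of task restarts even when the
generated task configs are unchanged.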

Reviewers: Greg Harris
---
 .../kafka/connect/runtime/AbstractHerder.java |  11 +-
 .../storage/AppliedConnectorConfig.java       |  66 +++++++++
 .../connect/storage/ClusterConfigState.java   |  20 +++
 .../storage/KafkaConfigBackingStore.java      |  12 +-
 .../storage/MemoryConfigBackingStore.java     |  18 +++
 .../ConnectWorkerIntegrationTest.java         | 135 ++++++++++++++++++
 .../connect/runtime/AbstractHerderTest.java   |  41 ++++++
 .../kafka/connect/runtime/WorkerTest.java     |   5 +
 .../connect/runtime/WorkerTestUtils.java      |  10 +-
 .../distributed/DistributedHerderTest.java    |  26 ++++
 .../IncrementalCooperativeAssignorTest.java   |   7 +
 .../distributed/WorkerCoordinatorTest.java    |  10 ++
 .../standalone/StandaloneHerderTest.java      |  26 +++-
 ...rg.apache.kafka.connect.sink.SinkConnector |   3 +-
 14 files changed, 383 insertions(+), 7 deletions(-)
 create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java
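
Note: the new integration test exercises the canonical runtime-only
change, swapping a sink connector's value.converter override. As a
hypothetical sketch (connector name and class are invented for
illustration; the test itself uses its own EmptyTaskConfigsConnector),
a request of this shape

    PUT /connectors/my-connector/config
    {
      "connector.class": "com.example.MySinkConnector",
      "tasks.max": "1",
      "topics": "kafka9228",
      "value.converter": "org.apache.kafka.connect.storage.StringConverter"
    }

previously triggered no task restarts whenever the connector's generated
task configs stayed identical, leaving running tasks on the old converter.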

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index c6aeea80a26f9..e50019676ac2f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -1045,7 +1045,8 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
         if (rawTaskProps.size() != currentNumTasks) {
             log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
             result = true;
-        } else {
+        }
+        if (!result) {
             for (int index = 0; index < currentNumTasks; index++) {
                 ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
                 if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
@@ -1054,6 +1055,14 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
                 }
             }
         }
+        if (!result) {
+            Map<String, String> appliedConnectorConfig = configState.appliedConnectorConfig(connName);
+            Map<String, String> currentConnectorConfig = configState.connectorConfig(connName);
+            if (!Objects.equals(appliedConnectorConfig, currentConnectorConfig)) {
+                log.debug("Forcing task restart for connector {} as its configuration appears to be updated", connName);
+                result = true;
+            }
+        }
         if (result) {
             log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", connName);
         } else {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java
new file mode 100644
index 0000000000000..22f20e4b4a198
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/AppliedConnectorConfig.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
+
+import java.util.Map;
+
+/**
+ * Wrapper class for a connector configuration that has been used to generate task configurations.
+ * Supports lazy {@link WorkerConfigTransformer#transform(Map) transformation}.
+ */
+public class AppliedConnectorConfig {
+
+    private final Map<String, String> rawConfig;
+    private Map<String, String> transformedConfig;
+
+    /**
+     * Create a new applied config that has not yet undergone
+     * {@link WorkerConfigTransformer#transform(Map) transformation}.
+     * @param rawConfig the non-transformed connector configuration; may be null
+     */
+    public AppliedConnectorConfig(Map<String, String> rawConfig) {
+        this.rawConfig = rawConfig;
+    }
+
+    /**
+     * If necessary, {@link WorkerConfigTransformer#transform(Map) transform} the raw
+     * connector config, then return the result. Transformed configurations are cached and
+     * returned in all subsequent calls.
+     * <p>
+     * This method is thread-safe: different threads may invoke it at any time and the same
+     * transformed config should always be returned, with transformation still only ever
+     * taking place once before its results are cached.
+     * @param configTransformer the transformer to use, if no transformed connector
+     *                          config has been cached yet; may be null
+     * @return the possibly-cached, transformed connector config; may be null
+     */
+    public synchronized Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
+        if (transformedConfig != null || rawConfig == null)
+            return transformedConfig;
+
+        if (configTransformer != null) {
+            transformedConfig = configTransformer.transform(rawConfig);
+        } else {
+            transformedConfig = rawConfig;
+        }
+
+        return transformedConfig;
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
index 1025373042f11..df5b00926fc7a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/ClusterConfigState.java
@@ -43,6 +43,7 @@ public class ClusterConfigState {
             Collections.emptyMap(),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.emptyMap(),
             Collections.emptySet(),
             Collections.emptySet());
 
@@ -55,6 +56,7 @@ public class ClusterConfigState {
     final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
     final Map<String, Integer> connectorTaskCountRecords;
     final Map<String, Integer> connectorTaskConfigGenerations;
+    final Map<String, AppliedConnectorConfig> appliedConnectorConfigs;
     final Set<String> connectorsPendingFencing;
     final Set<String> inconsistentConnectors;
 
@@ -66,6 +68,7 @@ public ClusterConfigState(long offset,
                               Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                               Map<String, Integer> connectorTaskCountRecords,
                               Map<String, Integer> connectorTaskConfigGenerations,
+                              Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
                               Set<String> connectorsPendingFencing,
                               Set<String> inconsistentConnectors) {
         this(offset,
@@ -76,6 +79,7 @@ public ClusterConfigState(long offset,
              taskConfigs,
             connectorTaskCountRecords,
             connectorTaskConfigGenerations,
+            appliedConnectorConfigs,
             connectorsPendingFencing,
             inconsistentConnectors,
             null);
@@ -89,6 +93,7 @@ public ClusterConfigState(long offset,
                               Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                               Map<String, Integer> connectorTaskCountRecords,
                               Map<String, Integer> connectorTaskConfigGenerations,
+                              Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
                               Set<String> connectorsPendingFencing,
                               Set<String> inconsistentConnectors,
                               WorkerConfigTransformer configTransformer) {
@@ -100,6 +105,7 @@ public ClusterConfigState(long offset,
         this.taskConfigs = taskConfigs;
         this.connectorTaskCountRecords = connectorTaskCountRecords;
         this.connectorTaskConfigGenerations = connectorTaskConfigGenerations;
+        this.appliedConnectorConfigs = appliedConnectorConfigs;
         this.connectorsPendingFencing = connectorsPendingFencing;
         this.inconsistentConnectors = inconsistentConnectors;
         this.configTransformer = configTransformer;
@@ -158,6 +164,19 @@ public Map<String, String> rawConnectorConfig(String connector) {
         return connectorConfigs.get(connector);
     }
 
+    /**
+     * Get the most recent configuration for the connector from which task configs have
+     * been generated. The configuration will have been transformed by
+     * {@link org.apache.kafka.common.config.ConfigTransformer}.
+     * @param connector name of the connector
+     * @return the connector config, or null if no config exists from which task configs have
+     * been generated
+     */
+    public Map<String, String> appliedConnectorConfig(String connector) {
+        AppliedConnectorConfig appliedConfig = appliedConnectorConfigs.get(connector);
+        return appliedConfig != null ? appliedConfig.transformedConfig(configTransformer) : null;
+    }
+
     /**
      * Get the target state of the connector
      * @param connector name of the connector
@@ -303,4 +322,5 @@ public int hashCode() {
                 inconsistentConnectors,
                 configTransformer);
     }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 7981f4425dcb4..1360765964814 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -318,6 +318,7 @@ public static String LOGGER_CLUSTER_KEY(String namespace) {
 
     final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();
     final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();
+    final Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();
     final Set<String> connectorsPendingFencing = new HashSet<>();
 
     private final WorkerConfigTransformer configTransformer;
@@ -478,6 +479,7 @@ public ClusterConfigState snapshot() {
                 new HashMap<>(taskConfigs),
                 new HashMap<>(connectorTaskCountRecords),
                 new HashMap<>(connectorTaskConfigGenerations),
+                new HashMap<>(appliedConnectorConfigs),
                 new HashSet<>(connectorsPendingFencing),
                 new HashSet<>(inconsistent),
                 configTransformer
@@ -1065,7 +1067,8 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
             // but compaction took place and both the original connector config and the
             // tombstone message for it have been removed from the config topic
             // We should ignore these task configs
-            if (!connectorConfigs.containsKey(connectorName)) {
+            Map<String, String> appliedConnectorConfig = connectorConfigs.get(connectorName);
+            if (appliedConnectorConfig == null) {
                 processConnectorRemoval(connectorName);
                 log.debug(
                         "Ignoring task configs for connector {}; it appears that the connector was deleted previously "
@@ -1123,6 +1126,11 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
                 connectorTaskConfigGenerations.compute(connectorName, (ignored, generation) -> generation != null ? generation + 1 : 0);
             }
             inconsistent.remove(connectorName);
+
+            appliedConnectorConfigs.put(
+                    connectorName,
+                    new AppliedConnectorConfig(appliedConnectorConfig)
+            );
         }
         // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
         // update, then we need to see a completely fresh set of configs after this commit message, so we don't
@@ -1261,6 +1269,7 @@ private void processConnectorRemoval(String connectorName) {
         connectorTaskCounts.remove(connectorName);
         taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
         deferredTaskUpdates.remove(connectorName);
+        appliedConnectorConfigs.remove(connectorName);
     }
 
     private ConnectorTaskId parseTaskId(String key) {
@@ -1333,5 +1342,6 @@ else if (value instanceof Long)
         else
             throw new ConnectException("Expected integer value to be either Integer or Long");
     }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
index 3b9ba966ca20a..0fe3a5d8117c7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryConfigBackingStore.java
@@ -21,6 +21,8 @@
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,6 +38,8 @@
  */
 public class MemoryConfigBackingStore implements ConfigBackingStore {
 
+    private static final Logger log = LoggerFactory.getLogger(MemoryConfigBackingStore.class);
+
     private final Map<String, ConnectorState> connectors = new HashMap<>();
     private UpdateListener updateListener;
     private WorkerConfigTransformer configTransformer;
@@ -61,6 +65,7 @@ public synchronized ClusterConfigState snapshot() {
         Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
         Map<String, TargetState> connectorTargetStates = new HashMap<>();
         Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();
 
         for (Map.Entry<String, ConnectorState> connectorStateEntry : connectors.entrySet()) {
             String connector = connectorStateEntry.getKey();
@@ -69,6 +74,9 @@ public synchronized ClusterConfigState snapshot() {
             connectorConfigs.put(connector, connectorState.connConfig);
             connectorTargetStates.put(connector, connectorState.targetState);
             taskConfigs.putAll(connectorState.taskConfigs);
+            if (connectorState.appliedConnConfig != null) {
+                appliedConnectorConfigs.put(connector, connectorState.appliedConnConfig);
+            }
         }
 
         return new ClusterConfigState(
@@ -80,6 +88,7 @@ public synchronized ClusterConfigState snapshot() {
                 taskConfigs,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                appliedConnectorConfigs,
                 Collections.emptySet(),
                 Collections.emptySet(),
                 configTransformer
@@ -123,6 +132,7 @@ public synchronized void removeTaskConfigs(String connector) {
 
         HashSet<ConnectorTaskId> taskIds = new HashSet<>(state.taskConfigs.keySet());
         state.taskConfigs.clear();
+        state.appliedConnConfig = null;
 
         if (updateListener != null)
             updateListener.onTaskConfigUpdate(taskIds);
@@ -137,6 +147,8 @@ public synchronized void putTaskConfigs(String connector, List<Map<String, String>> configs) {
         Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
         state.taskConfigs = taskConfigsMap;
 
+        state.applyConfig();
+
         if (updateListener != null)
             updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
     }
@@ -187,6 +199,7 @@ private static class ConnectorState {
         private TargetState targetState;
         private Map<String, String> connConfig;
        private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
+        private AppliedConnectorConfig appliedConnConfig;
 
         /**
          * @param connConfig the connector's configuration
@@ -197,6 +210,11 @@ public ConnectorState(Map<String, String> connConfig, TargetState targetState) {
             this.targetState = targetState == null ? TargetState.STARTED : targetState;
             this.connConfig = connConfig;
             this.taskConfigs = new HashMap<>();
+            this.appliedConnConfig = null;
+        }
+
+        public void applyConfig() {
+            this.appliedConnConfig = new AppliedConnectorConfig(connConfig);
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 24cbd1c280e1c..83fce9231f7e5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -18,19 +18,27 @@
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.provider.FileConfigProvider;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.SinkUtils;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
@@ -48,6 +56,7 @@
 
 import java.io.File;
 import java.io.FileOutputStream;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -57,6 +66,8 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
@@ -1322,6 +1333,74 @@ public void testReconfigureConnectorWithFailingTaskConfigs() throws Exception {
         );
     }
 
+    @Test
+    public void testRuntimePropertyReconfiguration() throws Exception {
+        final int offsetCommitIntervalMs = 1_000;
+        // force fast offset commits
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Integer.toString(offsetCommitIntervalMs));
+        connect = connectBuilder.build();
+        // start the clusters
+        connect.start();
+
+        connect.assertions().assertAtLeastNumWorkersAreUp(
+                NUM_WORKERS,
+                "Initial group of workers did not start in time."
+        );
+
+        final String topic = "kafka9228";
+        connect.kafka().createTopic(topic, 1);
+        connect.kafka().produce(topic, "non-json-value");
+
+        Map<String, String> connectorConfig = new HashMap<>();
+        connectorConfig.put(CONNECTOR_CLASS_CONFIG, EmptyTaskConfigsConnector.class.getName());
+        connectorConfig.put(TASKS_MAX_CONFIG, "1");
+        connectorConfig.put(TOPICS_CONFIG, topic);
+        // Initially configure the connector to use the JSON converter, which should cause task failure(s)
+        connectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, JsonConverter.class.getName());
+        connectorConfig.put(
+                VALUE_CONVERTER_CLASS_CONFIG + "." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG,
+                "false"
+        );
+
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(
+                CONNECTOR_NAME,
+                1,
+                "Connector did not start or task did not fail in time"
+        );
+        assertEquals(
+                "Connector should not have any committed offsets when only task fails on first record",
+                new ConnectorOffsets(Collections.emptyList()),
+                connect.connectorOffsets(CONNECTOR_NAME)
+        );
+
+        // Reconfigure the connector to use the string converter, which should not cause any more task failures
+        connectorConfig.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        connectorConfig.remove(
+                KEY_CONVERTER_CLASS_CONFIG + "." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG
+        );
+        connect.configureConnector(CONNECTOR_NAME, connectorConfig);
+        connect.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                CONNECTOR_NAME,
+                1,
+                "Connector or tasks did not start in time"
+        );
+
+        Map<String, Object> expectedOffsetKey = new HashMap<>();
+        expectedOffsetKey.put(SinkUtils.KAFKA_TOPIC_KEY, topic);
+        expectedOffsetKey.put(SinkUtils.KAFKA_PARTITION_KEY, 0);
+        Map<String, Object> expectedOffsetValue = Collections.singletonMap(SinkUtils.KAFKA_OFFSET_KEY, 1);
+        ConnectorOffset expectedOffset = new ConnectorOffset(expectedOffsetKey, expectedOffsetValue);
+        ConnectorOffsets expectedOffsets = new ConnectorOffsets(Collections.singletonList(expectedOffset));
+
+        // Wait for it to commit offsets, signaling that it has successfully processed the record we produced earlier
+        waitForCondition(
+                () -> expectedOffsets.equals(connect.connectorOffsets(CONNECTOR_NAME)),
+                offsetCommitIntervalMs * 2,
+                "Task did not successfully process record and/or commit offsets in time"
+        );
+    }
+
     private Map<String, String> defaultSourceConnectorProps(String topic) {
         // setup props for the source connector
         Map<String, String> props = new HashMap<>();
@@ -1336,4 +1415,60 @@ private Map<String, String> defaultSourceConnectorProps(String topic) {
         props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
         return props;
     }
+
+    public static class EmptyTaskConfigsConnector extends SinkConnector {
+        @Override
+        public String version() {
+            return "0.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            // no-op
+        }
+
+        @Override
+        public Class<? extends Task> taskClass() {
+            return SimpleTask.class;
+        }
+
+        @Override
+        public List<Map<String, String>> taskConfigs(int maxTasks) {
+            return IntStream.range(0, maxTasks)
+                    .mapToObj(i -> Collections.<String, String>emptyMap())
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public void stop() {
+            // no-op
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef();
+        }
+    }
+
+    public static class SimpleTask extends SinkTask {
+        @Override
+        public String version() {
+            return "0.0";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            // no-op
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+            // no-op
+        }
+
+        @Override
+        public void stop() {
+            // no-op
+        }
+    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 5bfbe2498cad5..da8fed5b66d34 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -47,6 +47,7 @@
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorType;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -149,6 +150,7 @@ public class AbstractHerderTest {
             TASK_CONFIGS_MAP,
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.emptySet(),
             Collections.emptySet());
     private static final ClusterConfigState SNAPSHOT_NO_TASKS = new ClusterConfigState(
@@ -160,6 +162,7 @@ public class AbstractHerderTest {
             Collections.emptyMap(),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.emptySet(),
             Collections.emptySet());
 
@@ -1143,6 +1146,44 @@ public void testTaskConfigComparison() {
         verify(snapshot, never()).taskConfig(any());
     }
 
+    @Test
+    public void testTaskConfigsChangedWhenAppliedConnectorConfigDiffers() {
+        assertFalse(AbstractHerder.taskConfigsChanged(SNAPSHOT, CONN1, TASK_CONFIGS));
+
+        ClusterConfigState snapshotWithNoAppliedConfig = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshotWithNoAppliedConfig, CONN1, TASK_CONFIGS));
+
+        Map<String, String> appliedConfig = new HashMap<>(CONN1_CONFIG);
+        String newTopicsProperty = appliedConfig.getOrDefault(SinkConnectorConfig.TOPICS_CONFIG, "foo") + ",newTopic";
+        appliedConfig.put(SinkConnectorConfig.TOPICS_CONFIG, newTopicsProperty);
+        ClusterConfigState snapshotWithDifferentAppliedConfig = new ClusterConfigState(
+                1,
+                null,
+                Collections.singletonMap(CONN1, 3),
+                Collections.singletonMap(CONN1, CONN1_CONFIG),
+                Collections.singletonMap(CONN1, TargetState.STARTED),
+                TASK_CONFIGS_MAP,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(appliedConfig)),
+                Collections.emptySet(),
+                Collections.emptySet()
+        );
+        assertTrue(AbstractHerder.taskConfigsChanged(snapshotWithDifferentAppliedConfig, CONN1, TASK_CONFIGS));
+    }
+
     protected void addConfigKey(Map<String, ConfigDef.ConfigKey> keys, String name, String group) {
         ConfigDef configDef = new ConfigDef().define(name, ConfigDef.Type.STRING, null, null, ConfigDef.Importance.HIGH, "doc", group, 10,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b51b84d1ac623..4c5a04533e226 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -73,6 +73,7 @@
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.ConnectorOffsetBackingStore;
@@ -634,6 +635,7 @@ public void testAddRemoveSourceTask() {
                 Collections.singletonMap(TASK_ID, origProps),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -689,6 +691,7 @@ public void testAddRemoveSinkTask() {
                 Collections.singletonMap(TASK_ID, origProps),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -759,6 +762,7 @@ public void testAddRemoveExactlyOnceSourceTask() {
                 Collections.singletonMap(TASK_ID, origProps),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_ID, new AppliedConnectorConfig(connectorConfigs)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -2728,6 +2732,7 @@ private void testStartTaskWithTooManyTaskConfigs(boolean enforced) {
                 Collections.singletonMap(TASK_ID, origProps),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(connName, new AppliedConnectorConfig(connectorConfigs)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
index 084d865cc5eca..6101dc48c6b11 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
 import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
@@ -63,15 +64,22 @@ public static List<ConnectorTaskId> newTasks(int start, int end) {
     public static ClusterConfigState clusterConfigState(long offset,
                                                         int connectorNum,
                                                         int taskNum) {
+        Map<String, Map<String, String>> connectorConfigs = connectorConfigs(1, connectorNum);
+        Map<String, AppliedConnectorConfig> appliedConnectorConfigs = connectorConfigs.entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> new AppliedConnectorConfig(e.getValue())
+                ));
         return new ClusterConfigState(
                 offset,
                 null,
                 connectorTaskCounts(1, connectorNum, taskNum),
-                connectorConfigs(1, connectorNum),
+                connectorConfigs,
                 connectorTargetStates(1, connectorNum, TargetState.STARTED),
                 taskConfigs(0, connectorNum, connectorNum * taskNum),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                appliedConnectorConfigs,
                 Collections.emptySet(),
                 Collections.emptySet());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index f69f586bc90cf..f2b73e9699743 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -64,6 +64,7 @@
 import org.apache.kafka.connect.source.ExactlyOnceSupport;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -220,6 +221,7 @@ public class DistributedHerderTest {
             TASK_CONFIGS_MAP,
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.emptySet(),
             Collections.emptySet());
     private static final ClusterConfigState SNAPSHOT_PAUSED_CONN1 = new ClusterConfigState(
@@ -231,6 +233,7 @@ public class DistributedHerderTest {
             TASK_CONFIGS_MAP,
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.emptySet(),
             Collections.emptySet());
     private static final ClusterConfigState SNAPSHOT_STOPPED_CONN1 = new ClusterConfigState(
@@ -242,6 +245,7 @@ public class DistributedHerderTest {
             Collections.emptyMap(), // Stopped connectors should have an empty set of task configs
             Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, 10),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.singleton(CONN1),
             Collections.emptySet());
 
@@ -254,6 +258,7 @@ public class DistributedHerderTest {
             Collections.emptyMap(),
             Collections.singletonMap(CONN1, 0),
             Collections.singletonMap(CONN1, 11),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
             Collections.emptySet(),
             Collections.emptySet());
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(
@@ -265,6 +270,7 @@ public class DistributedHerderTest {
             TASK_CONFIGS_MAP,
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG_UPDATED)),
             Collections.emptySet(),
             Collections.emptySet());
 
@@ -632,6 +638,7 @@ public void revokeAndReassign(boolean incompleteRebalance) throws TimeoutExcepti
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -1616,6 +1623,7 @@ public void testConnectorConfigUpdateFailedTransformation() throws Exception {
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet(),
                 configTransformer
@@ -2220,6 +2228,7 @@ public void testAccessors() throws Exception {
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet(),
                 configTransformer);
@@ -2351,6 +2360,7 @@ public void testPatchConnectorConfigNotFound() {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.emptyMap(),
                 Collections.emptySet(),
                 Collections.emptySet());
         expectConfigRefreshAndSnapshot(clusterConfigState);
@@ -2380,6 +2390,7 @@ public void testPatchConnectorConfigNotALeader() {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.emptyMap(),
                 Collections.emptySet(),
                 Collections.emptySet());
         expectConfigRefreshAndSnapshot(originalSnapshot);
@@ -2420,6 +2431,7 @@ public void testPatchConnectorConfig() throws Exception {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.emptyMap(),
                 Collections.emptySet(),
                 Collections.emptySet());
         expectConfigRefreshAndSnapshot(originalSnapshot);
@@ -2490,6 +2502,7 @@ public void testKeyRotationWhenWorkerBecomesLeader() throws Exception {
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
@@ -2536,6 +2549,7 @@ public void testKeyRotationDisabledWhenWorkerBecomesFollower() throws Exception
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet());
         expectConfigRefreshAndSnapshot(snapshotWithKey);
@@ -2737,6 +2751,7 @@ public void testFailedToReadBackNewlyWrittenSessionKey() throws Exception {
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet());
 
@@ -3221,6 +3236,7 @@ public void testVerifyTaskGeneration() {
                 TASK_CONFIGS_MAP,
                 Collections.emptyMap(),
                 taskConfigGenerations,
+                Collections.singletonMap(CONN1, new AppliedConnectorConfig(CONN1_CONFIG)),
                 Collections.emptySet(),
                 Collections.emptySet());
 
@@ -4138,6 +4154,15 @@ private ClusterConfigState exactlyOnceSnapshot(
         Map<String, Map<String, String>> connectorConfigs = connectors.stream()
                 .collect(Collectors.toMap(Function.identity(), c -> CONN1_CONFIG));
 
+        Map<String, AppliedConnectorConfig> appliedConnectorConfigs = taskConfigs.keySet().stream()
+                .map(ConnectorTaskId::connector)
+                .distinct()
+                .filter(connectorConfigs::containsKey)
+                .collect(Collectors.toMap(
+                        Function.identity(),
+                        connector -> new AppliedConnectorConfig(connectorConfigs.get(connector))
+                ));
+
         return new ClusterConfigState(
                 1,
                 sessionKey,
@@ -4147,6 +4172,7 @@ private ClusterConfigState exactlyOnceSnapshot(
                 taskConfigs,
                 taskCountRecords,
                 taskConfigGenerations,
+                appliedConnectorConfigs,
                 pendingFencing,
                 Collections.emptySet());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 319bdc9f9f8ef..48b7973fdc22e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -1396,6 +1397,11 @@ private ClusterConfigState configState() {
                         Function.identity(),
                         connectorTaskId -> Collections.emptyMap()
                 ));
+        Map<String, AppliedConnectorConfig> appliedConnectorConfigs = connectorConfigs.entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> new AppliedConnectorConfig(e.getValue())
+                ));
         return new ClusterConfigState(
                 CONFIG_OFFSET,
                 null,
@@ -1405,6 +1411,7 @@ private ClusterConfigState configState() {
                 taskConfigs,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                appliedConnectorConfigs,
                 Collections.emptySet(),
                 Collections.emptySet());
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 861e98bfeb367..1911eb10b1303 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -61,6 +62,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
@@ -171,6 +173,7 @@ public void setup() {
                 Collections.singletonMap(taskId1x0, new HashMap<>()),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.emptyMap(),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -197,6 +200,7 @@ public void setup() {
                 configState2TaskConfigs,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.emptyMap(),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -217,6 +221,11 @@ public void setup() {
         configStateSingleTaskConnectorsTaskConfigs.put(taskId1x0, new HashMap<>());
         configStateSingleTaskConnectorsTaskConfigs.put(taskId2x0, new HashMap<>());
         configStateSingleTaskConnectorsTaskConfigs.put(taskId3x0, new HashMap<>());
+        Map<String, AppliedConnectorConfig> appliedConnectorConfigs = configStateSingleTaskConnectorsConnectorConfigs.entrySet().stream()
+                .collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        e -> new AppliedConnectorConfig(e.getValue())
+                ));
         configStateSingleTaskConnectors = new ClusterConfigState(
                 12L,
                 null,
@@ -226,6 +235,7 @@ public void setup() {
                 configStateSingleTaskConnectorsTaskConfigs,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                appliedConnectorConfigs,
                 Collections.emptySet(),
                 Collections.emptySet()
         );
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index e8ab2add18152..92ab6bd149a7a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -42,6 +42,7 @@
 import org.apache.kafka.connect.runtime.distributed.SampleConnectorClientConfigOverridePolicy;
 import org.apache.kafka.connect.runtime.isolation.LoaderSwap;
 import org.apache.kafka.connect.runtime.rest.entities.Message;
+import org.apache.kafka.connect.storage.AppliedConnectorConfig;
 import org.apache.kafka.connect.storage.ClusterConfigState;
 import org.apache.kafka.connect.runtime.isolation.PluginClassLoader;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -387,6 +388,7 @@ public void testRestartTask() throws Exception {
             Collections.singletonMap(taskId, taskConfig(SourceSink.SOURCE)),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
             new HashSet<>(),
             new HashSet<>(),
             transformer);
@@ -420,6 +422,7 @@ public void testRestartTaskFailureOnStart() throws Exception {
             Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE)),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
             new HashSet<>(),
             new HashSet<>(),
             transformer);
@@ -555,6 +558,7 @@ public void testRestartConnectorAndTasksOnlyTasks() throws Exception {
             Collections.singletonMap(taskId, taskConfig(SourceSink.SINK)),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
             new HashSet<>(),
             new HashSet<>(),
             transformer);
@@ -609,6 +613,7 @@ public void testRestartConnectorAndTasksBoth() throws Exception {
             Collections.singletonMap(taskId, taskConfig(SourceSink.SINK)),
             Collections.emptyMap(),
             Collections.emptyMap(),
+            Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
             new HashSet<>(),
             new HashSet<>(),
             transformer);
@@ -729,10 +734,14 @@ public void testPutConnectorConfig() throws Exception {
             return true;
         }).when(worker).startConnector(eq(CONNECTOR_NAME), capturedConfig.capture(), any(), eq(herder), eq(TargetState.STARTED), onStart.capture());
-        // Generate same task config, which should result in no additional action to restart tasks
+        ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        // Generate same task config, but from different connector config, resulting
+        // in task restarts
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, new SourceConnectorConfig(plugins, newConnConfig, true)))
                 .thenReturn(singletonList(taskConfig(SourceSink.SOURCE)));
-
+        doNothing().when(worker).stopAndAwaitTasks(Collections.singletonList(taskId));
+        doNothing().when(statusBackingStore).put(new TaskStatus(taskId, TaskStatus.State.DESTROYED, WORKER_ID, 0));
+        when(worker.startSourceTask(eq(taskId), any(), eq(newConnConfig), eq(taskConfig(SourceSink.SOURCE)), eq(herder), eq(TargetState.STARTED))).thenReturn(true);
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
         Herder.Created<ConnectorInfo> connectorInfo = createCallback.get(WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@@ -928,6 +937,8 @@ public void testModifyConnectorOffsetsUnknownConnector() {
 
     @Test
     public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+
         herder.configState = new ClusterConfigState(
                 10,
                 null,
@@ -937,6 +948,7 @@ public void testModifyConnectorOffsetsConnectorNotInStoppedState() {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -963,6 +975,8 @@ public void testAlterConnectorOffsets() throws Exception {
             return null;
         }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), any(Map.class), workerCallbackCapture.capture());
 
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+
         herder.configState = new ClusterConfigState(
                 10,
                 null,
@@ -972,6 +986,7 @@ public void testAlterConnectorOffsets() throws Exception {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -992,6 +1007,8 @@ public void testResetConnectorOffsets() throws Exception {
             return null;
         }).when(worker).modifyConnectorOffsets(eq(CONNECTOR_NAME), eq(connectorConfig(SourceSink.SOURCE)), isNull(), workerCallbackCapture.capture());
 
+        Map<String, String> connectorConfig = connectorConfig(SourceSink.SOURCE);
+
         herder.configState = new ClusterConfigState(
                 10,
                 null,
@@ -1001,6 +1018,7 @@ public void testResetConnectorOffsets() throws Exception {
                 Collections.emptyMap(),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
                 Collections.emptySet(),
                 Collections.emptySet()
         );
@@ -1071,6 +1089,7 @@ private void expectAdd(SourceSink sourceSink) {
 
         // And we should instantiate the tasks. For a sink task, we should see added properties for the input topic partitions
 
+        Map<String, String> connectorConfig = connectorConfig(sourceSink);
         Map<String, String> generatedTaskProps = taskConfig(sourceSink);
 
         when(worker.connectorTaskConfigs(CONNECTOR_NAME, connConfig))
@@ -1080,11 +1099,12 @@ private void expectAdd(SourceSink sourceSink) {
                 -1,
                 null,
                 Collections.singletonMap(CONNECTOR_NAME, 1),
-                Collections.singletonMap(CONNECTOR_NAME, connectorConfig(sourceSink)),
+                Collections.singletonMap(CONNECTOR_NAME, connectorConfig),
                 Collections.singletonMap(CONNECTOR_NAME, TargetState.STARTED),
                 Collections.singletonMap(new ConnectorTaskId(CONNECTOR_NAME, 0), generatedTaskProps),
                 Collections.emptyMap(),
                 Collections.emptyMap(),
+                Collections.singletonMap(CONNECTOR_NAME, new AppliedConnectorConfig(connectorConfig)),
                 new HashSet<>(),
                 new HashSet<>(),
                 transformer);
diff --git a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
index 4c26fece18405..56e054ddbeb9a 100644
--- a/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
+++ b/connect/runtime/src/test/resources/META-INF/services/org.apache.kafka.connect.sink.SinkConnector
@@ -17,4 +17,5 @@ org.apache.kafka.connect.integration.BlockingConnectorTest$BlockingSinkConnector
 org.apache.kafka.connect.integration.BlockingConnectorTest$TaskInitializeBlockingSinkConnector
 org.apache.kafka.connect.integration.ErrantRecordSinkConnector
 org.apache.kafka.connect.integration.MonitorableSinkConnector
-org.apache.kafka.connect.runtime.SampleSinkConnector
\ No newline at end of file
+org.apache.kafka.connect.runtime.SampleSinkConnector
+org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest$EmptyTaskConfigsConnector
\ No newline at end of file