Skip to content

Commit

Permalink
KAFKA-9228: Restart tasks on runtime-only connector config changes (a…
Browse files Browse the repository at this point in the history
…pache#16053)

Reviewers: Greg Harris <[email protected]>
  • Loading branch information
C0urante committed Jun 10, 2024
1 parent 113baae commit facbab2
Show file tree
Hide file tree
Showing 14 changed files with 383 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,8 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
if (rawTaskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
result = true;
} else {
}
if (!result) {
for (int index = 0; index < currentNumTasks; index++) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
Expand All @@ -1054,6 +1055,14 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
}
}
}
if (!result) {
Map<String, String> appliedConnectorConfig = configState.appliedConnectorConfig(connName);
Map<String, String> currentConnectorConfig = configState.connectorConfig(connName);
if (!Objects.equals(appliedConnectorConfig, currentConnectorConfig)) {
log.debug("Forcing task restart for connector {} as its configuration appears to be updated", connName);
result = true;
}
}
if (result) {
log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", connName);
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.storage;

import org.apache.kafka.connect.runtime.WorkerConfigTransformer;

import java.util.Map;

/**
* Wrapper class for a connector configuration that has been used to generate task configurations
* Supports lazy {@link WorkerConfigTransformer#transform(Map) transformation}.
*/
public class AppliedConnectorConfig {

private final Map<String, String> rawConfig;
private Map<String, String> transformedConfig;

/**
* Create a new applied config that has not yet undergone
* {@link WorkerConfigTransformer#transform(Map) transformation}.
* @param rawConfig the non-transformed connector configuration; may be null
*/
public AppliedConnectorConfig(Map<String, String> rawConfig) {
this.rawConfig = rawConfig;
}

/**
* If necessary, {@link WorkerConfigTransformer#transform(Map) transform} the raw
* connector config, then return the result. Transformed configurations are cached and
* returned in all subsequent calls.
* <p>
* This method is thread-safe: different threads may invoke it at any time and the same
* transformed config should always be returned, with transformation still only ever
* taking place once before its results are cached.
* @param configTransformer the transformer to use, if no transformed connector
* config has been cached yet; may be null
* @return the possibly-cached, transformed, connector config; may be null
*/
public synchronized Map<String, String> transformedConfig(WorkerConfigTransformer configTransformer) {
if (transformedConfig != null || rawConfig == null)
return transformedConfig;

if (configTransformer != null) {
transformedConfig = configTransformer.transform(rawConfig);
} else {
transformedConfig = rawConfig;
}

return transformedConfig;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ClusterConfigState {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());

Expand All @@ -55,6 +56,7 @@ public class ClusterConfigState {
final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
final Map<String, Integer> connectorTaskCountRecords;
final Map<String, Integer> connectorTaskConfigGenerations;
final Map<String, AppliedConnectorConfig> appliedConnectorConfigs;
final Set<String> connectorsPendingFencing;
final Set<String> inconsistentConnectors;

Expand All @@ -66,6 +68,7 @@ public ClusterConfigState(long offset,
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors) {
this(offset,
Expand All @@ -76,6 +79,7 @@ public ClusterConfigState(long offset,
taskConfigs,
connectorTaskCountRecords,
connectorTaskConfigGenerations,
appliedConnectorConfigs,
connectorsPendingFencing,
inconsistentConnectors,
null);
Expand All @@ -89,6 +93,7 @@ public ClusterConfigState(long offset,
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, AppliedConnectorConfig> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors,
WorkerConfigTransformer configTransformer) {
Expand All @@ -100,6 +105,7 @@ public ClusterConfigState(long offset,
this.taskConfigs = taskConfigs;
this.connectorTaskCountRecords = connectorTaskCountRecords;
this.connectorTaskConfigGenerations = connectorTaskConfigGenerations;
this.appliedConnectorConfigs = appliedConnectorConfigs;
this.connectorsPendingFencing = connectorsPendingFencing;
this.inconsistentConnectors = inconsistentConnectors;
this.configTransformer = configTransformer;
Expand Down Expand Up @@ -158,6 +164,19 @@ public Map<String, String> rawConnectorConfig(String connector) {
return connectorConfigs.get(connector);
}

/**
* Get the most recent configuration for the connector from which task configs have
* been generated. The configuration will have been transformed by
* {@link org.apache.kafka.common.config.ConfigTransformer}
* @param connector name of the connector
* @return the connector config, or null if no config exists from which task configs have
* been generated
*/
public Map<String, String> appliedConnectorConfig(String connector) {
AppliedConnectorConfig appliedConfig = appliedConnectorConfigs.get(connector);
return appliedConfig != null ? appliedConfig.transformedConfig(configTransformer) : null;
}

/**
* Get the target state of the connector
* @param connector name of the connector
Expand Down Expand Up @@ -303,4 +322,5 @@ public int hashCode() {
inconsistentConnectors,
configTransformer);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public static String LOGGER_CLUSTER_KEY(String namespace) {

final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();
final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();
final Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();
final Set<String> connectorsPendingFencing = new HashSet<>();

private final WorkerConfigTransformer configTransformer;
Expand Down Expand Up @@ -478,6 +479,7 @@ public ClusterConfigState snapshot() {
new HashMap<>(taskConfigs),
new HashMap<>(connectorTaskCountRecords),
new HashMap<>(connectorTaskConfigGenerations),
new HashMap<>(appliedConnectorConfigs),
new HashSet<>(connectorsPendingFencing),
new HashSet<>(inconsistent),
configTransformer
Expand Down Expand Up @@ -1065,7 +1067,8 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
// but compaction took place and both the original connector config and the
// tombstone message for it have been removed from the config topic
// We should ignore these task configs
if (!connectorConfigs.containsKey(connectorName)) {
Map<String, String> appliedConnectorConfig = connectorConfigs.get(connectorName);
if (appliedConnectorConfig == null) {
processConnectorRemoval(connectorName);
log.debug(
"Ignoring task configs for connector {}; it appears that the connector was deleted previously "
Expand Down Expand Up @@ -1123,6 +1126,11 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
connectorTaskConfigGenerations.compute(connectorName, (ignored, generation) -> generation != null ? generation + 1 : 0);
}
inconsistent.remove(connectorName);

appliedConnectorConfigs.put(
connectorName,
new AppliedConnectorConfig(appliedConnectorConfig)
);
}
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
Expand Down Expand Up @@ -1261,6 +1269,7 @@ private void processConnectorRemoval(String connectorName) {
connectorTaskCounts.remove(connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
deferredTaskUpdates.remove(connectorName);
appliedConnectorConfigs.remove(connectorName);
}

private ConnectorTaskId parseTaskId(String key) {
Expand Down Expand Up @@ -1333,5 +1342,6 @@ else if (value instanceof Long)
else
throw new ConnectException("Expected integer value to be either Integer or Long");
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,6 +38,8 @@
*/
public class MemoryConfigBackingStore implements ConfigBackingStore {

private static final Logger log = LoggerFactory.getLogger(MemoryConfigBackingStore.class);

private final Map<String, ConnectorState> connectors = new HashMap<>();
private UpdateListener updateListener;
private WorkerConfigTransformer configTransformer;
Expand All @@ -61,6 +65,7 @@ public synchronized ClusterConfigState snapshot() {
Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
Map<String, TargetState> connectorTargetStates = new HashMap<>();
Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
Map<String, AppliedConnectorConfig> appliedConnectorConfigs = new HashMap<>();

for (Map.Entry<String, ConnectorState> connectorStateEntry : connectors.entrySet()) {
String connector = connectorStateEntry.getKey();
Expand All @@ -69,6 +74,9 @@ public synchronized ClusterConfigState snapshot() {
connectorConfigs.put(connector, connectorState.connConfig);
connectorTargetStates.put(connector, connectorState.targetState);
taskConfigs.putAll(connectorState.taskConfigs);
if (connectorState.appliedConnConfig != null) {
appliedConnectorConfigs.put(connector, connectorState.appliedConnConfig);
}
}

return new ClusterConfigState(
Expand All @@ -80,6 +88,7 @@ public synchronized ClusterConfigState snapshot() {
taskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet(),
configTransformer
Expand Down Expand Up @@ -123,6 +132,7 @@ public synchronized void removeTaskConfigs(String connector) {

HashSet<ConnectorTaskId> taskIds = new HashSet<>(state.taskConfigs.keySet());
state.taskConfigs.clear();
state.appliedConnConfig = null;

if (updateListener != null)
updateListener.onTaskConfigUpdate(taskIds);
Expand All @@ -137,6 +147,8 @@ public synchronized void putTaskConfigs(String connector, List<Map<String, Strin
Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
state.taskConfigs = taskConfigsMap;

state.applyConfig();

if (updateListener != null)
updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
}
Expand Down Expand Up @@ -187,6 +199,7 @@ private static class ConnectorState {
private TargetState targetState;
private Map<String, String> connConfig;
private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
private AppliedConnectorConfig appliedConnConfig;

/**
* @param connConfig the connector's configuration
Expand All @@ -197,6 +210,11 @@ public ConnectorState(Map<String, String> connConfig, TargetState targetState) {
this.targetState = targetState == null ? TargetState.STARTED : targetState;
this.connConfig = connConfig;
this.taskConfigs = new HashMap<>();
this.appliedConnConfig = null;
}

public void applyConfig() {
this.appliedConnConfig = new AppliedConnectorConfig(connConfig);
}
}

Expand Down
Loading

0 comments on commit facbab2

Please sign in to comment.