Skip to content

Commit

Permalink
KAFKA-9228: Restart tasks on runtime-only connector config changes
Browse files Browse the repository at this point in the history
  • Loading branch information
C0urante committed Jun 4, 2024
1 parent 55d38ef commit a0e5870
Show file tree
Hide file tree
Showing 13 changed files with 313 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,8 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
if (rawTaskProps.size() != currentNumTasks) {
log.debug("Connector {} task count changed from {} to {}", connName, currentNumTasks, rawTaskProps.size());
result = true;
} else {
}
if (!result) {
for (int index = 0; index < currentNumTasks; index++) {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
if (!rawTaskProps.get(index).equals(configState.rawTaskConfig(taskId))) {
Expand All @@ -1054,6 +1055,14 @@ public static boolean taskConfigsChanged(ClusterConfigState configState, String
}
}
}
if (!result) {
Map<String, String> currentConnectorConfig = configState.rawConnectorConfig(connName);
Map<String, String> appliedConnectorConfig = configState.appliedConnectorConfig(connName);
if (!Objects.equals(currentConnectorConfig, appliedConnectorConfig)) {
log.debug("Forcing task restart for connector {} as its configuration appears to be updated", connName);
result = true;
}
}
if (result) {
log.debug("Reconfiguring connector {}: writing new updated configurations for tasks", connName);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ClusterConfigState {
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptyMap(),
Collections.emptySet(),
Collections.emptySet());

Expand All @@ -55,6 +56,7 @@ public class ClusterConfigState {
final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
final Map<String, Integer> connectorTaskCountRecords;
final Map<String, Integer> connectorTaskConfigGenerations;
final Map<String, Map<String, String>> appliedConnectorConfigs;
final Set<String> connectorsPendingFencing;
final Set<String> inconsistentConnectors;

Expand All @@ -66,6 +68,7 @@ public ClusterConfigState(long offset,
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, Map<String, String>> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors) {
this(offset,
Expand All @@ -76,6 +79,7 @@ public ClusterConfigState(long offset,
taskConfigs,
connectorTaskCountRecords,
connectorTaskConfigGenerations,
appliedConnectorConfigs,
connectorsPendingFencing,
inconsistentConnectors,
null);
Expand All @@ -89,6 +93,7 @@ public ClusterConfigState(long offset,
Map<ConnectorTaskId, Map<String, String>> taskConfigs,
Map<String, Integer> connectorTaskCountRecords,
Map<String, Integer> connectorTaskConfigGenerations,
Map<String, Map<String, String>> appliedConnectorConfigs,
Set<String> connectorsPendingFencing,
Set<String> inconsistentConnectors,
WorkerConfigTransformer configTransformer) {
Expand All @@ -100,6 +105,7 @@ public ClusterConfigState(long offset,
this.taskConfigs = taskConfigs;
this.connectorTaskCountRecords = connectorTaskCountRecords;
this.connectorTaskConfigGenerations = connectorTaskConfigGenerations;
this.appliedConnectorConfigs = appliedConnectorConfigs;
this.connectorsPendingFencing = connectorsPendingFencing;
this.inconsistentConnectors = inconsistentConnectors;
this.configTransformer = configTransformer;
Expand Down Expand Up @@ -158,6 +164,17 @@ public Map<String, String> rawConnectorConfig(String connector) {
return connectorConfigs.get(connector);
}

/**
* Get the most recent configuration for the connector from which task configs have
* been generated.
* @param connector name of the connector
* @return the connector config, or null if no config exists from which task configs have
* been generated
*/
public Map<String, String> appliedConnectorConfig(String connector) {
return appliedConnectorConfigs.get(connector);
}

/**
* Get the target state of the connector
* @param connector name of the connector
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ public static String LOGGER_CLUSTER_KEY(String namespace) {

final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();
final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();
final Map<String, Map<String, String>> appliedConnectorConfigs = new HashMap<>();
final Set<String> connectorsPendingFencing = new HashSet<>();

private final WorkerConfigTransformer configTransformer;
Expand Down Expand Up @@ -478,6 +479,7 @@ public ClusterConfigState snapshot() {
new HashMap<>(taskConfigs),
new HashMap<>(connectorTaskCountRecords),
new HashMap<>(connectorTaskConfigGenerations),
new HashMap<>(appliedConnectorConfigs),
new HashSet<>(connectorsPendingFencing),
new HashSet<>(inconsistent),
configTransformer
Expand Down Expand Up @@ -1123,6 +1125,24 @@ private void processTasksCommitRecord(String connectorName, SchemaAndValue value
connectorTaskConfigGenerations.compute(connectorName, (ignored, generation) -> generation != null ? generation + 1 : 0);
}
inconsistent.remove(connectorName);

Map<String, String> rawConnectorConfig = connectorConfigs.get(connectorName);
Map<String, String> appliedConnectorConfig;
if (configTransformer != null) {
try {
appliedConnectorConfig = configTransformer.transform(rawConnectorConfig);
} catch (Throwable t) {
log.warn("Will not track applied config for connector {} due to error in transformation", connectorName, t);
appliedConnectorConfig = null;
}
} else {
appliedConnectorConfig = rawConnectorConfig;
}
if (appliedConnectorConfig != null) {
appliedConnectorConfigs.put(connectorName, appliedConnectorConfig);
} else {
appliedConnectorConfigs.remove(connectorName);
}
}
// Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
// update, then we need to see a completely fresh set of configs after this commit message, so we don't
Expand Down Expand Up @@ -1261,6 +1281,7 @@ private void processConnectorRemoval(String connectorName) {
connectorTaskCounts.remove(connectorName);
taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
deferredTaskUpdates.remove(connectorName);
appliedConnectorConfigs.remove(connectorName);
}

private ConnectorTaskId parseTaskId(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.HashMap;
Expand All @@ -36,6 +38,8 @@
*/
public class MemoryConfigBackingStore implements ConfigBackingStore {

private static final Logger log = LoggerFactory.getLogger(MemoryConfigBackingStore.class);

private final Map<String, ConnectorState> connectors = new HashMap<>();
private UpdateListener updateListener;
private WorkerConfigTransformer configTransformer;
Expand All @@ -61,6 +65,7 @@ public synchronized ClusterConfigState snapshot() {
Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
Map<String, TargetState> connectorTargetStates = new HashMap<>();
Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
Map<String, Map<String, String>> appliedConnectorConfigs = new HashMap<>();

for (Map.Entry<String, ConnectorState> connectorStateEntry : connectors.entrySet()) {
String connector = connectorStateEntry.getKey();
Expand All @@ -69,6 +74,9 @@ public synchronized ClusterConfigState snapshot() {
connectorConfigs.put(connector, connectorState.connConfig);
connectorTargetStates.put(connector, connectorState.targetState);
taskConfigs.putAll(connectorState.taskConfigs);
if (connectorState.appliedConnConfig != null) {
appliedConnectorConfigs.put(connector, connectorState.appliedConnConfig);
}
}

return new ClusterConfigState(
Expand All @@ -80,6 +88,7 @@ public synchronized ClusterConfigState snapshot() {
taskConfigs,
Collections.emptyMap(),
Collections.emptyMap(),
appliedConnectorConfigs,
Collections.emptySet(),
Collections.emptySet(),
configTransformer
Expand Down Expand Up @@ -123,6 +132,7 @@ public synchronized void removeTaskConfigs(String connector) {

HashSet<ConnectorTaskId> taskIds = new HashSet<>(state.taskConfigs.keySet());
state.taskConfigs.clear();
state.appliedConnConfig = null;

if (updateListener != null)
updateListener.onTaskConfigUpdate(taskIds);
Expand All @@ -137,6 +147,24 @@ public synchronized void putTaskConfigs(String connector, List<Map<String, Strin
Map<ConnectorTaskId, Map<String, String>> taskConfigsMap = taskConfigListAsMap(connector, configs);
state.taskConfigs = taskConfigsMap;

Map<String, String> rawConnectorConfig = state.connConfig;
Map<String, String> appliedConnectorConfig;
if (configTransformer != null) {
try {
appliedConnectorConfig = configTransformer.transform(rawConnectorConfig);
} catch (Throwable t) {
log.warn("Will not track applied config for connector {} due to error in transformation", connector, t);
appliedConnectorConfig = null;
}
} else {
appliedConnectorConfig = rawConnectorConfig;
}
if (appliedConnectorConfig != null) {
state.appliedConnConfig = appliedConnectorConfig;
} else {
state.appliedConnConfig = null;
}

if (updateListener != null)
updateListener.onTaskConfigUpdate(taskConfigsMap.keySet());
}
Expand Down Expand Up @@ -187,6 +215,7 @@ private static class ConnectorState {
private TargetState targetState;
private Map<String, String> connConfig;
private Map<ConnectorTaskId, Map<String, String>> taskConfigs;
private Map<String, String> appliedConnConfig;

/**
* @param connConfig the connector's configuration
Expand All @@ -197,6 +226,7 @@ public ConnectorState(Map<String, String> connConfig, TargetState targetState) {
this.targetState = targetState == null ? TargetState.STARTED : targetState;
this.connConfig = connConfig;
this.taskConfigs = new HashMap<>();
this.appliedConnConfig = null;
}
}

Expand Down
Loading

0 comments on commit a0e5870

Please sign in to comment.