
KAFKA-9228: Restart tasks on runtime-only connector config changes #16053

Merged: 5 commits into apache:trunk on Jun 10, 2024

Conversation

Contributor

@C0urante C0urante commented May 23, 2024

Jira

This uses a much simpler approach than the one pursued in #16001. Once task configs are detected for a connector, the most recent config for that connector is tracked by the herder as the "applied" config. When new task configs are generated by the connector, the latest connector config is compared to the "applied" config. If a difference is detected, or there is no "applied" config, then the new task configs are automatically published to the config topic.
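The decision described above can be sketched roughly as follows. This is a minimal illustration, not the actual Kafka Connect code; `AppliedConfigTracker` and its method names are hypothetical:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of the herder-side check: task configs are (re)published
// when the connector's latest config differs from the last "applied" config,
// or when no applied config has been recorded yet.
class AppliedConfigTracker {
    private Map<String, String> appliedConfig; // null until task configs have been observed

    // Decide whether newly generated task configs should be written to the config topic
    boolean shouldWriteTaskConfigs(Map<String, String> latestConnectorConfig) {
        if (appliedConfig == null) {
            return true; // no "applied" config yet: always publish
        }
        // Publish only if the connector config changed since task configs were last generated
        return !Objects.equals(appliedConfig, latestConnectorConfig);
    }

    // Called once the written task configs (and their connector config) are read back
    void recordApplied(Map<String, String> connectorConfig) {
        this.appliedConfig = connectorConfig;
    }
}
```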

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor Author

C0urante commented Jun 3, 2024

@gharris1727 I mentioned this as an alternative to #16001, can you take a look when you have a moment and see if the general strategy looks alright?


@gharris1727 gharris1727 left a comment


Let me explain the behavior to myself to make sure I understand:

1: When a worker finishes starting up, joins the group, and is assigned connector C, it will have an empty appliedConnectorConfigs.
2: When a connector starts up, it will try to generate a reconfiguration. If it generates task configs and appliedConnectorConfigs is empty, it will publish them.
3=1+2: When a connector is assigned to a worker immediately following a restart, the task configs are regenerated. This clears any bad state following an upgrade, but also forces at least one reconfiguration on cluster rolls. This is acceptable IMHO. Cooperative rebalancing should keep the connector from being relocated unnecessarily.

4: Once the task configs are written to the config topic and read back by the worker, appliedConnectorConfigs is set
5: If connector C is assigned to an existing worker in the cluster that has seen a task config generation, it won't generate additional configs
6=4+5: Rebalancing a connector among existing workers in a cluster won't force new task configs to be generated.

7: Configs are compared for equality after ConfigProvider resolution, but the ConfigProvider is evaluated at two different times on the same machine
7->8: If the value of a config provider changes over time, or across restarts, the next time the connector starts it will be forced to regenerate task configs.

9: For environments that constantly rewrite the same connector config, the appliedConnectorConfigs is not updated
10: Connector config updates are followed by a restart
11=9+10: Rewriting the same connector config repeatedly doesn't force task config regeneration

I think this has all the properties we want, and is indeed much simpler than the other approach. LGTM!
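Steps 7-8 above can be illustrated with a toy config-resolution step. Here `resolve()` is a hypothetical stand-in for Connect's real ConfigTransformer, and the `${vault:secret}` placeholder syntax is purely illustrative:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// If a provider-backed value changes between two evaluations, the same raw
// connector config resolves to two different transformed configs, which is
// exactly the difference that forces task config regeneration.
class ProviderDriftExample {
    // Resolve provider-backed placeholders in a raw connector config
    static Map<String, String> resolve(Map<String, String> raw, Supplier<String> secretProvider) {
        Map<String, String> resolved = new HashMap<>();
        // Substitute the provider-backed placeholder; everything else passes through unchanged
        raw.forEach((k, v) -> resolved.put(k, "${vault:secret}".equals(v) ? secretProvider.get() : v));
        return resolved;
    }
}
```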

@@ -1137,4 +1216,60 @@ private Map<String, String> defaultSourceConnectorProps(String topic) {
props.put(DEFAULT_TOPIC_CREATION_PREFIX + PARTITIONS_CONFIG, String.valueOf(1));
return props;
}

public static class SimpleConnector extends SinkConnector {
Contributor


nit: EmptyTaskConfigsConnector?

Contributor Author

C0urante commented Jun 3, 2024

Thanks @gharris1727! I've updated the patch to apply to both distributed and standalone mode, and fixed failing unit tests.

I've also tweaked the logic so that applied connector configs are tracked regardless of whether the worker has completed startup or not, which should eliminate unnecessary cluster churn on upgrades. This still "primes the pump" for future connector config updates though, which I believe should make us both very happy!

Going through the scenarios you listed in your analysis (which was correct based on the implementation you reviewed):

  • The first one (steps 1-3) should never occur now. If a connector has generated task configs after its latest configuration was submitted by the user, the worker should have an applied config for it; if the connector has not generated task configs, then we should force a reconfiguration attempt, just like we would today.
  • The second one (steps 4-6) should remain correct ("Rebalancing a connector among existing workers in a cluster won't force new task configs to be generated"), because workers must be caught up on the config topic before joining a group, and before resuming work after rejoining a group.
  • The third one (steps 7-8) should remain as correct as it was in the original implementation. We eagerly transform applied connector configs as soon as the corresponding set of task configs is generated, which does mean that there may be some lag time between when the connector config was submitted and when we performed the transform. Ultimately though, there is an eventual consistency-esque guarantee that if a config provider changes periodically, new task configs will at some point be forcibly written to the config topic.
  • The fourth one (steps 9-11) should remain correct as well.

@C0urante C0urante marked this pull request as ready for review June 3, 2024 20:42
@C0urante C0urante force-pushed the kafka-9228-in-memory-config-tracking branch from b21ddee to a0e5870 Compare June 4, 2024 17:35
Map<String, String> appliedConnectorConfig;
if (configTransformer != null) {
try {
appliedConnectorConfig = configTransformer.transform(rawConnectorConfig);
Contributor


Let's say that there are 1000 connector configs in the config topic. Today, I think the configTransformer only transforms a config when it's in use (like a connector being started, fenced, or updated). That should only be a handful of times after startup completes.

With the code as-written, I think this would make 1000 configTransformer requests, including ones on out-of-date configurations. If the ConfigProvider is making a network request, this has the potential to be slow/expensive, and generate a spike of load that wasn't there before.

Do you think there's a simple way to avoid this burst of ConfigProvider calls during startup? The built-in AK providers can handle the burst safely, but I wonder if there are third-party providers that could have bad side-effects.
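The concern can be sketched as a hypothetical eager pass that transforms every stored config at startup, performing one (potentially slow) provider round-trip per config, current or stale alike. All names here are illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

// Eagerly transforming every config read back from the config topic means one
// ConfigProvider call per stored config, including out-of-date ones: a burst
// of N calls at startup instead of a handful when configs are actually used.
class EagerTransformBurst {
    // Transform every stored config up front; returns how many transforms ran
    static int eagerlyTransformAll(List<Map<String, String>> rawConfigs,
                                   UnaryOperator<Map<String, String>> transformer) {
        rawConfigs.forEach(transformer::apply);
        return rawConfigs.size();
    }
}
```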

Contributor Author


Great catch! I've tweaked the PR to lazily apply config providers only when we actually make use of applied connector configs (i.e., when checking to see if we should publish a new set of task configs).

This does come with a small downside in that config transformation takes place a little bit later than it could, so it may take longer for a new set of task configs to be published if a connector config changes as a result of new values being applied by its config provider. But I'm hopeful that this can be left as a follow-up task if the basic approach in my latest commit looks good. Eager to hear your thoughts!
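The lazy strategy described here can be sketched as a small memoizing wrapper, loosely in the spirit of the AppliedConnectorConfig caching element this PR introduces; names and shape are illustrative, not the actual implementation:

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// The raw config is stored cheaply at startup; the (potentially expensive)
// ConfigProvider-backed transformation runs only when the applied config is
// actually needed, and the result is cached for subsequent uses.
class LazyAppliedConfig {
    private final Map<String, String> rawConfig;
    private final UnaryOperator<Map<String, String>> transformer;
    private Map<String, String> transformed; // null until first requested

    LazyAppliedConfig(Map<String, String> rawConfig,
                      UnaryOperator<Map<String, String>> transformer) {
        this.rawConfig = rawConfig;
        this.transformer = transformer;
    }

    synchronized Map<String, String> transformed() {
        if (transformed == null) {
            transformed = transformer.apply(rawConfig); // provider hit happens here, once
        }
        return transformed;
    }
}
```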


@gharris1727 gharris1727 left a comment


Re-affirming my earlier approval.

I really like the AppliedConnectorConfig object as a caching element, big 👍

I had one optional nit, feel free to merge without addressing it.

@C0urante
Contributor Author

The last changes are trivial; the build and unit tests passed locally, as did the newly-introduced integration test. Merging...

@C0urante C0urante merged commit eec8fd6 into apache:trunk Jun 10, 2024
1 check was pending
@C0urante C0urante deleted the kafka-9228-in-memory-config-tracking branch June 10, 2024 21:02
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024