Parallelize supervisor stop logic to make it run faster #17535

Open · wants to merge 6 commits into base: master

Conversation

georgew5656 (Contributor) commented:

Sometimes the LifecycleStop method of SupervisorManager (SupervisorManager.stop()) can take a long time to run. This is because the method iterates over all running supervisors and calls stop on them serially. Each streaming supervisor's stop() call pushes a ShutdownNotice onto its notice queue and then waits, for up to tuningConfig.shutdownTimeout, for the ShutdownNotice to run and set stopped = true. As a result, the total run time can be the sum of tuningConfig.shutdownTimeout (default 80 seconds) across all supervisors.

This long stop time can cause a lot of issues, most notably Overlord leadership issues when the ZK leader is terminated but the ensemble maintains quorum. If an Overlord pod disconnects from and then reconnects to ZK, its becomeLeader call can get queued up behind stopLeader, since both methods share the same giant lock.

This PR aims to make SupervisorManager stop faster to prevent this issue (although I feel the Overlord leadership process may need to be revisited more generally).

Still working on some unit tests for this change.

Description

  • In SupervisorManager, use a static pool of shutdown threads to stop supervisors in parallel in the stop method, so that one or a few slow supervisors cannot slow down the overall shutdown (a rough sketch follows this list).
  • In SeekableStreamSupervisor, when stopGracefully is false (as it is when SupervisorManager is shutting down), don't wait for the ShutdownNotice to run. This means the recordSupplier (e.g. the Kafka consumer) may not be cleaned up immediately, but since all the supervisor objects are dereferenced and can be GC'd later, I don't think this is a huge deal.
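A rough sketch of the parallel stop path, pieced together from the diff excerpts quoted in the review below. In the PR itself the executor is created in the SupervisorManager constructor; here it is inlined to keep the sketch self-contained, and the method wrapper and collection parameter are illustrative only:

    // Sketch only: the executor, the coalesced-future wait, and the log line mirror
    // the diff excerpts quoted later in this conversation; the rest is paraphrased.
    // Uses com.google.common.util.concurrent.*, org.apache.druid.common.guava.FutureUtils,
    // and org.apache.druid.java.util.common.concurrent.Execs.
    private void stopAllSupervisors(Collection<Supervisor> runningSupervisors)
    {
      final ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(
          Execs.multiThreaded(25, "supervisor-manager-shutdown-%d")
      );

      final List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
      for (Supervisor supervisor : runningSupervisors) {
        stopFutures.add(shutdownExec.submit(() -> {
          // stop(false) is a non-graceful stop; with this PR it no longer waits
          // for the ShutdownNotice to run inside the supervisor.
          supervisor.stop(false);
          return null;
        }));
      }

      log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
      try {
        FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS);
      }
      catch (Exception e) {
        log.warn(e, "Exception while waiting for supervisors to stop");
      }
      finally {
        shutdownExec.shutdownNow();
      }
    }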

  • I used a static thread pool in SupervisorManager for now. It could be made configurable, but IMO the main point of the thread pool is to keep a few slow supervisor shutdowns, run in series, from blocking the entire supervisor manager from shutting down.
  • I'm not sure whether changing the SeekableStreamSupervisor.stop method when stopGracefully = false is necessary, but it didn't make sense to me to request a non-graceful shutdown and then wait for things to clean up anyway (see the condensed view after this list).
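For reference, a condensed and paraphrased view of the wait loop in SeekableStreamSupervisor.stop(); only the four-line if (!stopGracefully) block is quoted from the diff later in this conversation, and its exact placement in the real code may differ:

    // Paraphrased from SeekableStreamSupervisor.stop(); not the exact source.
    synchronized (stopLock) {
      if (stopGracefully) {
        addNotice(new GracefulShutdownNotice());
      } else {
        addNotice(new ShutdownNotice());
      }

      final long endMillis = System.currentTimeMillis() + tuningConfig.getShutdownTimeout().getMillis();
      while (!stopped && System.currentTimeMillis() < endMillis) {
        if (!stopGracefully) {
          // New in this PR: a non-graceful stop does not wait for the ShutdownNotice
          // to run; the recordSupplier gets cleaned up later or is simply dereferenced.
          stopped = true;
          break;
        }
        try {
          stopLock.wait(250);
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    }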

Release note

Improve recovery time for Overlord leadership after ZK nodes are bounced.

Key changed/added classes in this PR
  • SupervisorManager
  • SeekableStreamSupervisor

This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@georgew5656 georgew5656 changed the title Fix supervisor stop logic Parallelize supervisor stop logic to make it run faster Dec 10, 2024
@kfaraz (Contributor) left a comment:

left some suggestions

    }
    log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
    try {
      FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS);
@kfaraz (Contributor) commented on Dec 12, 2024:

I don't think we should use a timeout of 80s here, since each supervisor could have a different value for its shutdown timeout. We could either just call get() with no args (which would be no worse than what the code is currently doing) or use a longer timeout.


  private volatile boolean started = false;

  @Inject
  public SupervisorManager(MetadataSupervisorManager metadataSupervisorManager)
  {
    this.metadataSupervisorManager = metadataSupervisorManager;
    this.shutdownExec = MoreExecutors.listeningDecorator(
        Execs.multiThreaded(25, "supervisor-manager-shutdown-%d")
@kfaraz (Contributor) commented on Dec 12, 2024:

25 may be excessive in some cases and inadequate in others. Maybe initialize the executor lazily inside the stop() method, then the number of required threads can be computed at run time. The shutdownExec need not be a class-level field either.


Alternatively, instead of using a completely new executor, you could consider using the scheduledExec inside each supervisor. That executor basically just sits idle most of the time and is responsible only for submitting RunNotice to the notice queue.

You could add a stopAsync method to SeekableStreamSupervisor that does the following:

  • returns a future that we coalesce and wait upon
  • internally submits a runnable to the scheduledExec to perform the actual stop

I guess the only thing we will miss out on is parallelizing the autoscaler.stop() which should not be a concern, I guess?

@georgew5656 (Author) replied:

The issue I was running into with this strategy is that part of the stop logic is shutting down the scheduledExec executor, and I couldn't really think of a great way to avoid this chicken-and-egg problem.

@kfaraz (Contributor) replied on Dec 13, 2024:

You could perhaps work around that problem by doing something like this:

  • stopAsync sets the supervisor state to STOPPING
  • stopAsync then submits a stop() runnable to the scheduledExec
  • buildRunTask method should check and submit the RunNotice only if the state of the supervisor is not STOPPING
  • stop() can call scheduledExec.shutdown() instead of scheduledExec.shutdownNow()

Another alternative is to simply create a shutdownExec inside stopAsync.
Difference from the current approach would be that the SupervisorManager doesn't need to handle the lifecycle of the shutdown executor.

Let me know what you think.
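A minimal sketch of what this stopAsync suggestion could look like (scheduledExec, stop(), and the STOPPING state come from the discussion above; the exact state-setting call and the method shape are assumptions for illustration):

    // Illustrative only: sketches the stopAsync() idea from this thread.
    // A SettableFuture bridges the plain ScheduledExecutorService to a ListenableFuture.
    public ListenableFuture<Void> stopAsync()
    {
      // Mark the supervisor as STOPPING so buildRunTask() can skip submitting
      // further RunNotices.
      stateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING);

      final SettableFuture<Void> stopFuture = SettableFuture.create();
      scheduledExec.submit(() -> {
        try {
          // stop() would call scheduledExec.shutdown() rather than shutdownNow(),
          // allowing this already-running task to finish.
          stop(false);
          stopFuture.set(null);
        }
        catch (Exception e) {
          stopFuture.setException(e);
        }
      });
      return stopFuture;
    }

SupervisorManager could then coalesce these futures and wait on them, as in the snippet near the top of this review.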

@georgew5656 (Author) replied:

I think this makes sense, will update.

      FutureUtils.coalesce(stopFutures).get(80, TimeUnit.SECONDS);
    }
    catch (Exception e) {
      log.warn(
@kfaraz (Contributor) commented:

Maybe log the exception too, just in case it is something other than a timeout.

Comment on lines 1079 to 1082
    if (!stopGracefully) {
      stopped = true;
      break;
    }
@kfaraz (Contributor) commented:

If we have already parallelized the stop of supervisors, is this still needed?

@georgew5656 (Author) replied:

We could probably pull it out.
