
Multiplex worker implementation limits parallelism #494

Closed
jdai8 opened this issue Feb 23, 2021 · 20 comments
Comments

@jdai8
Contributor

jdai8 commented Feb 23, 2021

The current multiplex worker implementation (at HEAD) sequences reads and writes:

blockable {
  generateSequence { WorkRequest.parseDelimitedFrom(io.input) }
}.asFlow()
  .map { request ->
    info { "received req: ${request.requestId}" }
    doTask("request ${request.requestId}") { ctx ->
      request.argumentsList.run {
        execute(ctx, toList())
      }
    }.let { result ->
      this@run.info { "task result ${result.status}" }
      WorkerProtocol.WorkResponse.newBuilder().apply {
        output = listOf(
          result.log.out.toString(),
          io.captured.toByteArray().toString(UTF_8)
        ).filter { it.isNotBlank() }.joinToString("\n")
        exitCode = result.status.exit
        requestId = request.requestId
      }.build()
    }
  }
  .collect { response ->
    blockable {
      info { response.toString() }
      response.writeDelimitedTo(io.output)
      io.output.flush()
    }
  }
.collect { response ->
blockable {
info {
response.toString()
}
response.writeDelimitedTo(io.output)
io.output.flush()
}
}

I think this artificially limits performance in certain cases. For example, if we receive a large work request first, all subsequent work requests will have to wait for it to finish before we write their work responses to stdout.
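The sequencing is easy to demonstrate with a tiny, hypothetical sketch (names and timings invented; `delay` stands in for `doTask`, the empty `collect` for writing responses): because `map` suspends on each request before the flow emits the next one, five 100 ms requests take at least 500 ms end to end.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        (1..5).asFlow()
            .map { id -> delay(100); id } // stands in for doTask(...)
            .collect { }                  // stands in for writeDelimitedTo(...)
    }
    // map is sequential here: total time is roughly the sum of the delays.
    println(elapsed >= 500)
}
```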

I tested this on a simple reproducer project here: https://github.com/jdai8/rules_kotlin_coroutine_repro. This project has one large source file Big.kt and several small SmallX.kt files. As you can see from this screenshot, //src:small and //src:small4 have to wait for //src:big to finish, even though the other small compilation actions have already finished (in <5s).

[screenshot: build profile showing //src:small and //src:small4 blocked until //src:big finishes]

Let me know if I'm understanding this correctly. If so, I'm happy to put up a PR.

@jongerrish
Contributor

Fascinating, great detective work! Want to put up a PR? I can test it against our repo (multiplex worker performance regressed, so I'm super keen to try your fix).

@jdai8
Contributor Author

jdai8 commented Feb 23, 2021

@jongerrish If enabling multiplex workers was the only change at play for you, I think this shouldn't regress from normal persistent workers - at worst you'd be performing the same (doing everything in serial).

That being said, I'll try to have a PR up in the next few days so you can test it out.

@jongerrish
Contributor

Well, for regular persistent workers we allow around 8 max instances, i.e. 8 processes for the KotlinCompile mnemonic, so we'd have 8 parallel kotlinc jobs running at a time without multiplex workers.

@jdai8
Contributor Author

jdai8 commented Feb 23, 2021

Yes, I meant within each persistent worker work would happen serially. There might be some other behavior at play - maybe Bazel tries to schedule more work on a worker if it's known to support multiplexing, or something like that.

@jongerrish
Contributor

Yeah, that makes sense. There should be just a single process for KotlinCompile. I'm not sure whether there's an upper bound on the number of concurrent requests Bazel will schedule at a time, but it makes sense that Bazel would schedule more work on that process.

@jeffzoch
Contributor

@jdai8 how did you generate that graph out of curiosity? Would like to have that tool in my tool belt :)

@jdai8
Contributor Author

jdai8 commented Feb 24, 2021

@jeffzoch take a look at the profiling section in the bazel docs: https://docs.bazel.build/versions/master/skylark/performance.html#performance-profiling. It's definitely a useful tool to have!

Specifically, I ran:

bazel build src:all --worker_verbose --worker_max_instances=1 --worker_quit_after_build  

in my reproducer project. Limiting the number of max instances sends every work request to one worker (forcing us to multiplex).

@restingbull
Collaborator

Welp. I feel stupid. Lemme go fix that -- coroutines are still a bit obtuse for me.

@restingbull
Collaborator

#496

@jdai8
Contributor Author

jdai8 commented Mar 2, 2021

I don't think #496 solves this.

When running in my repro project

bazel build //src:all --worker_verbose --experimental_worker_max_multiplex_instances=KotlinCompile=5

I would expect a profile to look like this:
[screenshot: expected build profile, small actions completing in parallel with the big one]

(I generated this with my fix at #495).

However, with the fix on master, I get something that looks like this, suggesting actions are still running sequentially:
[screenshot: actual build profile on master, actions still running sequentially]

@chancila
Contributor

chancila commented Mar 2, 2021

Reading docs on flows...

This operator retains a sequential nature of flow if changing the context does not call for changing the dispatcher. Otherwise, if changing dispatcher is required, it collects flow emissions in one coroutine that is run using a specified context and emits them from another coroutine with the original collector's context using a channel with a default buffer size between two coroutines similarly to buffer operator, unless buffer operator is explicitly called before or after flowOn, which requests buffering behavior and specifies channel size.

emphasis on retains a sequential nature of flow if changing the context does not call for changing the dispatcher

@jeffzoch
Contributor

jeffzoch commented Mar 3, 2021

Yeah for parallel requests in flow you need to do something like this (I use this helper all the time):

/**
 * suspendingParallelMap
 * @param scope - CoroutineScope
 * @param f - suspending function
 */
fun <A, B> Flow<A>.suspendingParallelMap(scope: CoroutineScope, f: suspend (A) -> B): Flow<B> {
    return flowOn(Dispatchers.IO)
        .map { scope.async { f(it) } }
        .buffer() // default concurrency limit of 64
        .map { it.await() }
}

You can pass a size to the buffer to limit concurrency @restingbull
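Here's a hypothetical self-contained demo of that helper (timings invented; `delay` stands in for real work): the `buffer` lets the `async`s launch eagerly, so five 100 ms tasks overlap and the whole pipeline finishes in roughly the time of one task rather than five.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis

fun <A, B> Flow<A>.suspendingParallelMap(scope: CoroutineScope, f: suspend (A) -> B): Flow<B> =
    flowOn(Dispatchers.IO)
        .map { scope.async { f(it) } } // start each task eagerly
        .buffer()                      // default channel size of 64 bounds in-flight tasks
        .map { it.await() }            // await in order, preserving emission order

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        (1..5).asFlow()
            .suspendingParallelMap(this) { id -> delay(100); id * 2 }
            .collect { }
    }
    // Tasks overlap, so total time is well under the 500 ms a serial map would take.
    println(elapsed < 450)
}
```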

@restingbull
Collaborator

Second pass -- #498

Thanks, @jeffzoch. I sorted through the options and it appears coroutines aren't quite up to scaling worker-pool primitives yet -- so your helper is by far the cleanest option.

The alternatives are to either:

  • Go to streams
  • Build a channel fan-out, fan-in with a scaling worker pool.
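The second alternative could look roughly like this (a hypothetical sketch, all names and timings invented; `delay` stands in for compilation): a fixed pool of workers fans out over one request channel and fans results into one response channel, which a single consumer drains.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking {
    val requests = Channel<Int>()
    val responses = Channel<Int>()

    // Fan-out: a fixed pool of workers shares the request channel.
    val workers = List(4) {
        launch {
            for (req in requests) {
                delay(50)                // stands in for compilation work
                responses.send(req * 2)  // fan-in: all workers share one output channel
            }
        }
    }

    launch {
        (1..8).forEach { requests.send(it) }
        requests.close()
    }

    // Single consumer drains responses, mirroring the one stdout writer a worker needs.
    val results = mutableListOf<Int>()
    repeat(8) { results += responses.receive() }
    workers.forEach { it.join() }
    println(results.sorted())
}
```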

@jeffzoch
Contributor

jeffzoch commented Mar 8, 2021

Personally I find streams (assuming you are referring to Java 8 Streams) lackluster for this kind of thing, since you don't get much control over the executor: controlling the level of concurrency per task isn't straightforward, and streams tend to shine on CPU-intensive work (since by default they run on the ForkJoinPool). Channels are a good approach too, not too dissimilar from Flow. Some of the channel API is getting deprecated, though, as Flow gains more features, and I find working with Flow more pleasant. YMMV.

@jdai8
Contributor Author

jdai8 commented Mar 10, 2021

While #498 parallelizes the compilation, it still sequences writing the work responses to stdout, since we're serially await'ing each async result.

I'm seeing this when enabling the multiplex flag:

[screenshot: build profile with the multiplex flag, small actions blocked on writing until //src:big finishes]

The small actions compile in parallel, but they're blocked on writing to stdout until the big action finishes. Once the big action finishes, they all write very quickly. This is slightly better than the previous profile, where there was still a compilation delay before each subsequent small action finished.

Since the underlying work here is thread-blocking, I'm not sure what value coroutines and flow offer - it seems to be tripping us up more than it's helping. Wouldn't it be simpler just to use threads? This is what Bazel does for Java compilation.
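For comparison, a hypothetical plain-threads sketch of that approach (all names invented): a fixed pool runs the blocking work, and each task writes its response as soon as it finishes, with writes synchronized so they don't interleave on stdout.

```kotlin
import java.util.concurrent.Executors

fun main() {
    val pool = Executors.newFixedThreadPool(4)
    val stdoutLock = Any()
    val tasks = (1..5).map { id ->
        pool.submit {
            Thread.sleep(100)          // stands in for blocking kotlinc work
            synchronized(stdoutLock) { // one writer at a time, like writeDelimitedTo
                println("response $id")
            }
        }
    }
    tasks.forEach { it.get() } // wait for all tasks; responses appear in completion order
    pool.shutdown()
}
```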

@jeffzoch
Contributor

@jdai8 are you referring to this https://github.com/restingbull/rules_kotlin/blob/6998aba9ee01198e04a47d39302939ecbd7fda34/src/main/kotlin/io/bazel/worker/PersistentWorker.kt#L115-L122 being serial? If so, that would make sense - collection is done serially. If we want this part to be parallelized, we should apply the same parallelMap'ing strategy I outlined above to writing to stdout (and then we can just call collect() at the very end if we don't need any output).

I am also a bit confused by the usage of the private ThreadAwareDispatcher, but I'm also not intimately familiar with exactly how this compilation works. Normally you can just use Dispatchers.IO as your dispatcher for this kind of work and call it a day, but again I admit I'm not familiar with how this code is called.
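A hypothetical sketch of that strategy using channelFlow (names and timings invented; `delay` stands in for compilation): each task sends its response as soon as it completes, so responses are emitted in completion order rather than request order, and a fast task is never stuck behind a slow one at the write step.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val done = channelFlow {
        listOf(300L to "big", 50L to "small1", 100L to "small2").forEach { (ms, name) ->
            launch {
                delay(ms)  // stands in for compilation work
                send(name) // emit in completion order, not request order
            }
        }
    }.toList()
    // The small tasks finish (and would be written out) before the big one.
    println(done)
}
```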

@jeffzoch
Contributor

jeffzoch commented Mar 11, 2021

@jdai8 mind trying #501?

@restingbull this PR is just for sharing ideas - but I was wondering if things would still work (performantly) by tweaking the persistent worker a bit

[screenshot: build profile with #501, all actions overlapping]
I am getting the profile above with #501, which I think is what we want (ran bazel build src:all --worker_verbose --worker_max_instances=1 --worker_quit_after_build --profile=profile.gz)

@jdai8
Contributor Author

jdai8 commented Mar 11, 2021

Thanks @jeffzoch! Performance-wise, #501 looks good to me 👍

I still think using threads directly (as in #495) is a simpler implementation, since it doesn't look like we're using any coroutine/flow-specific features. I'll leave it up to @restingbull and others though.

@jeffzoch
Contributor

Given that #501 was merged can we close this?

@jdai8
Contributor Author

jdai8 commented Mar 16, 2021

Yeah. Thanks!

@jdai8 jdai8 closed this as completed Mar 16, 2021