Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize stdout and clean up PersistentWorker #501

Merged
merged 3 commits into from
Mar 12, 2021

Conversation

jeffzoch
Copy link
Contributor

For discussion in #494


@ExperimentalCoroutinesApi
override fun start(execute: Work) = WorkerContext.run {
//Use channel to serialize writing output
val writeChannel = Channel<WorkerProtocol.WorkResponse>(UNLIMITED)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Main idea is to launch the producer in its own coroutine and have the collector run to completion synchronously. The producer does the compilation in parallel and emits results across a channel which is serially read from. Flows are serial by default so we don't need to synchronize access on the consumer side.

* @param f - suspending function
*/
fun <A, B> Flow<A>.suspendingParallelMap(scope: CoroutineScope, f: suspend (A) -> B): Flow<B> {
return flowOn(Dispatchers.IO)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing compilation is CPU-bound, so Dispatchers.DEFAULT or similar might be more appropriate.

info { "received req: ${request.requestId}" }
async {
doTask("request ${request.requestId}") { ctx ->
.asFlow()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this flow necessary? Would it be simpler to do something like:

generateSequence { WorkRequest.parseDelimitedFrom(io.input) }.forEach {
  launch(Dispatchers.DEFAULT) {
    // do the work
    writeChannel.send(response)
  }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jdai8 yeah that would totally work too - I didn't want to rock the boat too much but I can change it to that as its easier to reason about.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just went ahead and changed to just use coroutines (very similar to your thread implementation now just using kotlin primitives I guess. I'd think we would need to be compiling thousands of files to maybe even see a perf difference)

@restingbull
Copy link
Collaborator

Much cleaner, thank you.

Also educational. 👍

@restingbull restingbull merged commit 517d30a into bazelbuild:master Mar 12, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants