Parallelize stdout and clean up PersistentWorker #501

Merged
merged 3 commits on Mar 12, 2021
Changes from 2 commits
111 changes: 53 additions & 58 deletions src/main/kotlin/io/bazel/worker/PersistentWorker.kt
@@ -29,7 +29,17 @@ import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.PrintStream
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext
@@ -50,81 +60,66 @@ class PersistentWorker(

constructor() : this(Dispatchers.IO, IO.Companion::capture)

/**
* BlockableDispatcher provides the ability to separate thread-blocking operations from coroutines.
*
* Coroutines interleave actions over a pool of threads. When an action blocks it stands a chance
* of producing a deadlock. We sidestep this by providing a separate dispatcher to contain
* blocking operations, like reading from a stream. Inelegant, and a bit of a sledgehammer, but
* safe for the moment.
*/
private class BlockableDispatcher(
private val unblockedContext: CoroutineContext,
private val blockingContext: ExecutorCoroutineDispatcher,
scope: CoroutineScope
) : CoroutineScope by scope {
companion object {
fun <T> runIn(
owningContext: CoroutineContext,
exec: suspend BlockableDispatcher.() -> T
) =
Executors.newCachedThreadPool().asCoroutineDispatcher().use { dispatcher ->
runBlocking(owningContext) { BlockableDispatcher(owningContext, dispatcher, this).exec() }
}
}

fun <T> blockable(action: () -> T): T {
return runBlocking(blockingContext) {
return@runBlocking action()
}
}
}
val scope = CoroutineScope(Job() + Dispatchers.IO)

@ExperimentalCoroutinesApi
override fun start(execute: Work) = WorkerContext.run {
// Use a channel to serialize writing output
val writeChannel = Channel<WorkerProtocol.WorkResponse>(UNLIMITED)
Contributor Author:

Main idea is to launch the producer in its own coroutine and have the collector run to completion synchronously. The producer does the compilation in parallel and emits results across a channel which is serially read from. Flows are serial by default so we don't need to synchronize access on the consumer side.
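For illustration, a minimal self-contained sketch of that producer/consumer shape; compile and writeResponse below are hypothetical stand-ins for the real task execution and protobuf output, not the PR's code.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
  val results = Channel<String>(Channel.UNLIMITED)

  // Producer: fan the work out in parallel and emit each result onto the channel.
  launch(Dispatchers.Default) {
    (1..10).map { id ->
      launch { results.send(compile(id)) }
    }.joinAll()
    results.close()
  }

  // Consumer: a Flow is collected sequentially, so writing needs no extra synchronization.
  results.consumeAsFlow().collect { response -> writeResponse(response) }
}

suspend fun compile(id: Int): String {
  delay(10) // stand-in for the real compilation work
  return "response for request $id"
}

fun writeResponse(response: String) = println(response)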

captureIO().use { io ->
BlockableDispatcher.runIn(coroutineContext) {
blockable {
runBlocking {
launch {
generateSequence { WorkRequest.parseDelimitedFrom(io.input) }
}.asFlow()
.map { request ->
info { "received req: ${request.requestId}" }
async {
doTask("request ${request.requestId}") { ctx ->
.asFlow()
Contributor:

Is this flow necessary? Would it be simpler to do something like:

generateSequence { WorkRequest.parseDelimitedFrom(io.input) }.forEach {
  launch(Dispatchers.Default) {
    // do the work
    writeChannel.send(response)
  }
}

Contributor Author:

@jdai8 yeah that would totally work too - I didn't want to rock the boat too much, but I can change it to that as it's easier to reason about.

Contributor Author:

Just went ahead and changed it to use coroutines (very similar to your thread implementation now, just using Kotlin primitives I guess). I'd think we would need to be compiling thousands of files to maybe even see a perf difference.

.suspendingParallelMap(scope) { request ->
val result = doTask("request ${request.requestId}") { ctx ->
request.argumentsList.run {
execute(ctx, toList())
}
}.let { result ->
info { "task result ${result.status}" }
WorkerProtocol.WorkResponse.newBuilder().apply {
output =
listOf(
result.log.out.toString(),
io.captured.toByteArray().toString(UTF_8)
).filter { it.isNotBlank() }.joinToString("\n")
exitCode = result.status.exit
requestId = request.requestId
}.build()
}
}
}
.buffer()
.map { deferred ->
deferred.await()
}
.collect { response ->
blockable {
info { "task result ${result.status}" }
val response = WorkerProtocol.WorkResponse.newBuilder().apply {
output = listOf(
result.log.out.toString(),
io.captured.toByteArray().toString(UTF_8)
).filter { it.isNotBlank() }.joinToString("\n")
exitCode = result.status.exit
requestId = request.requestId
}.build()
info {
response.toString()
}
response.writeDelimitedTo(io.output)
io.output.flush()
writeChannel.send(response)
}
}
.onCompletion { writeChannel.close() }
.collect()
}
writeChannel.consumeAsFlow()
.collect { response -> writeOutput(response, io.output) }
}

io.output.close()
info { "stopped worker" }
}
return@run 0
}

private suspend fun writeOutput(response: WorkerProtocol.WorkResponse, output: PrintStream) =
withContext(Dispatchers.IO) {
response.writeDelimitedTo(output)
output.flush()
}

/**
* Maps each element of the flow in parallel by launching [f] in [scope] and awaiting the results in order.
*
* @param scope the [CoroutineScope] used to launch the parallel work
* @param f the suspending transform applied to each element
*/
fun <A, B> Flow<A>.suspendingParallelMap(scope: CoroutineScope, f: suspend (A) -> B): Flow<B> {
return flowOn(Dispatchers.IO)
Contributor:

I'm guessing compilation is CPU-bound, so Dispatchers.Default or similar might be more appropriate. (A sketch of this swap follows the diff.)

.map { scope.async { f(it) } }
.buffer() // default concurrency limit of 64
.map { it.await() }
}

}
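On the dispatcher question raised in review above, here is a sketch of what that swap could look like, assuming the parallel work is indeed CPU-bound; parallelMapOnDefault is a hypothetical name used for illustration, not the merged code.

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.map

// Runs the per-element work on Dispatchers.Default (sized to CPU cores) rather than
// Dispatchers.IO, which is tuned for blocking I/O instead of CPU-bound compilation.
fun <A, B> Flow<A>.parallelMapOnDefault(scope: CoroutineScope, f: suspend (A) -> B): Flow<B> =
  map { scope.async(Dispatchers.Default) { f(it) } } // start each element's work on the Default pool
    .buffer() // keep at most the default 64 tasks in flight
    .map { it.await() } // await results in the original order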