Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add remote bin support for TES in a workdir #3990

Merged
merged 3 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import nextflow.processor.TaskRun
@CompileStatic
class TesBashBuilder extends BashWrapperBuilder {

TesBashBuilder(TaskRun task) {
super(new TaskBean(task), new TesFileCopyStrategy())
TesBashBuilder(TaskRun task, String remoteBinDir) {
super(new TaskBean(task), new TesFileCopyStrategy(remoteBinDir))
}

TesBashBuilder(TaskBean task) {
super(task, new TesFileCopyStrategy())
TesBashBuilder(TaskBean task, String remoteBinDir) {
super(task, new TesFileCopyStrategy(remoteBinDir))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package nextflow.ga4gh.tes.executor

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.exception.AbortOperationException
import nextflow.executor.Executor
import nextflow.extension.FilesEx
import nextflow.ga4gh.tes.client.ApiClient
import nextflow.ga4gh.tes.client.api.TaskServiceApi
import nextflow.processor.TaskHandler
Expand All @@ -30,6 +32,8 @@ import nextflow.util.Duration
import nextflow.util.ServiceName
import org.pf4j.ExtensionPoint

import java.nio.file.Path

/**
* Experimental TES executor
*
Expand All @@ -44,14 +48,15 @@ class TesExecutor extends Executor implements ExtensionPoint {

private TaskServiceApi client

/**
* A path accessible to TES where executable scripts need to be uploaded
*/
private Path remoteBinDir = null

@Override
protected void register() {
if( session.binDir && !session.binDir.empty() ) {
session.abort()
throw new AbortOperationException("ERROR: TES executor does not allow the use of custom scripts in the `bin` folder")
}

super.register()
uploadBinDir()

client = new TaskServiceApi( new ApiClient(basePath: getEndPoint()) )
}
Expand All @@ -64,6 +69,22 @@ class TesExecutor extends Executor implements ExtensionPoint {
client
}

@PackageScope
Path getRemoteBinDir() {
remoteBinDir
}

protected void uploadBinDir() {
/*
* upload local binaries
*/
if( session.binDir && !session.binDir.empty() && !session.disableRemoteBinDir ) {
def tempBin = getTempDir()
log.info "Uploading local `bin` scripts folder to ${tempBin.toUriString()}/bin"
remoteBinDir = FilesEx.copyTo(session.binDir, tempBin)
}
}

protected String getEndPoint() {
def result = session.getConfigAttribute('executor.tes.endpoint', 'http://localhost:8000')
log.debug "[TES] endpoint=$result"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ import nextflow.util.Escape
*/
@CompileStatic
class TesFileCopyStrategy implements ScriptFileCopyStrategy {
private String remoteBinDir

TesFileCopyStrategy(String remoteBinDir) {
this.remoteBinDir = remoteBinDir
}

/**
* {@inheritDoc}
Expand Down Expand Up @@ -97,7 +102,25 @@ class TesFileCopyStrategy implements ScriptFileCopyStrategy {
*/
@Override
String getEnvScript(Map env, boolean container) {
if(container) throw new UnsupportedOperationException()
TaskProcessor.bashEnvironmentScript(env,false)
if( container )
throw new UnsupportedOperationException("Parameter `container` not supported by ${this.class.simpleName}")

final result = new StringBuilder()
final copy = env ? new HashMap<String,String>(env) : Collections.<String,String>emptyMap()
final path = copy.containsKey('PATH')
// remove any external PATH
if( path )
copy.remove('PATH')
// when a remote bin directory is provide managed it properly
if( remoteBinDir ) {
result << "cp -r ${remoteBinDir}/* \$PWD/nextflow-bin/\n"
result << "chmod +x \$PWD/nextflow-bin/* || true\n"
result << "export PATH=\$PWD/nextflow-bin:\$PATH\n"
}
// finally render the environment
final envSnippet = TaskProcessor.bashEnvironmentScript(copy,false)
if( envSnippet )
result << envSnippet
return result.toString()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package nextflow.ga4gh.tes.executor

import nextflow.ga4gh.tes.client.model.TesFileType
import nextflow.processor.TaskBean

import static nextflow.processor.TaskStatus.COMPLETED
import static nextflow.processor.TaskStatus.RUNNING
Expand Down Expand Up @@ -53,7 +54,7 @@ class TesTaskHandler extends TaskHandler {

final List<TesState> STARTED_STATUSES = [TesState.INITIALIZING, TesState.RUNNING, TesState.PAUSED] + COMPLETE_STATUSES

final TesExecutor executor
private TesExecutor executor

private final Path exitFile

Expand All @@ -75,6 +76,9 @@ class TesTaskHandler extends TaskHandler {

private String requestId

/** only for testing purpose -- do not use */
protected TesTaskHandler() {}

TesTaskHandler(TaskRun task, TesExecutor executor) {
super(task)
this.executor = executor
Expand All @@ -90,6 +94,8 @@ class TesTaskHandler extends TaskHandler {
this.traceFile = task.workDir.resolve(TaskRun.CMD_TRACE)
}

protected String getRequestId() { requestId }

@Override
boolean checkIfRunning() {

Expand Down Expand Up @@ -148,7 +154,8 @@ class TesTaskHandler extends TaskHandler {
void submit() {

// create task wrapper
final bash = new TesBashBuilder(task)
String remoteBinDir = executor.getRemoteBinDir()
final bash = newTesBashBuilder(task, remoteBinDir)
bash.build()

final body = newTesTask()
Expand All @@ -159,7 +166,11 @@ class TesTaskHandler extends TaskHandler {
status = TaskStatus.SUBMITTED
}

protected final TesTask newTesTask() {
protected TesBashBuilder newTesBashBuilder(TaskRun task, String remoteBinDir) {
return new TesBashBuilder(task, remoteBinDir)
}

protected TesTask newTesTask() {
// the cmd list to launch it
def job = new ArrayList(BashWrapperBuilder.BASH) << wrapperFile.getName()
List cmd = ['/bin/bash','-c', job.join(' ') + " &> $TaskRun.CMD_LOG" ]
Expand Down Expand Up @@ -195,6 +206,8 @@ class TesTaskHandler extends TaskHandler {
body.addOutputsItem(outItem(fileName))
}

body.setName(task.getName())

// add the executor
body.executors = [exec]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TesBashBuilderTest extends Specification {
name: 'Hello 1',
workDir: folder,
script: 'echo Hello world!',
] as TaskBean )
] as TaskBean, null)
bash.build()

then:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package nextflow.ga4gh.tes.executor

import nextflow.ga4gh.tes.client.api.TaskServiceApi
import nextflow.ga4gh.tes.client.model.TesCreateTaskResponse
import nextflow.ga4gh.tes.client.model.TesTask
import nextflow.processor.TaskStatus

import java.nio.file.Path
import java.nio.file.Paths

import nextflow.processor.TaskConfig
Expand Down Expand Up @@ -49,5 +55,33 @@ class TesTaskHandlerTest extends Specification {

}

def 'should submit job' () {

given:
def executor = Mock(TesExecutor)
def task = Mock(TaskRun)
def bashBuilder = Mock(TesBashBuilder)
def client = Mock(TaskServiceApi)
def handler = Spy(TesTaskHandler)
handler.@client = client
handler.@executor = executor
handler.task = task

def req = Mock(TesTask)
def resp = Mock(TesCreateTaskResponse)

when:
handler.submit()
then:
1 * executor.getRemoteBinDir() >> Path.of("/work/bin")
1 * handler.newTesBashBuilder(task, "/work/bin") >> bashBuilder
1 * bashBuilder.build() >> null
1 * handler.newTesTask() >> req
1 * client.createTask(req) >> resp
1 * resp.getId() >> '12345'

handler.status == TaskStatus.SUBMITTED
handler.requestId == '12345'
}

}