Skip to content

Commit

Permalink
Improve wave debug-task plugin command
Browse files Browse the repository at this point in the history
This commit adds the ability to specify a custom container 
when debugging a wave-based container task. 

When the container name is provided, the Nextflow cache is not 
needed to launch the task execution.

Signed-off-by: Jordi Deu-Pons <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
jordeu and pditommaso authored Feb 18, 2023
1 parent 23d47de commit cf3fd74
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package nextflow.fusion
import groovy.transform.CompileStatic
import nextflow.executor.Executor
import nextflow.processor.TaskRun

import static nextflow.fusion.FusionConfig.FUSION_PATH

/**
* Implements common logic to handle fusion-based tasks.
* This trait is expected to be used by a sub-class of {@link nextflow.processor.TaskHandler}
Expand Down Expand Up @@ -59,7 +62,7 @@ trait FusionAwareTask {
List<String> fusionSubmitCli() {
    // Resolve the task run script and translate it to its path inside the container
    final fusion = fusionLauncher()
    final runFile = fusion.toContainerMount(task.workDir.resolve(TaskRun.CMD_RUN))
    // Use the shared FUSION_PATH constant rather than a duplicated hard-coded binary path;
    // the previous literal return preceded this one and made it unreachable
    return [FUSION_PATH, 'bash', runFile.toString() ]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class FusionConfig {
final static public String DEFAULT_FUSION_AMD64_URL = 'https://fusionfs.seqera.io/releases/v2.1-amd64.json'
final static public String DEFAULT_FUSION_ARM64_URL = 'https://fusionfs.seqera.io/releases/v2.1-arm64.json'

final static public String FUSION_PATH = '/usr/bin/fusion'

final private Boolean enabled
final private String containerConfigUrl
final private Boolean exportAwsAccessKeys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package io.seqera.wave.plugin.cli

import java.nio.file.Path

import com.google.common.hash.HashCode
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.Session
import nextflow.SysEnv
import nextflow.cache.CacheDB
import nextflow.cache.CacheFactory
import nextflow.exception.AbortOperationException
Expand All @@ -29,6 +32,10 @@ import nextflow.file.FileHelper
import nextflow.trace.TraceRecord
import nextflow.util.HistoryFile

import static nextflow.fusion.FusionConfig.FUSION_PATH

import static nextflow.util.StringUtils.*

/**
* Implement wave container task debug
*
Expand All @@ -47,40 +54,83 @@ class WaveDebugCmd {
}

/**
 * Dispatch a debug request either to a remote task work directory or to a
 * task recorded in the local execution cache.
 *
 * The criteria is popped from {@code args}. When it is a remote path and a further
 * argument is still available, that argument is popped as the container image and
 * the task is launched via {@link #runRemoteTask}; otherwise the criteria is handed
 * to {@link #runCacheTask}.
 *
 * @param args the command line arguments; this list is consumed (mutated) by the method
 * @throws AbortOperationException when no argument is provided
 */
protected void taskDebug(List<String> args) {
    if( !args )
        throw new AbortOperationException("Missing id of the task to debug or remote task path")

    final target = args.pop()
    // a container image is only expected after a remote work directory
    final hasImage = isRemotePath(target) && args
    if( !hasImage ) {
        runCacheTask(target)
        return
    }

    runRemoteTask(target, args.pop())
}

/**
 * Launch a debug container for a task identified by its remote work directory.
 *
 * @param workDir the remote task work directory, converted to a path via {@code FileHelper.asPath}
 * @param image the container image to run; images whose name starts with
 *      {@code wave.seqera.io/} are run as-is, any other image is passed as the
 *      first element of the argument list given to {@code runContainer}
 */
protected runRemoteTask(String workDir, String image) {
    final remoteDir = FileHelper.asPath(workDir)
    final runner = buildWaveRunCmd(remoteDir.scheme)
    final alreadyWaved = image.startsWith("wave.seqera.io/")
    if( alreadyWaved ) {
        // the image is already a wave container: run it directly
        runner.runContainer(image, buildCommand(remoteDir))
    }
    else {
        // plain image: prepend it to the command so that it gets waved before running
        List<String> imageAndCommand = [image] + buildCommand(remoteDir)
        runner.runContainer(imageAndCommand)
    }
}

/**
 * Launch a debug container for a task looked up in the cache of the last pipeline run.
 *
 * The last run is read from the default history file, its cache DB is opened for
 * reading, and the task trace matching {@code criteria} is resolved. The task work
 * directory must be a remote path; the container recorded in the trace is then run
 * with the command produced by {@link #buildCommand}. The cache DB is always closed
 * before returning.
 *
 * @param criteria the value used to look up the task trace record
 * @throws AbortOperationException when the history is missing or empty, when no task
 *      matches {@code criteria}, or when the task work directory is not a remote path
 */
protected runCacheTask(String criteria) {
    HistoryFile history = HistoryFile.DEFAULT
    HistoryFile.Record runRecord
    if( !history.exists() || history.empty() || !(runRecord=history.findByIdOrName('last')[0]) )
        throw new AbortOperationException("It looks no pipeline was executed in this folder (or execution history is empty)")

    this.cacheDb = CacheFactory.create(runRecord.sessionId, runRecord.runName)
    cacheDb.openForRead()
    try {
        final trace = getOrFindTrace(criteria)
        if( trace==null )
            throw new AbortOperationException("Cannot find any task with id: '$criteria'")

        // any remote scheme is accepted, not just 's3://'
        if( !isRemotePath(trace.workDir) )
            throw new AbortOperationException("Cannot run non-fusion enabled task - Task work dir: $trace.workDir")

        log.info "Launching debug session for task '${trace.get('name')}' - work directory: ${trace.workDir}"
        // the run command is configured from the work directory scheme, and the
        // container is launched exactly once
        final cmd = buildWaveRunCmd(getUrlProtocol(trace.workDir))
        cmd.runContainer(trace.get('container')?.toString(), buildCommand(trace.workDir))
    }
    finally {
        cacheDb.close()
    }
}

/**
 * Build the debug shell command for a work directory given as a string.
 *
 * @param workDir the task work directory, converted to a path via {@code FileHelper.asPath}
 * @return the command line produced by {@link #buildCommand(Path)}
 */
protected static List<String> buildCommand(String workDir) {
    return buildCommand(FileHelper.asPath(workDir))
}

/**
 * Build the command that opens an interactive bash session in the task work directory.
 *
 * @param workPath the task work directory
 * @return the fusion binary followed by {@code sh -c} and a single-quoted script that
 *      changes to the container mount of {@code workPath} and execs {@code bash --norc}
 *      with a {@code [fusion] } prompt
 */
protected static List<String> buildCommand(Path workPath) {
    final mountPath = FusionHelper.toContainerMount(workPath, workPath.scheme)
    final script = "'cd ${mountPath} && PS1=\"[fusion] \" exec bash --norc'".toString()
    return [FUSION_PATH, 'sh', '-c', script]
}

/**
 * Create a {@link WaveRunCmd} configured for the storage scheme of the task work directory.
 *
 * The command always runs with a tty and in privileged mode. For {@code s3} the AWS
 * access key variables are added to the environment; for {@code gs}, when
 * {@code GOOGLE_APPLICATION_CREDENTIALS} is defined, the file it points to is mounted
 * and the variable is added to the environment. Other schemes get no extra settings.
 *
 * @param scheme the work directory scheme, e.g. {@code s3} or {@code gs}
 * @return the configured run command
 */
protected WaveRunCmd buildWaveRunCmd(String scheme) {
    final runner = new WaveRunCmd(session)
    runner.withContainerParams([tty:true, privileged: true])
    switch( scheme ) {
        case 's3':
            runner.withEnvironment('AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
            break
        case 'gs':
            if( SysEnv.containsKey('GOOGLE_APPLICATION_CREDENTIALS') ) {
                final credentials = Path.of(SysEnv.get('GOOGLE_APPLICATION_CREDENTIALS'))
                runner.withMounts(List.of(credentials))
                runner.withEnvironment('GOOGLE_APPLICATION_CREDENTIALS')
            }
            break
    }
    return runner
}

protected TraceRecord getOrFindTrace(String criteria) {
TraceRecord result = null
final norm = criteria.replace('/','')
Expand All @@ -107,4 +157,9 @@ class WaveDebugCmd {
return result
}

/**
 * Check whether a path string refers to a remote location.
 *
 * @param path the path or URI string to check; may be null or empty
 * @return {@code true} when {@code getUrlProtocol} reports a protocol other than
 *      {@code file}; {@code false} for a null/empty path, a path with no protocol,
 *      or a {@code file} path
 */
static boolean isRemotePath(String path) {
    final protocol = path ? getUrlProtocol(path) : null
    return protocol != null && protocol != 'file'
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class WaveRunCmd {

private Map containerParams

private List<Path> containerMounts

private Set<String> environment = new HashSet<>()

/**
 * Create a run command bound to the given session.
 *
 * @param session the session stored by this command
 */
WaveRunCmd(Session session) {
    this.session = session
}
Expand All @@ -48,6 +50,11 @@ class WaveRunCmd {
return this
}

/**
 * Set the host paths to be mounted in the container.
 *
 * @param path the list of paths to mount; the list reference is stored as given
 * @return this command, to allow call chaining
 */
WaveRunCmd withMounts(List<Path> path) {
    containerMounts = path
    return this
}

WaveRunCmd withEnvironment(String... envs) {
this.environment.addAll(envs)
return this
Expand All @@ -67,6 +74,7 @@ class WaveRunCmd {
final containerBuilder = new DockerBuilder(image)
.addMountWorkDir(false)
.addRunOptions('--rm')
.addMounts(containerMounts)
.params(containerConfig)
.params(containerParams)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2020-2022, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.seqera.wave.plugin.cli

import spock.lang.Specification
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
// Spock specification for the static helpers of WaveDebugCmd.
class WaveDebugCmdTest extends Specification {

// Data-driven check of WaveDebugCmd.isRemotePath: each row of the table below
// supplies a PATH input and the EXPECTED boolean result.
@Unroll
def 'should check remote path' () {
expect:
WaveDebugCmd.isRemotePath(PATH) == EXPECTED
where:
PATH | EXPECTED
// null, a bare name and an absolute local path are not remote
null | false
'foo' | false
'/some/file' | false
and:
// object storage URIs are remote
's3://foo/bar' | true
'gs://foo/bar' | true
and:
// 'file' URIs are not remote, whatever the number of slashes
'file:/foo/bar' | false
'file://foo/bar' | false
'file:///foo/bar' | false
}

}

0 comments on commit cf3fd74

Please sign in to comment.