Skip to content

Commit

Permalink
Add support for clusterOptions as a list of values (#4993)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored May 12, 2024
1 parent b8a2370 commit dd173e3
Show file tree
Hide file tree
Showing 20 changed files with 207 additions and 56 deletions.
18 changes: 13 additions & 5 deletions docs/executor.md
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,20 @@ Resource requests and other job characteristics can be controlled via the follow
- {ref}`process-queue`
- {ref}`process-time`

### Known Limitations
When specifying `clusterOptions` as a string, multiple options must be separated by semicolons to ensure that the job script is formatted correctly:
```groovy
clusterOptions = '-t besteffort;--project myproject'
```

:::{versionadded} 24.04.0
:::

The same behavior can now be achieved using a string list:
```groovy
clusterOptions = [ '-t besteffort', '--project myproject' ]
```

- Multiple `clusterOptions` should be semicolon-separated to ensure that the OAR job script is accurately formatted:
```groovy
clusterOptions = '-t besteffort;--project myproject'
```
See {ref}`process-clusteroptions` for details.

(pbs-executor)=

Expand Down
24 changes: 24 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,30 @@ The following options are available:

The `clusterOptions` directive allows the usage of any native configuration option accepted by your cluster submit command. You can use it to request non-standard resources or use settings that are specific to your cluster and not supported out of the box by Nextflow.

The cluster options can be a string:

```groovy
process foo {
clusterOptions '-x 1 -y 2'
// ...
}
```

:::{versionchanged} 24.04.0
Prior to this version, grid executors that require each option to be on a separate line in the job script would attempt to split multiple options using a variety of different conventions. Multiple options can now be specified more clearly using a string list as shown below.
:::

The cluster options can also be a string list:

```groovy
process foo {
clusterOptions '-x 1', '-y 2', '--flag'
// ...
}
```

Grid executors that require one option per line write each list element on its own line of the job script. Grid executors that allow multiple options per line write all elements on a single line, exactly as if they had been given as one string. Use the list form to control how the options are split across lines when the scheduler requires it.

:::{note}
This directive is only used by grid executors. Refer to the {ref}`executor-page` page to see which executors support this directive.
:::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.processor.TaskConfig
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskProcessor
Expand Down Expand Up @@ -135,6 +136,23 @@ abstract class AbstractGridExecutor extends Executor {
*/
abstract protected List<String> getDirectives(TaskRun task, List<String> initial)

/**
 * Append the {@code clusterOptions} defined in the task configuration to the
 * list of job directives.
 *
 * A string value is added as a single entry, while a collection adds one entry
 * per element. Each entry is followed by an empty string (the directive list
 * appears to hold alternating option/value items -- NOTE(review): confirm against
 * the code rendering the job headers).
 *
 * Null or empty values are skipped, so that no empty directive is produced.
 *
 * @param config The task configuration holding the {@code clusterOptions} value
 * @param result The list of directives to which the options are appended
 * @throws IllegalArgumentException when the value is neither a string nor a collection
 */
protected void addClusterOptionsDirective(TaskConfig config, List<String> result) {
    final opts = config.getClusterOptions()
    if( opts instanceof Collection ) {
        for( String it : opts ) {
            // ignore null or empty entries instead of emitting an empty directive
            if( !it )
                continue
            result.add(it)
            result.add('')
        }
    }
    else if( opts instanceof CharSequence ) {
        final str = opts.toString()
        // an empty string means no options were given, same as not defining the directive
        if( str ) {
            result.add(str)
            result.add('')
        }
    }
    else if( opts != null ) {
        throw new IllegalArgumentException("Unexpected value for clusterOptions process directive - offending value: $opts")
    }
}

/**
* Given a task returns a *clean* name used to submit the job to the grid engine.
* That string must not contain blank or special shell characters e.g. parenthesis, etc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ class BridgeExecutor extends AbstractGridExecutor {
}

// other cluster options
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,7 @@ class CrgExecutor extends SgeExecutor {
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskConfig
import nextflow.processor.TaskRun
/**
* Processor for Flux Framework executor
Expand Down Expand Up @@ -90,18 +91,25 @@ class FluxExecutor extends AbstractGridExecutor {
}

// Any extra cluster options the user wants!
// Options tokenized with ; akin to OarExecutor
if( task.config.getClusterOptions() ) {
addClusterOptionsDirective(task.config, result)

result << '/bin/bash' << scriptFile.getName()
return result
}

@Override
protected void addClusterOptionsDirective(TaskConfig config, List<String> result) {
final opts = config.getClusterOptions()
final str = opts instanceof Collection ? opts.join(' ') : opts?.toString()

if( str ) {
// Split by space
for (String item : task.config.getClusterOptions().tokenize(' ')) {
for (String item : str.tokenize(' ')) {
if ( item ) {
result << item.stripIndent(true).trim()
}
}
}
result << '/bin/bash' << scriptFile.getName()
return result
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ class HyperQueueExecutor extends AbstractGridExecutor {
result << '--resource' << "gpus=${task.config.getAccelerator().limit}".toString()

// -- At the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskConfig
import nextflow.processor.TaskRun
/**
* Processor for LSF resource manager
Expand Down Expand Up @@ -116,7 +117,7 @@ class LsfExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
}

// -- at the end append the command script wrapped file name
result.addAll( task.config.getClusterOptionsAsList() )
addClusterOptionsDirective(task.config, result)

// add account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
Expand All @@ -127,6 +128,22 @@ class LsfExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
return result
}

/**
 * LSF specific handling of the {@code clusterOptions} directive.
 *
 * A collection is delegated to the parent implementation, while a string
 * is expanded via {@code getClusterOptionsAsList} for backward compatibility.
 *
 * @param config The task configuration holding the {@code clusterOptions} value
 * @param result The list of directives to which the options are appended
 * @throws IllegalArgumentException when the value is neither a string nor a collection
 */
@Override
protected void addClusterOptionsDirective(TaskConfig config, List<String> result) {
    final opts = config.getClusterOptions()

    // nothing to add when the directive is not defined
    if( opts == null )
        return

    // list of options: rely on the default behavior of the parent class
    if( opts instanceof Collection ) {
        super.addClusterOptionsDirective(config, result)
        return
    }

    // plain string: keep the legacy parsing made by `getClusterOptionsAsList`
    if( opts instanceof CharSequence ) {
        final parsed = config.getClusterOptionsAsList()
        result.addAll( parsed )
        return
    }

    throw new IllegalArgumentException("Unexpected value for clusterOptions process directive - offending value: $opts")
}

@Override
String sanitizeJobName( String name ) {
// LSF does not allow square brackets in job names except for job arrays
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,7 @@ class MoabExecutor extends AbstractGridExecutor {
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,7 @@ class NqsiiExecutor extends AbstractGridExecutor {
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskConfig
import nextflow.processor.TaskRun
/**
* Processor for OAR resource manager
Expand Down Expand Up @@ -80,16 +81,27 @@ class OarExecutor extends AbstractGridExecutor {
}

// -- at the end append the command script wrapped file name
// Options need to be semicolon ";" separated, if several are needed
if( task.config.getClusterOptions() ) {
for (String item : task.config.getClusterOptions().tokenize(';')) {
result << item << ''
}
}
addClusterOptionsDirective(task.config, result)

return result
}

/**
 * OAR specific handling of the {@code clusterOptions} directive.
 *
 * A collection adds one entry per element. A string is split on the
 * semicolon character and one entry is added for each resulting token.
 * Each entry is followed by an empty string.
 *
 * @param config The task configuration holding the {@code clusterOptions} value
 * @param result The list of directives to which the options are appended
 * @throws IllegalArgumentException when the value is neither a string nor a collection
 */
@Override
void addClusterOptionsDirective(TaskConfig config, List result) {
    final opts = config.getClusterOptions()
    if( opts instanceof Collection ) {
        for( String it in opts ) {
            result << it << ''
        }
    }
    // Options need to be semicolon ";" separated, if several are needed
    else if( opts instanceof CharSequence ) {
        for (String item : opts.toString().tokenize(';')) {
            result << item << ''
        }
    }
    // report unsupported values instead of silently ignoring them,
    // consistently with the other grid executors
    else if( opts != null ) {
        throw new IllegalArgumentException("Unexpected value for clusterOptions process directive - offending value: $opts")
    }
}

String getHeaderToken() { '#OAR' }

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class PbsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {

// task cpus
if( task.config.getCpus() > 1 ) {
if( matchOptions(task.config.getClusterOptions()) ) {
if( matchOptions(task.config.getClusterOptionsAsString()) ) {
log.warn1 'cpus directive is ignored when clusterOptions contains -l option\ntip: clusterOptions = { "-l nodes=1:ppn=${task.cpus}:..." }'
}
else {
Expand All @@ -83,17 +83,15 @@ class PbsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
result << "-l" << "mem=${task.config.getMemory().toString().replaceAll(/[\s]/,'').toLowerCase()}".toString()
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}

// add account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
if( account ) {
result << '-P' << account
}

// -- at the end append the command script wrapped file name
addClusterOptionsDirective(task.config, result)

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ class PbsProExecutor extends PbsExecutor {
// when multiple competing directives are provided, only the first one will take effect
// therefore clusterOptions is added as first to give priority over other options as expected
// by the clusterOptions semantics -- see https://github.com/nextflow-io/nextflow/pull/2036
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

result << '-N' << getJobNameFor(task)

Expand All @@ -79,7 +77,7 @@ class PbsProExecutor extends PbsExecutor {
res << "mem=${task.config.getMemory().getMega()}mb".toString()
}
if( res ) {
if( matchOptions(task.config.getClusterOptions()) ) {
if( matchOptions(task.config.getClusterOptionsAsString()) ) {
log.warn1 'cpus and memory directives are ignored when clusterOptions contains -l option\ntip: clusterOptions = { "-l select=1:ncpus=${task.cpus}:mem=${task.memory.toMega()}mb:..." }'
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ class SgeExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ class SlurmExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
private boolean perCpuMemAllocation

private boolean hasSignalOpt(TaskConfig config) {
def opts = config.getClusterOptions()
final opts = config.getClusterOptionsAsString()
return opts ? opts.contains('--signal ') || opts.contains('--signal=') : false
}


/**
* Gets the directives to submit the specified task to the cluster for execution
*
Expand Down Expand Up @@ -103,9 +102,7 @@ class SlurmExecutor extends AbstractGridExecutor implements TaskArrayExecutor {
}

// -- at the end append the command script wrapped file name
if( task.config.getClusterOptions() ) {
result << task.config.getClusterOptions() << ''
}
addClusterOptionsDirective(task.config, result)

// add slurm account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package nextflow.processor

import nextflow.util.CmdLineOptionMap

import static nextflow.processor.TaskProcessor.*

import java.nio.file.Path
Expand All @@ -33,6 +31,7 @@ import nextflow.executor.res.DiskResource
import nextflow.k8s.model.PodOptions
import nextflow.script.TaskClosure
import nextflow.util.CmdLineHelper
import nextflow.util.CmdLineOptionMap
import nextflow.util.Duration
import nextflow.util.MemoryUnit
/**
Expand Down Expand Up @@ -414,10 +413,6 @@ class TaskConfig extends LazyMap implements Cloneable {
throw new IllegalArgumentException("Not a valid PublishDir collection [${dirs.getClass().getName()}] $dirs")
}

String getClusterOptions() {
return get('clusterOptions')
}

def getContainer() {
return get('container')
}
Expand All @@ -433,6 +428,21 @@ class TaskConfig extends LazyMap implements Cloneable {
return null
}

/**
 * @return The value stored under the {@code clusterOptions} key of this
 *      configuration, without any conversion
 */
def getClusterOptions() {
    final value = get('clusterOptions')
    return value
}

/**
 * @return The {@code clusterOptions} value as a single string: a string value is
 *      returned as is, a collection is converted with {@code CmdLineHelper.toLine},
 *      and {@code null} is returned when the directive is not defined
 * @throws IllegalArgumentException when the value is neither a string nor a collection
 */
String getClusterOptionsAsString() {
    final opts = getClusterOptions()

    // directive not defined
    if( opts == null )
        return null

    if( opts instanceof CharSequence ) {
        return opts.toString()
    }

    if( opts instanceof Collection ) {
        final items = opts as List<String>
        return CmdLineHelper.toLine(items)
    }

    throw new IllegalArgumentException("Unexpected value for clusterOptions process directive - offending value: $opts")
}

/**
* @return Parse the {@code clusterOptions} configuration option and return the entries as a list of values
*/
Expand Down
Loading

0 comments on commit dd173e3

Please sign in to comment.