Skip to content

Commit

Permalink
Add account config option for grid executors (#4975)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Dr Marco Claudio De La Pierre <[email protected]>
Co-authored-by: Dr Marco Claudio De La Pierre <[email protected]>
  • Loading branch information
pditommaso and marcodelapierre authored May 8, 2024
1 parent a614fbe commit a09e37d
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 7 deletions.
6 changes: 6 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,12 @@ The `executor` scope controls various executor behaviors.

The following settings are available:

`executor.account`
: :::{versionadded} 24.04.0
:::
: *Used only by the {ref}`slurm-executor`, {ref}`lsf-executor`, {ref}`pbs-executor` and {ref}`pbspro-executor` executors.*
: Allows specifying the project or organisation account that should be charged for running the pipeline jobs.

`executor.cpus`
: The maximum number of CPUs made available by the underlying system. Used only by the `local` executor.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ class LsfExecutor extends AbstractGridExecutor {
// -- at the end append the command script wrapped file name
result.addAll( task.config.getClusterOptionsAsList() )

// add account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
if( account ) {
result << '-G' << account
}

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ class PbsExecutor extends AbstractGridExecutor {
result << task.config.getClusterOptions() << ''
}

// add account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
if( account ) {
result << '-P' << account
}

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class PbsProExecutor extends PbsExecutor {
result << "-l" << "walltime=${duration.format('HH:mm:ss')}".toString()
}

// add account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
if( account ) {
result << '-P' << account
}

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ class SlurmExecutor extends AbstractGridExecutor {
result << task.config.getClusterOptions() << ''
}

// add slurm account from config
final account = session.getExecConfigProp(getName(), 'account', null) as String
if( account ) {
result << '-A' << account
}

return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class LsfExecutorTest extends Specification {
given:
def WORK_DIR = Paths.get('/work/dir')
def executor = Spy(LsfExecutor)
executor.getSession() >> Mock(Session)
and:
def task = Mock(TaskRun)
task.workDir >> WORK_DIR

Expand All @@ -65,6 +67,8 @@ class LsfExecutorTest extends Specification {
given:
def WORK_DIR = Paths.get('/work/dir')
def executor = Spy(new LsfExecutor(memUnit:'GB', usageUnit:'GB'))
executor.getSession() >> Mock(Session)
and:
def task = Mock(TaskRun)
task.workDir >> WORK_DIR

Expand All @@ -86,6 +90,8 @@ class LsfExecutorTest extends Specification {
given:
def WORK_DIR = Paths.get('/work/dir')
def executor = Spy(new LsfExecutor(usageUnit:'KB', perJobMemLimit:true))
executor.getSession() >> Mock(Session)
and:
def task = Mock(TaskRun)
task.workDir >> WORK_DIR

Expand All @@ -109,6 +115,8 @@ class LsfExecutorTest extends Specification {
given:
def WORK_DIR = Paths.get('/work/dir')
def executor = Spy(new LsfExecutor(perTaskReserve:true, perJobMemLimit: true, usageUnit:'KB'))
executor.getSession() >> Mock(Session)
and:
def task = Mock(TaskRun)
task.workDir >> WORK_DIR

Expand Down Expand Up @@ -302,12 +310,14 @@ class LsfExecutorTest extends Specification {
given:
def config = new TaskConfig(clusterOptions: [], disk: '10GB')
def WORKDIR = Paths.get('/my/work')
def lsf = Spy(LsfExecutor)
lsf.@memUnit = 'MB'
def executor = Spy(LsfExecutor)
executor.getSession() >> Mock(Session)
executor.@memUnit = 'MB'
and:
def task = Mock(TaskRun)

when:
def result = lsf.getDirectives(task)
def result = executor.getDirectives(task)
then:
task.workDir >> WORKDIR
task.config >> config
Expand Down Expand Up @@ -721,5 +731,31 @@ class LsfExecutorTest extends Specification {
'a'.repeat(509) | 'nf-'.concat("a".repeat(508))
}

@Unroll
def 'should set lsf account' () {
given:
// task
def task = new TaskRun()
task.workDir = Paths.get('/work/dir')
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> Mock(Session)
task.config = Mock(TaskConfig) { getClusterOptionsAsList()>>[] }
and:
def executor = Spy(LsfExecutor)
executor.getJobNameFor(_) >> 'foo'
executor.getName() >> 'lsf'
executor.getSession() >> Mock(Session) { getExecConfigProp('lsf', 'account',null)>>ACCOUNT }

when:
def result = executor.getDirectives(task, [])
then:
result == EXPECTED

where:
ACCOUNT | EXPECTED
null | ['-o', '/work/dir/.command.log', '-J', 'foo']
'project-123' | ['-o', '/work/dir/.command.log', '-J', 'foo', '-G', 'project-123']
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nextflow.executor

import java.nio.file.Paths

import nextflow.Session
import nextflow.processor.TaskConfig
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
Expand All @@ -33,7 +34,7 @@ class PbsExecutorTest extends Specification {
def testGetCommandLine() {

given:
def executor = [:] as PbsExecutor
def executor = Spy(PbsExecutor)
def task = Mock(TaskRun); task.getName() >> 'hello world'

expect:
Expand All @@ -44,7 +45,8 @@ class PbsExecutorTest extends Specification {
def testHeaders() {

setup:
def executor = [:] as PbsExecutor
def executor = Spy(PbsExecutor)
executor.getSession() >> Mock(Session)

// mock process
def proc = Mock(TaskProcessor)
Expand Down Expand Up @@ -167,6 +169,7 @@ class PbsExecutorTest extends Specification {

setup:
def executor = Spy(PbsExecutor)
executor.getSession() >> Mock(Session)

// mock process
def proc = Mock(TaskProcessor)
Expand Down Expand Up @@ -299,4 +302,28 @@ class PbsExecutorTest extends Specification {
!PbsExecutor.matchOptions('-x-l foo')
}

def 'should set pbs account' () {
given:
// task
def task = new TaskRun()
task.workDir = Paths.get('/work/dir')
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> Mock(Session)
task.config = Mock(TaskConfig) { getClusterOptionsAsList()>>[] }
and:
def executor = Spy(PbsExecutor)
executor.getJobNameFor(_) >> 'foo'
executor.getName() >> 'pbs'
executor.getSession() >> Mock(Session) { getExecConfigProp('pbs', 'account',null)>>ACCOUNT }

when:
def result = executor.getDirectives(task, [])
then:
result == EXPECTED

where:
ACCOUNT | EXPECTED
null | ['-N', 'foo', '-o', '/work/dir/.command.log', '-j', 'oe']
'project-123' | ['-N', 'foo', '-o', '/work/dir/.command.log', '-j', 'oe', '-P', 'project-123']
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package nextflow.executor

import nextflow.Session
import nextflow.processor.TaskProcessor
import spock.lang.Specification

Expand All @@ -32,7 +33,9 @@ class PbsProExecutorTest extends Specification {

def 'should get directives' () {
given:
def session = Mock(Session) { getConfig()>>[:] }
def executor = Spy(PbsProExecutor)
executor.getSession() >> session
def WORK_DIR = Paths.get('/here')

def task = Mock(TaskRun)
Expand All @@ -54,7 +57,9 @@ class PbsProExecutorTest extends Specification {

def 'should get directives with queue' () {
given:
def session = Mock(Session) { getConfig()>>[:] }
def executor = Spy(PbsProExecutor)
executor.getSession()>>session
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
Expand All @@ -78,6 +83,8 @@ class PbsProExecutorTest extends Specification {
def 'should get directives with cpus' () {
given:
def executor = Spy(PbsProExecutor)
executor.getSession() >> Mock(Session)
and:
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
Expand All @@ -102,6 +109,7 @@ class PbsProExecutorTest extends Specification {
def 'should get directives with mem' () {
given:
def executor = Spy(PbsProExecutor)
executor.getSession() >> Mock(Session)
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
Expand All @@ -126,6 +134,7 @@ class PbsProExecutorTest extends Specification {
def 'should get directives with cpus and mem' () {
given:
def executor = Spy(PbsProExecutor)
executor.getSession() >> Mock(Session)
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
Expand All @@ -150,6 +159,7 @@ class PbsProExecutorTest extends Specification {
def 'should ignore cpus and memory when clusterOptions contains -l option' () {
given:
def executor = Spy(PbsProExecutor)
executor.getSession() >> Mock(Session)
def WORK_DIR = Paths.get('/foo/bar')

def task = Mock(TaskRun)
Expand Down Expand Up @@ -221,7 +231,8 @@ class PbsProExecutorTest extends Specification {
def 'should report cluster as first' () {

setup:
def executor = [:] as PbsProExecutor
def executor = Spy(PbsProExecutor)
executor.getSession() >> Mock(Session)

// mock process
def proc = Mock(TaskProcessor)
Expand All @@ -245,4 +256,28 @@ class PbsProExecutorTest extends Specification {
.stripIndent().leftTrim()
}

def 'should set pbs account' () {
given:
// task
def task = new TaskRun()
task.workDir = Paths.get('/work/dir')
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> Mock(Session)
task.config = Mock(TaskConfig) { getClusterOptionsAsList()>>[] }
and:
def executor = Spy(PbsProExecutor)
executor.getJobNameFor(_) >> 'foo'
executor.getName() >> 'pbspro'
executor.getSession() >> Mock(Session) { getExecConfigProp('pbspro', 'account',null)>>ACCOUNT }

when:
def result = executor.getDirectives(task, [])
then:
result == EXPECTED

where:
ACCOUNT | EXPECTED
null | ['-N', 'foo', '-o', '/work/dir/.command.log', '-j', 'oe']
'project-123' | ['-N', 'foo', '-o', '/work/dir/.command.log', '-j', 'oe', '-P', 'project-123']
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

package nextflow.executor

import java.nio.file.Paths

import nextflow.Session
Expand All @@ -23,7 +24,6 @@ import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import spock.lang.Specification
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -82,6 +82,7 @@ class SlurmExecutorTest extends Specification {
setup:
// SLURM executor
def executor = [:] as SlurmExecutor
executor.session = Mock(Session)

// mock process
def proc = Mock(TaskProcessor)
Expand Down Expand Up @@ -210,6 +211,7 @@ class SlurmExecutorTest extends Specification {
setup:
// LSF executor
def executor = Spy(SlurmExecutor)
executor.session = Mock(Session)

// mock process
def proc = Mock(TaskProcessor)
Expand Down Expand Up @@ -275,4 +277,30 @@ class SlurmExecutorTest extends Specification {
exec.queueStatusCommand('xxx') == ['squeue','--noheader','-o','%i %t','-t','all','-p','xxx','-u', usr]

}

@Unroll
def 'should set slurm account' () {
given:
// task
def task = new TaskRun()
task.workDir = Paths.get('/work/dir')
task.processor = Mock(TaskProcessor)
task.processor.getSession() >> Mock(Session)
task.config = Mock(TaskConfig)
and:
def executor = Spy(SlurmExecutor)
executor.getJobNameFor(_) >> 'foo'
executor.getName() >> 'slurm'
executor.getSession() >> Mock(Session) { getExecConfigProp('slurm', 'account',null)>>ACCOUNT }

when:
def result = executor.getDirectives(task, [])
then:
result == EXPECTED

where:
ACCOUNT | EXPECTED
null | ['-J', 'foo', '-o', '/work/dir/.command.log', '--no-requeue', '', '--signal', 'B:USR2@30']
'project-123' | ['-J', 'foo', '-o', '/work/dir/.command.log', '--no-requeue', '', '--signal', 'B:USR2@30', '-A', 'project-123']
}
}

0 comments on commit a09e37d

Please sign in to comment.