Skip to content

Commit

Permalink
Add resourceLimits directive (#2911)
Browse files Browse the repository at this point in the history
This commit introduces the resourceLimits directive that allows capping 
the max value for cpus, memory, time and disk resources. 


Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
bentsherman and pditommaso authored Apr 18, 2024
1 parent 5c410db commit 7c9d965
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 13 deletions.
37 changes: 37 additions & 0 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,43 @@ Resource labels in Azure are added to pools, rather than jobs, in order to facil

See also: [label](#label)

(process-resourcelimits)=

### resourceLimits

:::{versionadded} 24.04.0
:::

The `resourceLimits` directive allows you to specify environment-specific limits for task resource requests. Resource limits can be specified in a process as follows:

```groovy
process my_task {
resourceLimits cpus: 24, memory: 768.GB, time: 72.h
script:
'''
your_command --here
'''
}
```

Or in the Nextflow configuration:

```groovy
process {
resourceLimits = [ cpus: 24, memory: 768.GB, time: 72.h ]
}
```

Resource limits can be defined for the following directives:

- [cpus](#cpus)
- [disk](#disk)
- [memory](#memory)
- [time](#time)

Resource limits are a useful way to specify environment-specific limits alongside tasks with [dynamic resources](#dynamic-computing-resources). Normally, if a task requests more resources than can be provisioned (e.g. a task requests 32 cores but the largest node in the cluster has 24), the task will either fail or cause the pipeline to hang forever as it will never be scheduled. If the `resourceLimits` directive is defined with these limits, the task resources will be automatically reduced to comply with these limits before the job is submitted.

(process-scratch)=

### scratch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class DiskResource {
this.type = opts.type as String
}

DiskResource withRequest(MemoryUnit value) {
return new DiskResource(request: value, type: this.type)
}

private static MemoryUnit toMemoryUnit( value ) {
if( value instanceof MemoryUnit )
return (MemoryUnit)value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,12 @@ class TaskConfig extends LazyMap implements Cloneable {
throw new IllegalArgumentException("Not a valid `ErrorStrategy` value: ${strategy}")
}

def getResourceLimit(String directive) {
final limits = get('resourceLimits') as Map
return limits?.get(directive)
}

MemoryUnit getMemory() {
private MemoryUnit getMemory0() {
def value = get('memory')

if( !value )
Expand All @@ -253,7 +257,13 @@ class TaskConfig extends LazyMap implements Cloneable {
}
}

DiskResource getDiskResource() {
MemoryUnit getMemory() {
final val = getMemory0()
final lim = getResourceLimit('memory') as MemoryUnit
return val && lim && val > lim ? lim : val
}

private DiskResource getDiskResource0() {
def value = get('disk')

if( value instanceof Map )
Expand All @@ -265,6 +275,12 @@ class TaskConfig extends LazyMap implements Cloneable {
return null
}

DiskResource getDiskResource() {
final val = getDiskResource0()
final lim = getResourceLimit('disk') as MemoryUnit
return val && lim && val.request > lim ? val.withRequest(lim) : val
}

MemoryUnit getDisk() {
getDiskResource()?.getRequest()
}
Expand All @@ -290,10 +306,16 @@ class TaskConfig extends LazyMap implements Cloneable {

}

Duration getTime() {
private Duration getTime0() {
return getDuration0('time')
}

Duration getTime() {
final val = getTime0()
final lim = getResourceLimit('time') as Duration
return val && lim && val > lim ? lim : val
}

Duration getMaxSubmitAwait() {
return getDuration0('maxSubmitAwait')
}
Expand All @@ -302,11 +324,17 @@ class TaskConfig extends LazyMap implements Cloneable {
get('cpus') != null
}

int getCpus() {
private int getCpus0() {
final value = get('cpus')
value ? value as int : 1 // note: always return at least 1 cpus
}

int getCpus() {
final val = getCpus0()
final lim = getResourceLimit('cpus') as Integer
return val && lim && val > lim ? lim : val
}

int getMaxRetries() {
def result = get('maxRetries')
def defResult = getErrorStrategy() == ErrorStrategy.RETRY ? 1 : 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,21 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
'arch',
'beforeScript',
'cache',
'cleanup',
'clusterOptions',
'conda',
'cpus',
'container',
'containerOptions',
'cleanup',
'clusterOptions',
'cpus',
'debug',
'disk',
'echo', // deprecated
'errorStrategy',
'executor',
'ext',
'fair',
'machineType',
'queue',
'label',
'machineType',
'maxSubmitAwait',
'maxErrors',
'maxForks',
Expand All @@ -91,9 +90,15 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
'penv',
'pod',
'publishDir',
'queue',
'resourceLabels',
'resourceLimits',
'scratch',
'secret',
'shell',
'spack',
'stageInMode',
'stageOutMode',
'storeDir',
'tag',
'time',
Expand All @@ -102,12 +107,8 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
'val',
'each',
'env',
'secret',
'stdin',
'stdout',
'stageInMode',
'stageOutMode',
'resourceLabels'
]

/**
Expand Down Expand Up @@ -1003,4 +1004,15 @@ class ProcessConfig implements Map<String,Object>, Cloneable {
return this
}

private static final List<String> VALID_RESOURCE_LIMITS = List.of('cpus', 'memory', 'disk', 'time')

ProcessConfig resourceLimits( Map entries ) {
for( entry in entries )
if( entry.key !in VALID_RESOURCE_LIMITS )
throw new IllegalArgumentException("Not a valid directive in `resourceLimits`: $entry.key")

configProperties.put('resourceLimits', entries)
return this
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,9 @@ class DiskResourceTest extends Specification {
[request: _375_GB, type: 'local-ssd'] | _375_GB | 'local-ssd'
}

def 'should return a disk resource with the specified request' () {
expect:
new DiskResource(request: _100_GB).withRequest(_375_GB) == new DiskResource(request: _375_GB)
new DiskResource(request: _100_GB, type: 'ssd').withRequest(_375_GB) == new DiskResource(request: _375_GB, type: 'ssd')
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class TaskConfigTest extends Specification {
when:
def config = new TaskConfig().setContext(ten: 10)
config.time = value
config.resourceLimits = [time: '24h']

then:
config.time == expected
Expand All @@ -230,6 +231,7 @@ class TaskConfigTest extends Specification {
new Duration('1s') || 1000
new Duration('2h') || '2h'
new Duration('10h') || { "$ten hours" }
new Duration('24h') || '48h'

}

Expand All @@ -256,6 +258,7 @@ class TaskConfigTest extends Specification {
when:
def config = new TaskConfig().setContext(ten: 10)
config.memory = value
config.resourceLimits = [memory: '16G']

then:
config.memory == expected
Expand All @@ -267,6 +270,7 @@ class TaskConfigTest extends Specification {
new MemoryUnit('1K') || 1024
new MemoryUnit('2M') || '2M'
new MemoryUnit('10G') || { "$ten G" }
new MemoryUnit('16G') || '32G'

}

Expand All @@ -275,6 +279,7 @@ class TaskConfigTest extends Specification {
when:
def config = new TaskConfig().setContext(x: 20)
config.disk = value
config.resourceLimits = [disk: '100G']

then:
config.disk == expected
Expand All @@ -288,6 +293,7 @@ class TaskConfigTest extends Specification {
new MemoryUnit('5M') || '5M'
new MemoryUnit('20G') || { "$x G" }
new MemoryUnit('30G') || MemoryUnit.of('30G')
new MemoryUnit('100G') || '200G'

}

Expand All @@ -296,6 +302,7 @@ class TaskConfigTest extends Specification {
when:
def config = new TaskConfig().setContext(ten: 10)
config.cpus = value
config.resourceLimits = [cpus: 24]

then:
config.cpus == expected
Expand All @@ -308,6 +315,7 @@ class TaskConfigTest extends Specification {
1 | true | 1
8 | true | 8
10 | true | { ten ?: 0 }
24 | true | 32

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,16 @@ class ProcessConfigTest extends Specification {
process.arch == [name: 'linux/x86_64', target: 'zen3']
}

def 'should apply resourceLimits' () {
given:
def process = new ProcessConfig(Mock(BaseScript))

when:
process.resourceLimits time:'1h', memory: '2GB'
then:
process.resourceLimits == [time:'1h', memory: '2GB']
}


def 'should get default config path' () {
given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.util
import java.time.Instant

import spock.lang.Specification
import spock.lang.Unroll

/**
*
Expand Down

0 comments on commit 7c9d965

Please sign in to comment.