Skip to content

Commit

Permalink
Fix semaphore in parallel polling monitor (#4927)
Browse files Browse the repository at this point in the history

Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
bentsherman and pditommaso committed May 28, 2024
1 parent a28b404 commit d173cea
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {

@Override
protected boolean canSubmit(TaskHandler handler) {
return super.canSubmit(handler) && semaphore?.tryAcquire()
return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire())
}

protected RateLimiter createSubmitRateLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package nextflow.processor


import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

import nextflow.Session
import nextflow.util.Duration
import nextflow.util.ThrottlingExecutor
import spock.lang.Specification
import spock.lang.Unroll
/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand Down Expand Up @@ -71,4 +73,48 @@ class ParallelPollingMonitorTest extends Specification {

}

@Unroll
def 'should validate can submit method' () {
given:
def success = new AtomicInteger()
def failure = new AtomicInteger()
def count = new AtomicInteger()
def change = new AtomicInteger()
def retry = new AtomicInteger()

def session = Mock(Session)
def handler = Mock(TaskHandler)

def opts = new ThrottlingExecutor.Options()
.retryOn(IllegalArgumentException)
.withRateLimit('10/sec')
.withErrorBurstDelay(Duration.of('5sec'))
.withAutoThrottle()
.onSuccess { success.incrementAndGet() }
.onRetry { retry.incrementAndGet() }
.onFailure { failure.incrementAndGet() }
.onRateLimitChange { change.incrementAndGet() }

def exec = ThrottlingExecutor.create(opts)
def mon = Spy(new ParallelPollingMonitor(exec, [capacity:CAPACITY, session:session, name:'foo', pollInterval:'1sec']))
and:
SUBMIT.times { mon.runningQueue.add(Mock(TaskHandler)) }

when:
def result = mon.canSubmit(handler)
then:
handler.canForkProcess() >> FORK
and:
result == EXPECTED

where:
CAPACITY | SUBMIT | FORK | EXPECTED
0 | 5 | true | true
10 | 5 | true | true
10 | 10 | true | false
and:
0 | 1 | false | false
10 | 1 | false | false
}

}

0 comments on commit d173cea

Please sign in to comment.