diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy index 64b8ee52b6..900b675d8c 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy @@ -55,7 +55,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor { @Override protected boolean canSubmit(TaskHandler handler) { - return super.canSubmit(handler) && semaphore?.tryAcquire() + return super.canSubmit(handler) && (semaphore == null || semaphore.tryAcquire()) } protected RateLimiter createSubmitRateLimit() { diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy index 2761e7c031..d7e8327a99 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/ParallelPollingMonitorTest.groovy @@ -16,6 +16,7 @@ package nextflow.processor + import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger @@ -23,6 +24,7 @@ import nextflow.Session import nextflow.util.Duration import nextflow.util.ThrottlingExecutor import spock.lang.Specification +import spock.lang.Unroll /** * * @author Paolo Di Tommaso @@ -71,4 +73,48 @@ class ParallelPollingMonitorTest extends Specification { } + @Unroll + def 'should validate can submit method' () { + given: + def success = new AtomicInteger() + def failure = new AtomicInteger() + def count = new AtomicInteger() + def change = new AtomicInteger() + def retry = new AtomicInteger() + + def session = Mock(Session) + def handler = Mock(TaskHandler) + + def opts = new ThrottlingExecutor.Options() + .retryOn(IllegalArgumentException) + .withRateLimit('10/sec') + .withErrorBurstDelay(Duration.of('5sec')) + .withAutoThrottle() + .onSuccess { success.incrementAndGet() } + .onRetry { retry.incrementAndGet() } + .onFailure { failure.incrementAndGet() } + .onRateLimitChange { change.incrementAndGet() } + + def exec = ThrottlingExecutor.create(opts) + def mon = Spy(new ParallelPollingMonitor(exec, [capacity:CAPACITY, session:session, name:'foo', pollInterval:'1sec'])) + and: + SUBMIT.times { mon.runningQueue.add(Mock(TaskHandler)) } + + when: + def result = mon.canSubmit(handler) + then: + handler.canForkProcess() >> FORK + and: + result == EXPECTED + + where: + CAPACITY | SUBMIT | FORK | EXPECTED + 0 | 5 | true | true + 10 | 5 | true | true + 10 | 10 | true | false + and: + 0 | 1 | false | false + 10 | 1 | false | false + } + }