Skip to content

Commit

Permalink
Wait for all child processes in nxf_parallel (#4050)
Browse files Browse the repository at this point in the history
nxf_parallel runs a number of processes in parallel, distributing them
across a limited number of workers.

Prior to PR #1884, errors were not being checked.  PR#1884 was
intended to remedy this but effectively only checked the _last_
command to exit (which was, unfortunately the case that was tested
for).

Processes that exit while there is still work going on were not being
"waited" on, so their exit status was not being checked, and failures
were escaping unnoticed.

This commit:

- updates the loop that's tracking the work so that it waits on
  processes that have exited; and
- updates the test cases so that the failing command is _not_ the
  last in the list and ensures that the templated text is as expected.


Signed-off-by: George Hartzell <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
hartzell and pditommaso authored Jul 7, 2023
1 parent 96c04e3 commit 60a5f1a
Show file tree
Hide file tree
Showing 10 changed files with 70 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ class BashFunLib<V extends BashFunLib> {
while ((i<\${#cmd[@]})); do
local copy=()
for x in "\${pid[@]}"; do
[[ -e /proc/\$x ]] && copy+=(\$x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/\$x ]] && copy+=(\$x) || wait \$x
done
pid=("\${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ class BashFunLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -88,8 +90,8 @@ class BashFunLibTest extends Specification {

"""
cmds=()
cmds+=("true")
cmds+=("false")
cmds+=("true")
nxf_parallel "\${cmds[@]}"
""".stripIndent()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ class AwsBatchFileCopyStrategyTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -253,7 +255,9 @@ class AwsBatchFileCopyStrategyTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ class AwsBatchScriptLauncherTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -271,7 +273,9 @@ class AwsBatchScriptLauncherTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -441,7 +445,9 @@ class AwsBatchScriptLauncherTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -552,7 +558,9 @@ class AwsBatchScriptLauncherTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -160,7 +162,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -405,7 +409,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -494,7 +500,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -586,7 +594,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -682,7 +692,9 @@ class S3BashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,9 @@ class BashWrapperBuilderWithS3Test extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ class AzFileCopyStrategyTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -267,7 +269,9 @@ class AzFileCopyStrategyTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -426,7 +430,9 @@ class AzFileCopyStrategyTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ class AzBashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -194,7 +196,9 @@ class AzBashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ nxf_parallel() {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@ class GsBashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down Expand Up @@ -256,7 +258,9 @@ class GsBashLibTest extends Specification {
while ((i<${#cmd[@]})); do
local copy=()
for x in "${pid[@]}"; do
[[ -e /proc/$x ]] && copy+=($x)
# if the process exist, keep in the 'copy' array, otherwise wait on it to capture the exit code
# see https://github.com/nextflow-io/nextflow/pull/4050
[[ -e /proc/$x ]] && copy+=($x) || wait $x
done
pid=("${copy[@]}")
Expand Down

0 comments on commit 60a5f1a

Please sign in to comment.