Skip to content

Commit

Permalink
Detecting errors in data unstaging (#5345)
Browse files Browse the repository at this point in the history

Signed-off-by: jorgee <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
jorgee and pditommaso authored Dec 2, 2024
1 parent ab13ce5 commit 3c8e602
Show file tree
Hide file tree
Showing 16 changed files with 140 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ class SimpleFileCopyStrategy implements ScriptFileCopyStrategy {
return """\
IFS=\$'\\n'
for name in \$(eval "ls -1d ${escape.join(' ')}" | sort | uniq); do
${stageOutCommand('$name', targetDir, mode)} || true
${stageOutCommand('$name', targetDir, mode)}
done
unset IFS""".stripIndent(true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
## Capture possible errors.
## Can be caused either by the task script, unstage script or after script if defined
local last_err=$?
## capture the task error first or fallback to unstage error
local exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]] && exit_status=${nxf_unstage_ret:=0}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status {{exit_file}}
set +u
{{cleanup_cmd}}
Expand All @@ -121,13 +127,26 @@ nxf_stage() {
{{stage_inputs}}
}

nxf_unstage() {
nxf_unstage_outputs() {
true
{{unstage_controls}}
[[ ${nxf_main_ret:=0} != 0 ]] && return
{{unstage_outputs}}
}

nxf_unstage_controls() {
true
{{unstage_controls}}
}

nxf_unstage() {
## Deactivate fast failure to allow uploading stdout and stderr files later
if [[ ${nxf_main_ret:=0} == 0 ]]; then
## Data unstaging redirecting stdout and stderr with append mode
(set -e -o pipefail; (nxf_unstage_outputs | tee -a {{stdout_file}}) 3>&1 1>&2 2>&3 | tee -a {{stderr_file}})
nxf_unstage_ret=$?
fi
nxf_unstage_controls
}

nxf_main() {
trap on_exit EXIT
trap on_term TERM INT USR2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class BashWrapperBuilderTest extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_fs_copy "$name" /work/dir || true
nxf_fs_copy "$name" /work/dir
done
unset IFS
'''.stripIndent().rightTrim()
Expand All @@ -576,7 +576,7 @@ class BashWrapperBuilderTest extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_fs_move "$name" /another/dir || true
nxf_fs_move "$name" /another/dir
done
unset IFS
'''.stripIndent().rightTrim()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_copy "$name" /target/work\\ dir || true
nxf_fs_copy "$name" /target/work\\ dir
done
unset IFS
'''
Expand All @@ -293,7 +293,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_move "$name" /target/store || true
nxf_fs_move "$name" /target/store
done
unset IFS
'''
Expand All @@ -315,7 +315,7 @@ class SimpleFileCopyStrategyTest extends Specification {
script == '''
IFS=$'\\n'
for name in $(eval "ls -1d simple.txt my/path/file.bam" | sort | uniq); do
nxf_fs_rsync "$name" /target/work\\'s || true
nxf_fs_rsync "$name" /target/work\\'s
done
unset IFS
'''
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
local exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]] && exit_status=${nxf_unstage_ret:=0}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -289,9 +292,20 @@ nxf_stage() {
true
}

nxf_unstage() {
nxf_unstage_outputs() {
true
}

nxf_unstage_controls() {
true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_unstage() {
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (nxf_unstage_outputs | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
nxf_unstage_controls
}

nxf_main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
local exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]] && exit_status=${nxf_unstage_ret:=0}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -100,9 +103,20 @@ nxf_stage() {
true
}

nxf_unstage() {
nxf_unstage_outputs() {
true
}

nxf_unstage_controls() {
true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_unstage() {
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (nxf_unstage_outputs | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
nxf_unstage_controls
}

nxf_main() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BashWrapperBuilderWithS3Test extends Specification {
binding.unstage_outputs == '''\
IFS=$'\\n'
for name in $(eval "ls -1d test.bam test.bai bla\\ nk.txt" | sort | uniq); do
nxf_s3_upload $name s3://some/buck\\ et || true
nxf_s3_upload $name s3://some/buck\\ et
done
unset IFS
'''.stripIndent().rightTrim()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class BashWrapperBuilderWithAzTest extends Specification {
binding.unstage_outputs == """\
IFS=\$'\\n'
for name in \$(eval "ls -1d test.bam test.bai" | sort | uniq); do
nxf_az_upload \$name '${AzHelper.toHttpUrl(target)}' || true
nxf_az_upload \$name '${AzHelper.toHttpUrl(target)}'
done
unset IFS
""".stripIndent().rightTrim()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ class GoogleLifeSciencesHelper {
final remoteTaskDir = getRemoteTaskDir(workDir)
def result = 'set -x; '
result += "trap 'err=\$?; exec 1>&2; gsutil -m -q cp -R $localTaskDir/${TaskRun.CMD_LOG} ${remoteTaskDir}/${TaskRun.CMD_LOG} || true; [[ \$err -gt 0 || \$GOOGLE_LAST_EXIT_STATUS -gt 0 || \$NXF_DEBUG -gt 0 ]] && { ls -lah $localTaskDir || true; gsutil -m -q cp -R /google/ ${remoteTaskDir}; } || rm -rf $localTaskDir; exit \$err' EXIT; "
result += "{ cd $localTaskDir; bash ${TaskRun.CMD_RUN} nxf_unstage; } >> $localTaskDir/${TaskRun.CMD_LOG} 2>&1"
result += "{ cd $localTaskDir; bash ${TaskRun.CMD_RUN} nxf_unstage;} >> $localTaskDir/${TaskRun.CMD_LOG} 2>&1"
return result
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ class GoogleLifeSciencesHelperTest extends GoogleSpecification {
def unstage = helper.getUnstagingScript(dir)
then:
unstage ==
'set -x; trap \'err=$?; exec 1>&2; gsutil -m -q cp -R /work/dir/.command.log gs://my-bucket/work/dir/.command.log || true; [[ $err -gt 0 || $GOOGLE_LAST_EXIT_STATUS -gt 0 || $NXF_DEBUG -gt 0 ]] && { ls -lah /work/dir || true; gsutil -m -q cp -R /google/ gs://my-bucket/work/dir; } || rm -rf /work/dir; exit $err\' EXIT; { cd /work/dir; bash .command.run nxf_unstage; } >> /work/dir/.command.log 2>&1'
'set -x; trap \'err=$?; exec 1>&2; gsutil -m -q cp -R /work/dir/.command.log gs://my-bucket/work/dir/.command.log || true; [[ $err -gt 0 || $GOOGLE_LAST_EXIT_STATUS -gt 0 || $NXF_DEBUG -gt 0 ]] && { ls -lah /work/dir || true; gsutil -m -q cp -R /google/ gs://my-bucket/work/dir; } || rm -rf /work/dir; exit $err\' EXIT; { cd /work/dir; bash .command.run nxf_unstage;} >> /work/dir/.command.log 2>&1'
}

@Unroll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,10 @@ nxf_fs_fcp() {
}

on_exit() {
exit_status=${nxf_main_ret:=$?}
local last_err=$?
local exit_status=${nxf_main_ret:=0}
[[ ${exit_status} -eq 0 && ${nxf_unstage_ret:=0} -ne 0 ]] && exit_status=${nxf_unstage_ret:=0}
[[ ${exit_status} -eq 0 && ${last_err} -ne 0 ]] && exit_status=${last_err}
printf -- $exit_status > {{folder}}/.exitcode
set +u
exit $exit_status
Expand All @@ -192,12 +195,23 @@ nxf_stage() {
nxf_parallel "${downloads[@]}"
}

nxf_unstage() {
nxf_unstage_outputs() {
true
}

nxf_unstage_controls() {
true
gsutil -m -q cp -R .command.out gs://bucket/work/dir/.command.out || true
gsutil -m -q cp -R .command.err gs://bucket/work/dir/.command.err || true
gsutil -m -q cp -R .exitcode gs://bucket/work/dir/.exitcode || true
[[ ${nxf_main_ret:=0} != 0 ]] && return
}

nxf_unstage() {
if [[ ${nxf_main_ret:=0} == 0 ]]; then
(set -e -o pipefail; (nxf_unstage_outputs | tee -a .command.out) 3>&1 1>&2 2>&3 | tee -a .command.err)
nxf_unstage_ret=$?
fi
nxf_unstage_controls
}

nxf_main() {
Expand Down
12 changes: 12 additions & 0 deletions validation/awsbatch-unstage-fail.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* do not include plugin requirements otherwise latest
* published version will be downloaded instead of using local build
*/

workDir = 's3://nextflow-ci/work'
process.executor = 'awsbatch'
process.queue = 'nextflow-ci'
process.container = 'quay.io/nextflow/test-aws-unstage-fail:1.0'
aws.region = 'eu-west-1'
aws.batch.maxTransferAttempts = 3
aws.batch.delayBetweenAttempts = '5 sec'
9 changes: 8 additions & 1 deletion validation/awsbatch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ get_abs_filename() {

export NXF_CMD=${NXF_CMD:-$(get_abs_filename ../launch.sh)}

# Execution should fail ignoring
$NXF_CMD run test-aws-unstage-fail.nf -c awsbatch-unstage-fail.config || true
[[ `grep -c "Error executing process > 'test (1)'" .nextflow.log` == 1 ]] || false
[[ `grep -c " Essential container in task exited" .nextflow.log` == 1 ]] || false
[[ `grep -cozP "Command exit status:\n 1" .nextflow.log` == 1 ]] || false
[[ `grep -c "Producing a failure in aws" .nextflow.log` == 2 ]] || false

$NXF_CMD run test-complexpaths.nf -c awsbatch.config
[[ -d foo ]] || false
[[ -e 'foo/.alpha' ]] || false
Expand Down Expand Up @@ -73,4 +80,4 @@ $NXF_CMD run nextflow-io/hello \
-process.array 10 \
-with-wave \
-with-fusion \
-c awsbatch.config
-c awsbatch.config
11 changes: 11 additions & 0 deletions validation/test-aws-unstage-fail-container/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM ubuntu

RUN apt-get update && apt-get -y install curl unzip && apt-get clean


RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" && \
unzip awscliv2.zip && ./aws/install && rm -rf aws*

ADD fake_aws /fake_aws

ENV PATH=/fake_aws/bin/:$PATH
9 changes: 9 additions & 0 deletions validation/test-aws-unstage-fail-container/fake_aws/bin/aws
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/bin/bash

if [[ "$*" == *".command."* ]] || [[ "$*" == *".exitcode"* ]]; then
/usr/local/bin/aws $@
else
>&2 echo "Producing a failure in aws $@"
exit 2
fi

16 changes: 16 additions & 0 deletions validation/test-aws-unstage-fail.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
process test {
input:
val i
output:
file("test${i}")
file("test_2_${i}")
script:
"""
dd if=/dev/urandom of=test${i} bs=1K count=90
dd if=/dev/urandom of=test_2_${i} bs=1K count=90
"""
}

workflow {
Channel.of(1) | test
}

0 comments on commit 3c8e602

Please sign in to comment.