Skip to content

Commit

Permalink
Fix stage retry on corrupted HTTP downloads (#5275)
Browse files Browse the repository at this point in the history

Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored Sep 3, 2024
1 parent 4a6b54a commit bf0cd32
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 4 deletions.
14 changes: 13 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,15 @@ class FilePorter {
}
}

protected void cleanup(Path path) {
try {
FileHelper.deletePath(path)
}
catch (IOException e) {
log.warn "Unable to cleanup file path: ${path.toUriString()} - cause: ${e.message}"
}
}

/**
* Download a foreign file (ie. remote) storing it the current pipeline execution directory
*
Expand All @@ -319,8 +328,11 @@ class FilePorter {
return stageForeignFile0(filePath, stagePath)
}
catch( IOException e ) {
// remove the target file that could be have partially downloaded
cleanup(stagePath)
// check if a stage/download retry is allowed
if( count++ < maxRetries && e !instanceof NoSuchFileException && e !instanceof InterruptedIOException && !Thread.currentThread().isInterrupted() ) {
def message = "Unable to stage foreign file: ${filePath.toUriString()} (try ${count}) -- Cause: $e.message"
def message = "Unable to stage foreign file: ${filePath.toUriString()} (try ${count} of ${maxRetries}) -- Cause: $e.message"
log.isDebugEnabled() ? log.warn(message, e) : log.warn(message)

sleep (10 + RND.nextInt(300))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file.http

import groovy.transform.CompileStatic

/**
* Implements a {@link FilterInputStream} that checks the expected length of bytes have been
* read when closing the stream or throws an error otherwise
*
* @author Paolo Di Tommaso <[email protected]>
*/
@CompileStatic
class FixedInputStream extends FilterInputStream {

private final long length
private long bytesRead

FixedInputStream(InputStream inputStream, long len) {
super(inputStream)
this.length = len
}

@Override
int read() throws IOException {
final result = super.read()
if( result!=-1 )
bytesRead++
return result
}

@Override
int read(byte[] b, int off, int len) throws IOException {
final result = super.read(b, off, len)
if( result!=-1 )
bytesRead += result
return result
}

@Override
long skip(long n) throws IOException {
long skipped = super.skip(n)
bytesRead += skipped
return skipped
}

@Override
int available() throws IOException {
super.available()
}

@Override
void close() throws IOException {
if( bytesRead != length )
throw new IOException("Read data length does not match expected size - bytes read: ${bytesRead}; expected: ${length}")
super.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,11 @@ abstract class XFileSystemProvider extends FileSystemProvider {
}

final conn = toConnection(path)
final stream = new BufferedInputStream(conn.getInputStream())
final length = conn.getContentLengthLong()
final target = length>0
? new FixedInputStream(conn.getInputStream(),length)
: conn.getInputStream()
final stream = new BufferedInputStream(target)

new SeekableByteChannel() {

Expand Down Expand Up @@ -346,7 +350,11 @@ abstract class XFileSystemProvider extends FileSystemProvider {
}
}

return toConnection(path).getInputStream()
final conn = toConnection(path)
final length = conn.getContentLengthLong()
return length>0
? new FixedInputStream(conn.getInputStream(), length)
: conn.getInputStream()
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.file.http

import spock.lang.Specification

/**
*
* @author Paolo Di Tommaso <[email protected]>
*/
class FixedInputStreamTest extends Specification {

def 'should read byte by byte' () {
given:
def bytes = "Hello world". bytes
def stream = new FixedInputStream(new ByteArrayInputStream(bytes), bytes.length)

when:
def ch
def result = new StringBuilder()
while( (ch=stream.read())!=-1 )
result.append(ch as char)
and:
stream.close()
then:
noExceptionThrown()
result.toString() == 'Hello world'
}

def 'should read byte buffer' () {
given:
def bytes = "Hello world". bytes
def stream = new FixedInputStream(new ByteArrayInputStream(bytes), bytes.length)

when:
def buffer = new byte[5]
def result = new StringBuilder()
def c
while( (c=stream.read(buffer))!=-1 ) {
for( int i=0; i<c; i++ )
result.append(buffer[i] as char)
}
and:
stream.close()
then:
noExceptionThrown()
result.toString() == 'Hello world'
}

def 'should read all bytes' () {
given:
def bytes = "Hello world". bytes
def stream = new FixedInputStream(new ByteArrayInputStream(bytes), bytes.length)

when:
def result = stream.readAllBytes()
and:
stream.close()
then:
noExceptionThrown()
and:
new String(result) == "Hello world"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class XFileSystemProviderTest extends Specification {

def "should return input stream from path"() {
given:
def DATA = 'Hello world'
def fsp = Spy(new HttpFileSystemProvider())
def path = fsp.getPath(new URI('http://host.com/index.html?query=123'))
def connection = Mock(URLConnection)
Expand All @@ -54,7 +55,8 @@ class XFileSystemProviderTest extends Specification {
return connection
}
and:
connection.getInputStream() >> new ByteArrayInputStream('Hello world'.bytes)
connection.getInputStream() >> new ByteArrayInputStream(DATA.bytes)
connection.getContentLengthLong() >> DATA.size()
and:
stream.text == 'Hello world'
}
Expand Down

0 comments on commit bf0cd32

Please sign in to comment.