Skip to content

Commit

Permalink
Add retry policy in Channel.fromSRA factory (#5389)
Browse files Browse the repository at this point in the history
Signed-off-by: jorgee <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Christopher Hakkaart <[email protected]>
  • Loading branch information
3 people authored Oct 11, 2024
1 parent 7bbba67 commit fb1c8b2
Show file tree
Hide file tree
Showing 6 changed files with 251 additions and 4 deletions.
19 changes: 19 additions & 0 deletions docs/reference/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ Available options:
`protocol`
: Allow choosing the protocol for the resulting remote URLs. Available choices: `ftp`, `http`, `https` (default: `ftp`).

`retryPolicy`
: Set a retry policy in case the SRA request fails with a retriable error.
The retry policy is set as a Map specifying the different policy properties.

Available retry policy properties:

| Property | Description | Default |
| ------------- |-------------------------------------------------| ------- |
| `delay` | Delay when retrying failed SRA requests. | `500ms` |
| `jitter` | Jitter value when retrying failed SRA requests. | `0.25` |
| `maxAttempts` | Max attempts when retrying failed SRA requests. | `3` |
| `maxDelay` | Max delay when retrying failed SRA requests. | `30s` |

The following code snippet shows an example of using the `Channel.fromSRA` factory method with a custom `retryPolicy`.

```groovy
channel.fromSRA(ids, retryPolicy: [delay: '250ms', maxAttempts: 5])
```

(channel-interval)=

## interval
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

package nextflow.datasource

import dev.failsafe.Failsafe
import dev.failsafe.RetryPolicy
import dev.failsafe.event.EventListener
import dev.failsafe.event.ExecutionAttemptedEvent
import dev.failsafe.function.CheckedSupplier

import java.nio.file.NoSuchFileException
import java.nio.file.Path

Expand All @@ -34,6 +40,11 @@ import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.util.CacheHelper
import nextflow.util.Duration

import java.time.temporal.ChronoUnit
import java.util.function.Predicate
import java.util.regex.Pattern

/**
* Query NCBI SRA database and returns the retrieved FASTQs to the specified
* target channel. Inspired by SRA-Explorer by Phil Ewels -- https://ewels.github.io/sra-explorer/
Expand All @@ -43,7 +54,9 @@ import nextflow.util.Duration
@Slf4j
class SraExplorer {

static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https']]
static public Map PARAMS = [apiKey:[String,GString], cache: Boolean, max: Integer, protocol: ['ftp','http','https'], retryPolicy: Map]
final static public List<Integer> RETRY_CODES = List.of(408, 429, 500, 502, 503, 504)
final static private Pattern ERROR_PATTERN = ~/Server returned HTTP response code: (\d+) for URL.*/

@ToString
static class SearchRecord {
Expand All @@ -67,6 +80,7 @@ class SraExplorer {
private List<String> missing = new ArrayList<>()
private Path cacheFolder
private String protocol = 'ftp'
private SraRetryConfig retryConfig = new SraRetryConfig()

String apiKey
boolean useCache = true
Expand Down Expand Up @@ -94,6 +108,8 @@ class SraExplorer {
maxResults = opts.max as int
if( opts.protocol )
protocol = opts.protocol as String
if( opts.retryPolicy )
retryConfig = new SraRetryConfig(opts.retryPolicy as Map)
}

DataflowWriteChannel apply() {
Expand Down Expand Up @@ -181,7 +197,7 @@ class SraExplorer {

protected Map makeDataRequest(String url) {
log.debug "SRA data request url=$url"
final text = new URL(url).getText()
final text = runWithRetry(()->getTextFormUrl(url))

log.trace "SRA data result:\n${pretty(text)?.indent()}"
def response = jsonSlurper.parseText(text)
Expand Down Expand Up @@ -220,7 +236,7 @@ class SraExplorer {

protected SearchRecord makeSearch(String url) {
log.debug "SRA search url=$url"
final text = new URL(url).getText()
final text = runWithRetry(()-> getTextFormUrl(url))

log.trace "SRA search result:\n${pretty(text)?.indent()}"
final response = jsonSlurper.parseText(text)
Expand Down Expand Up @@ -265,10 +281,14 @@ class SraExplorer {
return result
}

/**
 * Read the content available at the given URL as text.
 *
 * @param url The location to read, expressed as a string
 * @return The text obtained from the URL
 */
protected static String getTextFormUrl(String url) {
    final target = new URI(url).toURL()
    return target.getText()
}

protected String readRunUrl(String acc) {
final url = "https://www.ebi.ac.uk/ena/portal/api/filereport?result=read_run&fields=fastq_ftp&accession=$acc"
log.debug "SRA fetch ftp fastq url=$url"
String result = new URL(url).text.trim()
String result = runWithRetry(() -> getTextFormUrl(url)).trim()
log.trace "SRA fetch ftp fastq url result:\n${result?.indent()}"

if( result.indexOf('\n')==-1 ) {
Expand Down Expand Up @@ -330,6 +350,66 @@ class SraExplorer {
return url
}

/**
 * Build a Failsafe retry policy from the SRA retry configuration
 * (backoff delay, max delay, max attempts and jitter).
 *
 * @param cond A predicate deciding whether a failure should trigger a new attempt
 * @return The configured {@link dev.failsafe.RetryPolicy} instance
 */
protected <T> RetryPolicy<T> retryPolicy(Predicate<? extends Throwable> cond) {
    // report each failed attempt in the debug log when it is retried
    final EventListener<ExecutionAttemptedEvent> onRetryListener = new EventListener<ExecutionAttemptedEvent>() {
        @Override
        void accept(ExecutionAttemptedEvent event) throws Throwable {
            log.debug("Retryable response error - attempt: ${event.attemptCount}; reason: ${event.lastFailure.message}")
        }
    }
    // backoff boundaries are passed to Failsafe as milliseconds
    final initialDelay = retryConfig.delay.toMillis()
    final upperDelay = retryConfig.maxDelay.toMillis()
    final builder = RetryPolicy.<T>builder()
        .handleIf(cond)
        .withBackoff(initialDelay, upperDelay, ChronoUnit.MILLIS)
        .withMaxAttempts(retryConfig.maxAttempts)
        .withJitter(retryConfig.jitter)
        .onRetry(onRetryListener)
    return builder.build()
}

/**
 * Carry out the invocation of the specified action using a retry policy.
 * The action is retried when it fails with a {@link java.io.IOException}
 * (either thrown directly or set as the cause of the failure) whose message
 * reports one of the HTTP status codes listed in {@link #RETRY_CODES}.
 *
 * @param action A {@link dev.failsafe.function.CheckedSupplier} instance modeling the action to be performed in a safe manner
 * @return The result of the supplied action
 */
protected <T> T runWithRetry(CheckedSupplier<T> action) {
    // define the retry condition
    final cond = new Predicate<? extends Throwable>() {
        @Override
        boolean test(Throwable t) {
            // the I/O error can be the failure itself ...
            if( t instanceof IOException && containsErrorCodes(t.message, RETRY_CODES))
                return true
            // ... or be wrapped as the cause of another exception
            if(t.cause instanceof IOException && containsErrorCodes(t.cause.message, RETRY_CODES))
                return true
            return false
        }
    }
    // create the retry policy; the retry listener is registered by `retryPolicy` itself
    def policy = retryPolicy(cond)
    // apply the action with the retry policy
    return Failsafe.with(policy).get(action)
}

/**
 * Check whether an error message reports one of the given HTTP status codes.
 * The status code is extracted from the message by means of {@link #ERROR_PATTERN}.
 *
 * @param message The error message to inspect
 * @param codes The HTTP status codes to look for
 * @return {@code true} when a status code is found in the message and it is included in {@code codes}, {@code false} otherwise
 */
static boolean containsErrorCodes(String message, List<Integer> codes){
    final found = (message =~ ERROR_PATTERN)
    // no status code in the message, nothing to compare
    if( !found.find() )
        return false
    final status = found.group(1) as Integer
    return codes.contains(status)
}

}


Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.datasource

import groovy.transform.CompileStatic
import groovy.transform.EqualsAndHashCode
import groovy.transform.ToString
import nextflow.util.Duration
/**
 * Models retry policy configuration for Sra queries.
 *
 * Each setting has a default value that is replaced by the matching
 * entry of the configuration map, when provided.
 *
 * @author Jorge Ejarque <[email protected]>
 */
@ToString(includePackage = false, includeNames = true)
@EqualsAndHashCode
@CompileStatic
class SraRetryConfig {
    /** Delay when retrying failed SRA requests */
    Duration delay = Duration.of('500ms')
    /** Max delay when retrying failed SRA requests */
    Duration maxDelay = Duration.of('30s')
    /** Max attempts when retrying failed SRA requests */
    int maxAttempts = 3
    /** Jitter value when retrying failed SRA requests */
    double jitter = 0.25

    /**
     * Create a configuration holding the default values.
     */
    SraRetryConfig() {
        this(Collections.emptyMap())
    }

    /**
     * Create a configuration overriding the defaults with the given settings.
     *
     * @param config A map that may hold the {@code delay}, {@code maxDelay},
     *      {@code maxAttempts} and {@code jitter} entries
     */
    SraRetryConfig(Map config) {
        if( config.delay )
            delay = config.delay as Duration
        if( config.maxDelay )
            maxDelay = config.maxDelay as Duration
        if( config.maxAttempts )
            maxAttempts = config.maxAttempts as int
        // compare against null instead of relying on Groovy truth, otherwise
        // an explicit `jitter: 0` is evaluated as false and silently ignored
        if( config.jitter != null )
            jitter = config.jitter as double
    }
}
19 changes: 19 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/ChannelTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -990,4 +990,23 @@ class ChannelTest extends Specification {

}

// Check that every supported `fromSRA` option, `retryPolicy` included, is accepted
def 'should not fail when setting SRA correct properties' () {
    given:
    def accession = 'SRR389222'
    def policy = [maxAttempts: 2]

    when:
    def channel = Channel.fromSRA(accession, apiKey: '1234', retryPolicy: policy, cache: false, max: 10, protocol: 'http')

    then:
    channel != null
}

// Check that an option name not supported by `fromSRA` is rejected
def 'should fail when SRA incorrect property' () {
    when:
    Channel.fromSRA('SRR389222', incorrectKey: '1234')

    then:
    thrown(IllegalArgumentException)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package nextflow.datasource

import dev.failsafe.FailsafeException

import java.nio.file.Files
import java.nio.file.Path

Expand Down Expand Up @@ -242,4 +244,35 @@ class SraExplorerTest extends Specification {
result == '1bc'
}

// Check which HTTP status codes reported in an error message are matched against the retry codes
def 'should detect retry errors' () {
    given:
    def message = "Server returned HTTP response code: " + ERROR + " for URL: https://dummy.url"
    def failure = new IOException(message)

    expect:
    SraExplorer.containsErrorCodes(failure.localizedMessage, SraExplorer.RETRY_CODES) == EXPECTED

    where:
    ERROR | EXPECTED
    '404' | false
    '429' | true
}
// Check that an action failing with a retryable error is attempted `maxAttempts` times
def 'should retry on errors' () {
    given:
    def failure = new IOException("Server returned HTTP response code: 429 for URL: https://dummy.url")
    def explorer = new SraExplorer(null, [retryPolicy: [maxAttempts: 2]])
    def attempts = 0

    when:
    explorer.runWithRetry {
        attempts += 1
        throw failure
    }

    then:
    def e = thrown(FailsafeException)
    e.cause.message == failure.message
    attempts == 2
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2013-2024, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package nextflow.datasource

import nextflow.util.Duration
import spock.lang.Specification

/**
 * Tests for the default and custom values held by {@link SraRetryConfig}
 *
 * @author Jorge Ejarque <[email protected]>
 */
class SraRetryConfigTest extends Specification {

    def 'should create retry config'() {

        given:
        def defaults = new SraRetryConfig()

        expect: 'default values when no setting is given'
        defaults.delay == Duration.of('500ms')
        defaults.maxDelay == Duration.of('30s')
        defaults.maxAttempts == 3
        defaults.jitter == 0.25d

        and: 'custom values override the defaults'
        new SraRetryConfig([maxAttempts: 20]).maxAttempts == 20
        new SraRetryConfig([delay: '1s']).delay == Duration.of('1s')
        new SraRetryConfig([maxDelay: '1m']).maxDelay == Duration.of('1m')
        new SraRetryConfig([jitter: '0.5']).jitter == 0.5d

    }
}

0 comments on commit fb1c8b2

Please sign in to comment.