Skip to content

Commit

Permalink
Fix FilePorter concurrency issue (#3511)
Browse files Browse the repository at this point in the history
Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso committed Jan 14, 2023
1 parent e03d9ca commit 0f75715
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ abstract class Executor {

@Memoized
Path getStageDir() {
return getWorkDir().resolve('stage')
return getWorkDir().resolve("stage-${getSession().uniqueId}")
}

boolean isForeignFile(Path path) {
Expand Down
73 changes: 51 additions & 22 deletions modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger

import groovy.transform.Canonical
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.ToString
Expand Down Expand Up @@ -57,7 +58,7 @@ class FilePorter {

static final private Duration POLL_TIMEOUT = Duration.of('2sec')

final Map<Path,FileTransfer> stagingTransfers = new HashMap<>()
final Map<FileCopy,FileTransfer> stagingTransfers = new HashMap<>()

private ExecutorService threadPool

Expand All @@ -76,32 +77,31 @@ class FilePorter {
.createAndRegisterShutdownCallback(session)
}

Batch newBatch(Path stageDir) { new Batch(stageDir) }
Batch newBatch(Path stageDir) { new Batch(this, stageDir) }

void transfer(Batch batch) {
if( batch.size() ) {
log.trace "Stage foreign files: $batch"
submitStagingActions(batch.foreignPaths, batch.stageDir)
submitStagingActions(batch.foreignPaths)
log.trace "Stage foreign files completed: $batch"
}
}

protected FileTransfer createFileTransfer(Path source, Path stageDir) {
final stagePath = getCachePathFor(source,stageDir)
return new FileTransfer(source, stagePath, maxRetries)
protected FileTransfer createFileTransfer(Path source, Path target) {
return new FileTransfer(source, target, maxRetries)
}

protected Future submitForExecution(FileTransfer transfer) {
threadPool.submit(transfer)
}

protected FileTransfer getOrSubmit(Path source, Path stageDir) {
synchronized (stagingTransfers) {
FileTransfer transfer = stagingTransfers.get(source)
protected FileTransfer getOrSubmit(FileCopy copy) {
synchronized (this) {
FileTransfer transfer = stagingTransfers.get(copy)
if( transfer == null ) {
transfer = createFileTransfer(source, stageDir)
transfer = createFileTransfer(copy.source, copy.target)
transfer.result = submitForExecution(transfer)
stagingTransfers.put(source, transfer)
stagingTransfers.put(copy, transfer)
}
// increment the ref count
transfer.refCount.incrementAndGet()
Expand All @@ -117,14 +117,23 @@ class FilePorter {
}
}

protected List<FileTransfer> submitStagingActions(List<Path> paths, Path stageDir) {
/**
* Stages i.e. copies the file from the remote source to a local staging path
* using a thread pool
* @param copies
* A map where each key-value pair represent a file to be copied.
* The key element is the file source path. The value element represent the target path
* @return
* A list of {@link FileTransfer} operations
*/
protected List<FileTransfer> submitStagingActions(List<FileCopy> copies) {

final result = new ArrayList<FileTransfer>(paths.size())
for ( def file : paths ) {
final result = new ArrayList<FileTransfer>(copies.size())
for ( FileCopy it : copies ) {
// here's the magic: use a Map to check if a future for the staging action already exist
// - if exists take it to wait to the submit action termination
// - otherwise create a new future submitting the action operation
result << getOrSubmit(file,stageDir)
result << getOrSubmit(it)
}

// wait for staging actions completion
Expand Down Expand Up @@ -156,16 +165,27 @@ class FilePorter {
return result
}

/**
* Model a file stage requirement
*/
@Canonical
static class FileCopy {
final Path source
final Path target
}

/**
* Models a batch (collection) of foreign files that need to be transferred to
* the process staging are co-located with the work directory
*/
static class Batch {

final private FilePorter owner

/**
* Holds the list of foreign files to be transferred
*/
private List<Path> foreignPaths = new ArrayList<>()
private List<FileCopy> foreignPaths = new ArrayList<>(100)

/**
* The *local* directory where against where files need to be staged.
Expand All @@ -178,7 +198,8 @@ class FilePorter {
*/
private String stageScheme

Batch(Path stageDir) {
Batch(FilePorter owner, Path stageDir) {
this.owner = owner
this.stageDir = stageDir
this.stageScheme = stageDir.scheme
}
Expand All @@ -194,8 +215,9 @@ class FilePorter {
*/
Path addToForeign(Path path) {
// copy the path with a thread pool
foreignPaths << path
return getCachePathFor(path, stageDir)
final copy = owner.getCachePathFor(path, stageDir)
foreignPaths << copy
return copy.target
}

/**
Expand All @@ -208,6 +230,10 @@ class FilePorter {
*/
boolean asBoolean() { foreignPaths.size()>0 }

@Override
String toString() {
return "FilePorter.Batch[stageDir=${stageDir.toUriString()}; foreignPaths=${foreignPaths}]"
}
}

/**
Expand Down Expand Up @@ -306,16 +332,19 @@ class FilePorter {
}
}

static protected Path getCachePathFor(Path sourcePath, Path stageDir) {
synchronized protected FileCopy getCachePathFor(Path sourcePath, Path stageDir) {
final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing
int i=0
while( true ) {
final uniq = [sourcePath, dirPath, i++]
final uniq = List.of(sourcePath, dirPath, i++)
final hash = CacheHelper.hasher(uniq).hash().toString()
final targetPath = getCacheDir0(stageDir, hash).resolve(sourcePath.getName())
final result = new FileCopy(sourcePath, targetPath)
if( stagingTransfers.containsKey(result) )
return result
final exist = targetPath.exists()
if( !exist || checkPathIntegrity(sourcePath, targetPath) )
return targetPath
return result
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package nextflow.executor

import java.nio.file.Paths

import nextflow.Session
import spock.lang.Specification
import test.TestHelper

Expand All @@ -14,17 +15,24 @@ class ExecutorTest extends Specification {

def 'should return stage dir' () {
given:
def uid = UUID.randomUUID()
def session = Mock(Session) { getUniqueId()>>uid }
and:
def WORK_DIR = Paths.get('/the/work/dir')
def executor = Spy(Executor)

when:
executor.getWorkDir() >> WORK_DIR
executor.getSession() >> session
then:
executor.getStageDir() == WORK_DIR.resolve('stage')
executor.getStageDir() == WORK_DIR.resolve("stage-$uid")
}

def 'should check foreign file' () {
given:
def uid = UUID.randomUUID()
def session = Mock(Session) { getUniqueId()>>uid }
and:
def local = Paths.get('/work/dir')
def foreign1 = TestHelper.createInMemTempFile('hola.txt', 'hola mundo!')
def foreign2 = TestHelper.createInMemTempFile('ciao.txt', 'ciao mondo!')
Expand All @@ -33,6 +41,7 @@ class ExecutorTest extends Specification {

when:
executor.getWorkDir() >> local
executor.getSession() >> session
then:
!executor.isForeignFile(local.resolve('foo'))
executor.isForeignFile(foreign1)
Expand Down
100 changes: 56 additions & 44 deletions modules/nextflow/src/test/groovy/nextflow/file/FilePorterTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package nextflow.file
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.Paths

import groovy.util.logging.Slf4j
import nextflow.Session
Expand Down Expand Up @@ -155,17 +154,20 @@ class FilePorterTest extends Specification {
def folder = Files.createTempDirectory('test')
def session = new Session(workDir: folder)
FilePorter porter = Spy(FilePorter, constructorArgs:[session])
def files = [ foreign1, foreign2 ]
def files = [
new FilePorter.FileCopy(foreign1, folder.resolve(foreign1.name)),
new FilePorter.FileCopy(foreign2, folder.resolve(foreign2.name)) ]

when:
def result = porter.submitStagingActions(files, folder)
def result = porter.submitStagingActions(files)
then:
result.size() == 2
result[0].result.done
result[1].result.done

when:
porter.submitStagingActions([Paths.get('/missing/file')], folder)
def missing = new FilePorter.FileCopy(Path.of('/missing/file'), folder.resolve('file'))
porter.submitStagingActions(List.of(missing))
then:
thrown(ProcessStageException)

Expand All @@ -175,7 +177,6 @@ class FilePorterTest extends Specification {




def 'should check batch size and truth' () {
given:
def STAGE = Files.createTempDirectory('test')
Expand Down Expand Up @@ -213,27 +214,46 @@ class FilePorterTest extends Specification {
def 'should return stage path' () {

given:
def session = Mock(Session) { getConfig() >> [:] }
def porter = new FilePorter(session)
and:
def STAGE = Files.createTempDirectory('test')
def FTP_FILE1 = 'ftp://host.com/file1.txt' as Path
def FTP_FILE2 = 'ftp://host.com/file2.txt' as Path
def REMOTE_FILE1 = TestHelper.createInMemTempFile('file1.txt', 'foo')
def REMOTE_FILE2 = TestHelper.createInMemTempFile('file2.txt', 'bar')

when:
def stage1 = FilePorter.getCachePathFor(FTP_FILE1, STAGE)
def stage1 = porter.getCachePathFor(REMOTE_FILE1, STAGE)
then:
stage1.toString().startsWith( STAGE.toString() )
stage1.target.toString().startsWith( STAGE.toString() )

when:
def stage2 = FilePorter.getCachePathFor(FTP_FILE2, STAGE)
def stage2 = porter.getCachePathFor(REMOTE_FILE2, STAGE)
then:
stage2.toString().startsWith( STAGE.toString() )
stage2.target.toString().startsWith( STAGE.toString() )
stage1 != stage2

// copy the remote files and repeat the test
// it should return the same paths
when:
STAGE.resolve('foo.txt').text = 'ciao' // <-- add a file to alter the content of the dir
Files.copy(REMOTE_FILE1, stage1.target)
Files.copy(REMOTE_FILE2, stage2.target)
and:
def newStage1 = FilePorter.getCachePathFor(FTP_FILE1, STAGE)
def newStage1 = porter.getCachePathFor(REMOTE_FILE1, STAGE)
then:
newStage1.target.toString().startsWith( STAGE.toString() )
stage1 == newStage1
and:
stage1.target.exists()
newStage1.target.exists()

when:
stage1.target.text = 'some other content' // <-- modify the target file
and:
newStage1 = porter.getCachePathFor(REMOTE_FILE1, STAGE)
then:
stage1 != newStage1
stage1.target.exists()
!newStage1.target.exists()

cleanup:
STAGE?.deleteDir()
Expand All @@ -247,7 +267,7 @@ class FilePorterTest extends Specification {
def folder = Files.createTempDirectory('test')
def local = folder.resolve('hola.text'); local.text = 'Hola'
and:
def foreign = TestHelper.createInMemTempDir()
def foreign = TestHelper.createInMemTempDir().resolve(local.name)

and:
def porter = new FilePorter(sess)
Expand All @@ -256,36 +276,7 @@ class FilePorterTest extends Specification {
def transfer1 = porter.createFileTransfer(local, foreign)
then:
transfer1.source == local
transfer1.target.startsWith(foreign)

// nothing change then the target path will be the same
when:
def transfer2 = porter.createFileTransfer(local, foreign)
then:
transfer2.source == local
transfer2.target == transfer1.target

when:
// copy the source to the expected target
FileHelper.copyPath(local, transfer1.target)
and:
// the transfer path should not be modified
def transfer3 = porter.createFileTransfer(local, foreign)
then:
transfer3.source == local
transfer3.target == transfer1.target

when:
// modify the file in the expected target path
transfer1.target.text = 'Ciao moundo'
and:
// the transfer path should not be modified
def transfer4 = porter.createFileTransfer(local, foreign)
then:
transfer4.source == local
transfer4.target != transfer1.target // <-- it's changed
and:
transfer4.target.startsWith(foreign) // <-- it's still in the foreign path
transfer1.target == foreign

cleanup:
folder?.deleteDir()
Expand Down Expand Up @@ -370,4 +361,25 @@ class FilePorterTest extends Specification {
cleanup:
local1?.deleteDir()
}

def 'should check equals and hashcode of filecopy' () {
given:
def copy1 = new FilePorter.FileCopy(Path.of('/some/path/foo'), Path.of('/other/path/foo'))
def copy2 = new FilePorter.FileCopy(Path.of('/some/path/foo'), Path.of('/other/path/foo'))
def copy3 = new FilePorter.FileCopy(Path.of('/some/path/bar'), Path.of('/other/path/bar'))
def copy4 = new FilePorter.FileCopy(Path.of('/some/path/bar'), Path.of('/other/path/foo'))

expect:
copy1 == copy2
copy1 != copy3
copy1 != copy4
copy3 != copy4

and:
copy1.hashCode() == copy2.hashCode()
copy1.hashCode() != copy3.hashCode()
copy1.hashCode() != copy4.hashCode()
copy3.hashCode() != copy4.hashCode()

}
}
Loading

0 comments on commit 0f75715

Please sign in to comment.