Skip to content

Commit

Permalink
Preview support for virtual threads (nextflow-io#3871)
Browse files Browse the repository at this point in the history
This commit adds support for Java virtual threads. 

To enable this feature, the following variable should be declared 
in the launching environment: 

```
export NXF_ENABLE_VIRTUAL_THREADS
```

Note, this feature it's required the use of Java 19 or later. 

Signed-off-by: Paolo Di Tommaso <[email protected]>
  • Loading branch information
pditommaso authored and abhi18av committed Oct 28, 2023
1 parent ab725fc commit 7fcdbc7
Show file tree
Hide file tree
Showing 31 changed files with 404 additions and 76 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java_version: [11, 19]
java_version: [11, 17, 19]

steps:
- name: Environment
Expand Down Expand Up @@ -92,7 +92,7 @@ jobs:
any_changed: ${{ steps.changed-files.outputs.any_changed }}

test:
if: ${{ !contains(github.event.head_commit.message, '[ci fast]') && needs.build.outputs.any_changed == 'true' }}
if: ${{ !contains(github.event.head_commit.message, '[ci fast]') }}
needs: build
runs-on: ubuntu-latest
strategy:
Expand Down
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ allprojects {

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
languageVersion = JavaLanguageVersion.of(19)
}
}

Expand Down Expand Up @@ -139,6 +139,7 @@ allprojects {
if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
tasks.withType(Test) {
jvmArgs ([
'--enable-preview',
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
Expand All @@ -154,6 +155,7 @@ allprojects {
'--add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED',
'--add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED',
'--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/jdk.internal.vm=ALL-UNNAMED',
])
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
3 changes: 2 additions & 1 deletion launch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ if [[ ! $JAVA_VER =~ $version_check ]]; then
exit 1
fi
JVM_ARGS+=" -Dfile.encoding=UTF-8 -XX:+TieredCompilation -XX:TieredStopAtLevel=1"
[[ $JAVA_VER =~ ^(9|10|11|12|13|14|15|16|17|18|19) ]] && JVM_ARGS+=" --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio.file.spi=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.fs=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.util.regex=ALL-UNNAMED"
[[ $JAVA_VER =~ ^(9|10|11|12|13|14|15|16|17|18|19|20) ]] && JVM_ARGS+=" --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio.file.spi=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.fs=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/jdk.internal.vm=ALL-UNNAMED --add-opens=java.base/java.util.regex=ALL-UNNAMED"
[[ $NXF_ENABLE_VIRTUAL_THREADS == 'true' ]] && JVM_ARGS+=" --enable-preview"

## flight recorded -- http://docs.oracle.com/javacomponents/jmc-5-4/jfr-runtime-guide/run.htm
##JVM_ARGS+=" -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=60s,filename=myrecording.jfr"
Expand Down
5 changes: 4 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import nextflow.executor.ExecutorFactory
import nextflow.extension.CH
import nextflow.file.FileHelper
import nextflow.file.FilePorter
import nextflow.util.Threads
import nextflow.util.ThreadPoolManager
import nextflow.plugin.Plugins
import nextflow.processor.ErrorStrategy
Expand Down Expand Up @@ -511,7 +512,9 @@ class Session implements ISession {
registerSignalHandlers()

// create tasks executor
execService = Executors.newFixedThreadPool(poolSize)
execService = Threads.useVirtual()
? Executors.newVirtualThreadPerTaskExecutor()
: Executors.newFixedThreadPool(poolSize)

// signal start to trace observers
notifyFlowCreate()
Expand Down
3 changes: 3 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdInfo.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import nextflow.plugin.DefaultPlugins
import nextflow.plugin.Plugins
import nextflow.scm.AssetManager
import nextflow.util.MemoryUnit
import nextflow.util.Threads
import org.yaml.snakeyaml.Yaml
/**
* CLI sub-command INFO
Expand Down Expand Up @@ -217,6 +218,8 @@ class CmdInfo extends CmdBase {
final os = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean()
result << BLANK << "Process: ${ManagementFactory.getRuntimeMXBean().getName()} " << getLocalAddress() << NEWLINE
result << BLANK << "CPUs: ${os.availableProcessors} - Mem: ${totMem(os)} (${freeMem(os)}) - Swap: ${totSwap(os)} (${freeSwap(os)})"
if( Threads.useVirtual() )
result << " - Virtual threads ON"
}

if( level == 0 )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import java.nio.file.attribute.BasicFileAttributes

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.util.Threads

/**
* Watch the content of a directory for file system events
*
Expand Down Expand Up @@ -133,7 +135,7 @@ class DirWatcher implements DirListener {
this.watcher = base.getFileSystem().newWatchService()
this.watchedPaths = new HashMap<WatchKey,Path>()

thread = Thread.startDaemon {
thread = Threads.start {
try {
apply0()
}
Expand Down
80 changes: 57 additions & 23 deletions modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.Canonical
import groovy.transform.CompileStatic
Expand All @@ -40,6 +43,7 @@ import nextflow.extension.FilesEx
import nextflow.util.CacheHelper
import nextflow.util.Duration
import nextflow.util.ThreadPoolManager
import nextflow.util.Threads

/**
* Move foreign (ie. remote) files to the staging work area
Expand All @@ -55,6 +59,8 @@ class FilePorter {

static final private int MAX_RETRIES = 3

static final private int MAX_TRANSFERS = 50

static final private Duration POLL_TIMEOUT = Duration.of('2sec')

final Map<FileCopy,FileTransfer> stagingTransfers = new HashMap<>()
Expand All @@ -63,14 +69,26 @@ class FilePorter {

private Duration pollTimeout

private final Semaphore semaphore

final private Lock sync

final int maxRetries

final Session session
final int maxTransfers

final private Session session

FilePorter( Session session ) {
this.session = session
maxRetries = session.config.navigate('filePorter.maxRetries') as Integer ?: MAX_RETRIES
pollTimeout = session.config.navigate('filePorter.pollTimeout') as Duration ?: POLL_TIMEOUT
maxTransfers = session.config.navigate('filePorter.maxTransfers') as Integer ?: MAX_TRANSFERS
log.debug "File porter settings maxRetries=$maxRetries; maxTransfers=$maxTransfers; pollTimeout=$threadPool"
sync = new ReentrantLock()
// use a semaphore to cap the number of max transfer when using virtual thread
// when using platform threads the max transfers are limited by the thread pool itself
semaphore = Threads.useVirtual() ? new Semaphore(maxTransfers) : null
threadPool = new ThreadPoolManager('FileTransfer')
.withConfig(session.config)
.createAndRegisterShutdownCallback(session)
Expand All @@ -87,26 +105,26 @@ class FilePorter {
}

protected FileTransfer createFileTransfer(Path source, Path target) {
return new FileTransfer(source, target, maxRetries)
}

protected Future submitForExecution(FileTransfer transfer) {
threadPool.submit(transfer)
return new FileTransfer(source, target, maxRetries, semaphore)
}

protected FileTransfer getOrSubmit(FileCopy copy) {
synchronized (this) {
sync.lock()
try {
FileTransfer transfer = stagingTransfers.get(copy)
if( transfer == null ) {
transfer = createFileTransfer(copy.source, copy.target)
transfer.result = submitForExecution(transfer)
transfer.result = threadPool.submit(transfer)
stagingTransfers.put(copy, transfer)
}
// increment the ref count
transfer.refCount.incrementAndGet()

return transfer
}
finally {
sync.unlock()
}
}

protected void decOrRemove(FileTransfer action) {
Expand Down Expand Up @@ -258,12 +276,14 @@ class FilePorter {
*/
final int maxRetries

final private Semaphore semaphore
final AtomicInteger refCount
volatile Future result
private String message
private int debugDelay

FileTransfer(Path foreignPath, Path stagePath, int maxRetries=0) {
FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore) {
this.semaphore = semaphore
this.source = foreignPath
this.target = stagePath
this.maxRetries = maxRetries
Expand All @@ -274,7 +294,15 @@ class FilePorter {

@Override
void run() throws Exception {
stageForeignFile(source, target)
if( semaphore )
semaphore.acquire()
try {
stageForeignFile(source, target)
}
finally {
if( semaphore )
semaphore.release()
}
}

/**
Expand Down Expand Up @@ -331,19 +359,25 @@ class FilePorter {
}
}

synchronized protected FileCopy getCachePathFor(Path sourcePath, Path stageDir) {
final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing
int i=0
while( true ) {
final uniq = List.of(sourcePath, dirPath, i++)
final hash = CacheHelper.hasher(uniq).hash().toString()
final targetPath = getCacheDir0(stageDir, hash).resolve(sourcePath.getName())
final result = new FileCopy(sourcePath, targetPath)
if( stagingTransfers.containsKey(result) )
return result
final exist = targetPath.exists()
if( !exist || checkPathIntegrity(sourcePath, targetPath) )
return result
protected FileCopy getCachePathFor(Path sourcePath, Path stageDir) {
sync.lock()
try {
final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing
int i=0
while( true ) {
final uniq = List.of(sourcePath, dirPath, i++)
final hash = CacheHelper.hasher(uniq).hash().toString()
final targetPath = getCacheDir0(stageDir, hash).resolve(sourcePath.getName())
final result = new FileCopy(sourcePath, targetPath)
if( stagingTransfers.containsKey(result) )
return result
final exist = targetPath.exists()
if( !exist || checkPathIntegrity(sourcePath, targetPath) )
return result
}
}
finally {
sync.unlock()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import nextflow.Global
import nextflow.Session
import nextflow.extension.CH
import nextflow.util.CustomThreadFactory
import nextflow.util.Threads
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
Expand Down Expand Up @@ -186,7 +187,10 @@ class PathVisitor {
@Memoized
@PackageScope
static ExecutorService createExecutor(Session session) {
final result = Executors.newCachedThreadPool(new CustomThreadFactory('PathVisitor'))
final factory = new CustomThreadFactory('PathVisitor')
final result = Threads.useVirtual()
? Executors.newThreadPerTaskExecutor(factory)
: Executors.newCachedThreadPool(factory)
Global.onCleanup((it) -> result.shutdown())
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import nextflow.Session
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.Threads
import nextflow.util.Throttle
/**
* Monitors the queued tasks waiting for their termination
Expand Down Expand Up @@ -287,7 +288,7 @@ class TaskPollingMonitor implements TaskMonitor {
session.onShutdown { this.cleanup() }

// launch the thread polling the queue
Thread.start('Task monitor') {
Threads.start('Task monitor') {
try {
pollLoop()
}
Expand All @@ -298,7 +299,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

// launch daemon that submits tasks for execution
Thread.startDaemon('Task submitter', this.&submitLoop)
Threads.start('Task submitter', this.&submitLoop)

return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import jline.TerminalFactory
import nextflow.Session
import nextflow.processor.TaskHandler
import nextflow.util.Duration
import nextflow.util.Threads
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import static nextflow.util.LoggerHelper.isHashLogPrefix
Expand Down Expand Up @@ -405,7 +406,7 @@ class AnsiLogObserver implements TraceObserver {
this.statsObserver = session.statsObserver
this.startTimestamp = System.currentTimeMillis()
AnsiConsole.systemInstall()
this.renderer = Thread.start('AnsiLogObserver', this.&render0)
this.renderer = Threads.start('AnsiLogObserver', this.&render0)
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/

package nextflow.util

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import groovyx.gpars.scheduler.Pool
import groovyx.gpars.scheduler.ResizeablePool
import groovyx.gpars.util.PoolFactory
Expand All @@ -27,6 +29,7 @@ import groovyx.gpars.util.PoolFactory
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
class CustomPoolFactory implements PoolFactory {

Expand All @@ -40,7 +43,8 @@ class CustomPoolFactory implements PoolFactory {
@Override
synchronized Pool createPool() {

def type = property(NXF_POOL_TYPE, 'default')
final defType = Threads.useVirtual() ? 'virtual' : 'default'
final type = property(NXF_POOL_TYPE, defType)
switch (type) {
case 'default':
int poolSize = Runtime.runtime.availableProcessors() +1
Expand All @@ -66,6 +70,10 @@ class CustomPoolFactory implements PoolFactory {
int size = property(NXF_MAX_THREADS, cpus+1) as int
return CustomThreadPool.unboundedPool(size)

case 'virtual':
log.debug "Creating virtual thread pool"
return CustomThreadPool.virtualPool()

default:
throw new IllegalAccessException("Unknown thread pool type: `$type`")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ static Pool synchronousPool(int maxThreads) {
);
}

static Pool virtualPool() {
return new VirtualThreadPool( newDaemonThreadFactory() );
}

/**
* Creates the pool with specified executor.
*/
Expand Down
Loading

0 comments on commit 7fcdbc7

Please sign in to comment.