Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preview support for virtual threads #3871

Merged
merged 29 commits into from
Apr 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
64c6e3e
Preview support for virtual threads
pditommaso Apr 14, 2023
ae811cc
Add support for virtual threads
pditommaso Apr 14, 2023
3724404
Fix test
pditommaso Apr 14, 2023
1febcec
Merge branch 'master' into virtual-threads
pditommaso Apr 15, 2023
d06f1a8
Improve support for virtual threads
pditommaso Apr 15, 2023
b4a35d7
Update build jvm
pditommaso Apr 15, 2023
45968e7
Merge branch 'master' into virtual-threads
pditommaso Apr 15, 2023
7ebe04c
Fix build
pditommaso Apr 15, 2023
7bc8d06
More test
pditommaso Apr 15, 2023
db71658
Fix tests
pditommaso Apr 15, 2023
bfbab44
Improve virtual thread pool
pditommaso Apr 15, 2023
558568d
Fix tests
pditommaso Apr 15, 2023
d3b9c7f
Add Threads helper class
pditommaso Apr 15, 2023
10e5b29
Fix threads helper
pditommaso Apr 15, 2023
8ec1fd4
Add tests
pditommaso Apr 16, 2023
2663dc3
Improve handling default out param
pditommaso Apr 16, 2023
8cba991
Add thread count
pditommaso Apr 16, 2023
258dfdf
Improve virtual threads creation
pditommaso Apr 16, 2023
7e64495
Fix run on java 11
pditommaso Apr 16, 2023
11e6c91
Use lock instead of synchronize
pditommaso Apr 17, 2023
0587622
Remove warning message
pditommaso Apr 17, 2023
9272bc6
Fix tests
pditommaso Apr 17, 2023
8b0748e
Merge branch 'master' into virtual-threads
pditommaso Apr 23, 2023
d69e3e0
Merge master at 62dfc482 [ci fast]
pditommaso Apr 23, 2023
8d1654d
Merge branch 'master' into virtual-threads
pditommaso Apr 27, 2023
27df16c
Minor changes
pditommaso Apr 27, 2023
4e5a427
Update modules/nextflow/src/main/groovy/nextflow/util/VirtualThreadPo…
pditommaso Apr 27, 2023
3cd3498
Update modules/nextflow/src/test/groovy/nextflow/util/ThreadPoolHelpe…
pditommaso Apr 27, 2023
79717cd
Update modules/nextflow/src/main/groovy/nextflow/util/VirtualThreadPo…
pditommaso Apr 27, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
strategy:
fail-fast: false
matrix:
java_version: [11, 19]
java_version: [11, 17, 19]

steps:
- name: Environment
Expand Down Expand Up @@ -92,7 +92,7 @@ jobs:
any_changed: ${{ steps.changed-files.outputs.any_changed }}

test:
if: ${{ !contains(github.event.head_commit.message, '[ci fast]') && needs.build.outputs.any_changed == 'true' }}
if: ${{ !contains(github.event.head_commit.message, '[ci fast]') }}
needs: build
runs-on: ubuntu-latest
strategy:
Expand Down
4 changes: 3 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ allprojects {

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
languageVersion = JavaLanguageVersion.of(19)
}
}

Expand Down Expand Up @@ -139,6 +139,7 @@ allprojects {
if (JavaVersion.current() >= JavaVersion.VERSION_1_9) {
tasks.withType(Test) {
jvmArgs ([
'--enable-preview',
'--add-opens=java.base/java.lang=ALL-UNNAMED',
'--add-opens=java.base/java.io=ALL-UNNAMED',
'--add-opens=java.base/java.nio=ALL-UNNAMED',
Expand All @@ -154,6 +155,7 @@ allprojects {
'--add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED',
'--add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED',
'--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED',
'--add-opens=java.base/jdk.internal.vm=ALL-UNNAMED',
])
}
}
Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
3 changes: 2 additions & 1 deletion launch.sh
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ if [[ ! $JAVA_VER =~ $version_check ]]; then
exit 1
fi
JVM_ARGS+=" -Dfile.encoding=UTF-8 -XX:+TieredCompilation -XX:TieredStopAtLevel=1"
[[ $JAVA_VER =~ ^(9|10|11|12|13|14|15|16|17|18|19) ]] && JVM_ARGS+=" --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio.file.spi=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.fs=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/java.util.regex=ALL-UNNAMED"
[[ $JAVA_VER =~ ^(9|10|11|12|13|14|15|16|17|18|19|20) ]] && JVM_ARGS+=" --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio.file.spi=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.fs=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.http=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.https=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.ftp=ALL-UNNAMED --add-opens=java.base/sun.net.www.protocol.file=ALL-UNNAMED --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED --add-opens=java.base/jdk.internal.vm=ALL-UNNAMED --add-opens=java.base/java.util.regex=ALL-UNNAMED"
[[ $NXF_ENABLE_VIRTUAL_THREADS == 'true' ]] && JVM_ARGS+=" --enable-preview"

## flight recorded -- http://docs.oracle.com/javacomponents/jmc-5-4/jfr-runtime-guide/run.htm
##JVM_ARGS+=" -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=duration=60s,filename=myrecording.jfr"
Expand Down
5 changes: 4 additions & 1 deletion modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import nextflow.executor.ExecutorFactory
import nextflow.extension.CH
import nextflow.file.FileHelper
import nextflow.file.FilePorter
import nextflow.util.Threads
import nextflow.util.ThreadPoolManager
import nextflow.plugin.Plugins
import nextflow.processor.ErrorStrategy
Expand Down Expand Up @@ -511,7 +512,9 @@ class Session implements ISession {
registerSignalHandlers()

// create tasks executor
execService = Executors.newFixedThreadPool(poolSize)
execService = Threads.useVirtual()
? Executors.newVirtualThreadPerTaskExecutor()
: Executors.newFixedThreadPool(poolSize)

// signal start to trace observers
notifyFlowCreate()
Expand Down
3 changes: 3 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/cli/CmdInfo.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import nextflow.plugin.DefaultPlugins
import nextflow.plugin.Plugins
import nextflow.scm.AssetManager
import nextflow.util.MemoryUnit
import nextflow.util.Threads
import org.yaml.snakeyaml.Yaml
/**
* CLI sub-command INFO
Expand Down Expand Up @@ -217,6 +218,8 @@ class CmdInfo extends CmdBase {
final os = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean()
result << BLANK << "Process: ${ManagementFactory.getRuntimeMXBean().getName()} " << getLocalAddress() << NEWLINE
result << BLANK << "CPUs: ${os.availableProcessors} - Mem: ${totMem(os)} (${freeMem(os)}) - Swap: ${totSwap(os)} (${freeSwap(os)})"
if( Threads.useVirtual() )
result << " - Virtual threads ON"
}

if( level == 0 )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import java.nio.file.attribute.BasicFileAttributes

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.util.Threads

/**
* Watch the content of a directory for file system events
*
Expand Down Expand Up @@ -133,7 +135,7 @@ class DirWatcher implements DirListener {
this.watcher = base.getFileSystem().newWatchService()
this.watchedPaths = new HashMap<WatchKey,Path>()

thread = Thread.startDaemon {
thread = Threads.start {
try {
apply0()
}
Expand Down
80 changes: 57 additions & 23 deletions modules/nextflow/src/main/groovy/nextflow/file/FilePorter.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import java.nio.file.attribute.BasicFileAttributes
import java.util.concurrent.ExecutionException
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeoutException
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.Canonical
import groovy.transform.CompileStatic
Expand All @@ -40,6 +43,7 @@ import nextflow.extension.FilesEx
import nextflow.util.CacheHelper
import nextflow.util.Duration
import nextflow.util.ThreadPoolManager
import nextflow.util.Threads

/**
* Move foreign (ie. remote) files to the staging work area
Expand All @@ -55,6 +59,8 @@ class FilePorter {

static final private int MAX_RETRIES = 3

static final private int MAX_TRANSFERS = 50

static final private Duration POLL_TIMEOUT = Duration.of('2sec')

final Map<FileCopy,FileTransfer> stagingTransfers = new HashMap<>()
Expand All @@ -63,14 +69,26 @@ class FilePorter {

private Duration pollTimeout

private final Semaphore semaphore

final private Lock sync

final int maxRetries

final Session session
final int maxTransfers

final private Session session

FilePorter( Session session ) {
this.session = session
maxRetries = session.config.navigate('filePorter.maxRetries') as Integer ?: MAX_RETRIES
pollTimeout = session.config.navigate('filePorter.pollTimeout') as Duration ?: POLL_TIMEOUT
maxTransfers = session.config.navigate('filePorter.maxTransfers') as Integer ?: MAX_TRANSFERS
log.debug "File porter settings maxRetries=$maxRetries; maxTransfers=$maxTransfers; pollTimeout=$threadPool"
sync = new ReentrantLock()
// use a semaphore to cap the number of max transfer when using virtual thread
// when using platform threads the max transfers are limited by the thread pool itself
semaphore = Threads.useVirtual() ? new Semaphore(maxTransfers) : null
threadPool = new ThreadPoolManager('FileTransfer')
.withConfig(session.config)
.createAndRegisterShutdownCallback(session)
Expand All @@ -87,26 +105,26 @@ class FilePorter {
}

protected FileTransfer createFileTransfer(Path source, Path target) {
return new FileTransfer(source, target, maxRetries)
}

protected Future submitForExecution(FileTransfer transfer) {
threadPool.submit(transfer)
return new FileTransfer(source, target, maxRetries, semaphore)
}

protected FileTransfer getOrSubmit(FileCopy copy) {
synchronized (this) {
sync.lock()
try {
FileTransfer transfer = stagingTransfers.get(copy)
if( transfer == null ) {
transfer = createFileTransfer(copy.source, copy.target)
transfer.result = submitForExecution(transfer)
transfer.result = threadPool.submit(transfer)
stagingTransfers.put(copy, transfer)
}
// increment the ref count
transfer.refCount.incrementAndGet()

return transfer
}
finally {
sync.unlock()
}
}

protected void decOrRemove(FileTransfer action) {
Expand Down Expand Up @@ -258,12 +276,14 @@ class FilePorter {
*/
final int maxRetries

final private Semaphore semaphore
final AtomicInteger refCount
volatile Future result
private String message
private int debugDelay

FileTransfer(Path foreignPath, Path stagePath, int maxRetries=0) {
FileTransfer(Path foreignPath, Path stagePath, int maxRetries, Semaphore semaphore) {
this.semaphore = semaphore
this.source = foreignPath
this.target = stagePath
this.maxRetries = maxRetries
Expand All @@ -274,7 +294,15 @@ class FilePorter {

@Override
void run() throws Exception {
stageForeignFile(source, target)
if( semaphore )
semaphore.acquire()
try {
stageForeignFile(source, target)
}
finally {
if( semaphore )
semaphore.release()
}
}

/**
Expand Down Expand Up @@ -331,19 +359,25 @@ class FilePorter {
}
}

synchronized protected FileCopy getCachePathFor(Path sourcePath, Path stageDir) {
final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing
int i=0
while( true ) {
final uniq = List.of(sourcePath, dirPath, i++)
final hash = CacheHelper.hasher(uniq).hash().toString()
final targetPath = getCacheDir0(stageDir, hash).resolve(sourcePath.getName())
final result = new FileCopy(sourcePath, targetPath)
if( stagingTransfers.containsKey(result) )
return result
final exist = targetPath.exists()
if( !exist || checkPathIntegrity(sourcePath, targetPath) )
return result
protected FileCopy getCachePathFor(Path sourcePath, Path stageDir) {
sync.lock()
try {
final dirPath = stageDir.toUriString() // <-- use a string to avoid changes in the dir to alter the hashing
int i=0
while( true ) {
final uniq = List.of(sourcePath, dirPath, i++)
final hash = CacheHelper.hasher(uniq).hash().toString()
final targetPath = getCacheDir0(stageDir, hash).resolve(sourcePath.getName())
final result = new FileCopy(sourcePath, targetPath)
if( stagingTransfers.containsKey(result) )
return result
final exist = targetPath.exists()
if( !exist || checkPathIntegrity(sourcePath, targetPath) )
return result
}
}
finally {
sync.unlock()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import nextflow.Global
import nextflow.Session
import nextflow.extension.CH
import nextflow.util.CustomThreadFactory
import nextflow.util.Threads
import org.slf4j.Logger
import org.slf4j.LoggerFactory
/**
Expand Down Expand Up @@ -186,7 +187,10 @@ class PathVisitor {
@Memoized
@PackageScope
static ExecutorService createExecutor(Session session) {
final result = Executors.newCachedThreadPool(new CustomThreadFactory('PathVisitor'))
final factory = new CustomThreadFactory('PathVisitor')
final result = Threads.useVirtual()
? Executors.newThreadPerTaskExecutor(factory)
: Executors.newCachedThreadPool(factory)
Global.onCleanup((it) -> result.shutdown())
return result
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import nextflow.Session
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.Threads
import nextflow.util.Throttle
/**
* Monitors the queued tasks waiting for their termination
Expand Down Expand Up @@ -287,7 +288,7 @@ class TaskPollingMonitor implements TaskMonitor {
session.onShutdown { this.cleanup() }

// launch the thread polling the queue
Thread.start('Task monitor') {
Threads.start('Task monitor') {
try {
pollLoop()
}
Expand All @@ -298,7 +299,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

// launch daemon that submits tasks for execution
Thread.startDaemon('Task submitter', this.&submitLoop)
Threads.start('Task submitter', this.&submitLoop)

return this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import jline.TerminalFactory
import nextflow.Session
import nextflow.processor.TaskHandler
import nextflow.util.Duration
import nextflow.util.Threads
import org.fusesource.jansi.Ansi
import org.fusesource.jansi.AnsiConsole
import static nextflow.util.LoggerHelper.isHashLogPrefix
Expand Down Expand Up @@ -405,7 +406,7 @@ class AnsiLogObserver implements TraceObserver {
this.statsObserver = session.statsObserver
this.startTimestamp = System.currentTimeMillis()
AnsiConsole.systemInstall()
this.renderer = Thread.start('AnsiLogObserver', this.&render0)
this.renderer = Threads.start('AnsiLogObserver', this.&render0)
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/

package nextflow.util

import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.util.logging.Slf4j
import groovyx.gpars.scheduler.Pool
import groovyx.gpars.scheduler.ResizeablePool
import groovyx.gpars.util.PoolFactory
Expand All @@ -27,6 +29,7 @@ import groovyx.gpars.util.PoolFactory
*
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
@CompileStatic
class CustomPoolFactory implements PoolFactory {

Expand All @@ -40,7 +43,8 @@ class CustomPoolFactory implements PoolFactory {
@Override
synchronized Pool createPool() {

def type = property(NXF_POOL_TYPE, 'default')
final defType = Threads.useVirtual() ? 'virtual' : 'default'
final type = property(NXF_POOL_TYPE, defType)
switch (type) {
case 'default':
int poolSize = Runtime.runtime.availableProcessors() +1
Expand All @@ -66,6 +70,10 @@ class CustomPoolFactory implements PoolFactory {
int size = property(NXF_MAX_THREADS, cpus+1) as int
return CustomThreadPool.unboundedPool(size)

case 'virtual':
log.debug "Creating virtual thread pool"
return CustomThreadPool.virtualPool()

default:
throw new IllegalAccessException("Unknown thread pool type: `$type`")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ static Pool synchronousPool(int maxThreads) {
);
}

static Pool virtualPool() {
return new VirtualThreadPool( newDaemonThreadFactory() );
}

/**
* Creates the pool with specified executor.
*/
Expand Down
Loading