Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problem that Nextflow Executions stuck on Kubernetes #5501

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 41 additions & 39 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -418,48 +418,50 @@ class TraceRecord implements Serializable {
*/

TraceRecord parseTraceFile( Path file ) {
try (BufferedReader reader = new BufferedReader(new FileReader(file.toFile()))) {

final text = file.text

final lines = text.readLines()
if( !lines )
return this
if( lines[0] != 'nextflow.trace/v2' )
return parseLegacy(file, lines)

for( int i=0; i<lines.size(); i++ ) {
final pair = lines[i].tokenize('=')
final name = pair[0]
final value = pair[1]
if( value == null )
continue

switch (name) {
case '%cpu':
case '%mem':
// fields '%cpu' and '%mem' are expressed as percent value
this.put(name, parseInt(value, file, name) / 10F)
break

case 'rss':
case 'vmem':
case 'peak_rss':
case 'peak_vmem':
// these fields are provided in KB, so they are normalized to bytes
def val = parseLong(value, file, name) * 1024
this.put(name, val)
break

case 'cpu_model':
this.put(name, value)
break

default:
def val = parseLong(value, file, name)
this.put(name, val)
break
String line
if( ( line = reader.readLine() ) == null ) {
return this
} else if ( line != 'nextflow.trace/v2' ) {
return parseLegacy(file, file.readLines())
}

while(( line = reader.readLine() ) != null ) {
final pair = line.tokenize('=')
final name = pair[0]
final value = pair[1]
if( value == null )
continue

switch (name) {
case '%cpu':
case '%mem':
// fields '%cpu' and '%mem' are expressed as percent value
this.put(name, parseInt(value, file, name) / 10F)
break

case 'rss':
case 'vmem':
case 'peak_rss':
case 'peak_vmem':
// these fields are provided in KB, so they are normalized to bytes
def val = parseLong(value, file, name) * 1024
this.put(name, val)
break

case 'cpu_model':
this.put(name, value)
break

default:
def val = parseLong(value, file, name)
this.put(name, val)
break
}
}
} catch (IOException e) {
e.printStackTrace();
}

return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package nextflow.processor

import spock.lang.TempDir

import java.nio.file.Path
import java.util.concurrent.atomic.LongAdder

import nextflow.Session
Expand All @@ -33,6 +36,9 @@ class TaskHandlerTest extends Specification {

def static final long KB = 1024

@TempDir
Path folder

def 'test get trace record'() {

given:
Expand All @@ -42,7 +48,6 @@ class TaskHandlerTest extends Specification {
'''
.leftTrim()

def folder = TestHelper.createInMemTempDir()
folder.resolve( TaskRun.CMD_TRACE ).text = traceText

def config = [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
*/

package nextflow.trace

import spock.lang.TempDir

import java.nio.file.Files

import nextflow.Session
Expand All @@ -28,7 +31,9 @@ import nextflow.processor.TaskStatus
import nextflow.util.CacheHelper
import nextflow.util.Duration
import spock.lang.Specification
import test.TestHelper

import java.nio.file.Path

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand All @@ -37,6 +42,9 @@ class TraceFileObserverTest extends Specification {

final static long MB = 1024 * 1024

@TempDir
Path tempDir

def setupSpec() {
TraceRecord.TIMEZONE = TimeZone.getTimeZone('UTC') // note: set the timezone to be sure the time string does not change on CI test servers
}
Expand Down Expand Up @@ -235,7 +243,7 @@ class TraceFileObserverTest extends Specification {

given:
final KB = 1024L
final file = TestHelper.createInMemTempFile('trace')
final file = Files.createTempFile(tempDir,'trace', '.txt')
file.text = '''
pid state %cpu %mem vmem rss peak_vmem peak_rss rchar wchar syscr syscw read_bytes write_bytes
18 0 7999 46 7868980 6694900 7876620 6702144 2147483647 2147483647 44001533 148401890 2147483647 2147483647
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@ package nextflow.trace

import groovy.json.JsonSlurper
import spock.lang.Specification
import spock.lang.TempDir
import spock.lang.Unroll
import test.TestHelper

import java.nio.file.Files
import java.nio.file.Path

/**
*
* @author Paolo Di Tommaso <[email protected]>
Expand All @@ -33,6 +37,9 @@ class TraceRecordTest extends Specification {
TraceRecord.TIMEZONE = TimeZone.getTimeZone('UTC') // note: set the timezone to be sure the time string does not change on CI test servers
}

@TempDir
Path tempDir

def 'test get field'() {

given:
Expand Down Expand Up @@ -153,7 +160,7 @@ class TraceRecordTest extends Specification {
def 'should parse a trace file and return a TraceRecord object'() {

given:
def file = TestHelper.createInMemTempFile('trace')
def file = Files.createTempFile(tempDir,'trace', '.txt')
file.text = '''\
nextflow.trace/v2
realtime=12021
Expand Down Expand Up @@ -200,7 +207,7 @@ class TraceRecordTest extends Specification {
def 'should parse a legacy trace file and return a TraceRecord object'() {

given:
def file = TestHelper.createInMemTempFile('trace')
def file = Files.createTempFile(tempDir,'trace', '.txt')
file.text = '''
pid state %cpu %mem vmem rss peak_vmem peak_rss rchar wchar syscr syscw read_bytes write_bytes
1 0 10 20 11084 1220 21084 2220 4790 12 11 1 20 30
Expand Down