Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Fusion support to HTCondor #5865

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
da34e80
Support container directive for condor executor
bentsherman Jun 21, 2023
f2b63a5
Merge branch 'master' into 3705-condor-docker-support
bentsherman Aug 23, 2023
f1fa5c9
Use condor_history to query job status
bentsherman Aug 23, 2023
ee457e8
Merge pull request #1 from JosephLalli/3705-condor-docker-support
JosephLalli Jan 7, 2024
a27ea06
bring up to date with #3697
JosephLalli Jan 9, 2024
0ea74a7
Merge pull request #2 from nextflow-io/master
JosephLalli Jan 10, 2024
f6f4c1c
Adding changes from https://github.com/nextflow-io/nextflow/pull/4141
JosephLalli Jan 16, 2024
a4a61d4
allow for parallel build
JosephLalli Jan 16, 2024
6c7a5d2
run fusion on htcondor (mid-debugging, messy)
JosephLalli Jan 17, 2024
fb9420c
Merge branch 'master' into master
JosephLalli Feb 27, 2025
611ae7d
Bringing Condor Executor up to date with NF25
JosephLalli Feb 28, 2025
2105054
Continuting to revert (containerconfig and wavedebug) to identify cau…
JosephLalli Feb 28, 2025
340af26
Kept the reference to custom CondorTaskHandler, but had reverted the …
JosephLalli Feb 28, 2025
f267545
Adding my local copy of nextflow's current condor exector file to .gi…
JosephLalli Feb 28, 2025
be32f6d
Actually get rid of my condorExecutorNextflow backup
JosephLalli Feb 28, 2025
bddf57c
remove getHeaderToken (throwing error, unclear function, not in curre…
JosephLalli Feb 28, 2025
f710eb3
removing another duplicated function [stdinLauncherScript]
JosephLalli Feb 28, 2025
0d1e6d3
Fixing typos in variable names
JosephLalli Feb 28, 2025
14cde90
Getting code to compile and compile quickly by removing specialized C…
JosephLalli Mar 1, 2025
1d0c616
Adding back in task handler that is able to create condorfile as a st…
JosephLalli Mar 2, 2025
5036c37
clarifying readAttributes debug message
JosephLalli Mar 2, 2025
171d3bb
Add 10000 to build number to ensure I can know that we are not runnin…
JosephLalli Mar 2, 2025
55c8434
Second attempt at changing an obvious aspect of the build to ensure t…
JosephLalli Mar 2, 2025
ea3c73c
add work dir to condor log files
JosephLalli Mar 3, 2025
6357c53
debugging outfile path specification in condor submitfile
JosephLalli Mar 3, 2025
1fc850d
Trying to run containers in vanilla universe
JosephLalli Mar 4, 2025
3ae5060
fix typo
JosephLalli Mar 4, 2025
f8bd848
remembering to specify vanilla universe in fusion
JosephLalli Mar 4, 2025
b774e48
go back and remove container universe references
JosephLalli Mar 4, 2025
65b4e4d
OK. Adding back custom Condor task handler.
JosephLalli Mar 4, 2025
130bc7f
This currently works on CHTC servers when running "./launch_nextflow.…
JosephLalli Mar 7, 2025
9c1697a
Adding debug error messages to parseQueueStatus because it's not work…
JosephLalli Mar 8, 2025
b14a4b3
removing started variable, which I don't know what it does
JosephLalli Mar 8, 2025
b5ef3a6
The started variable is what skips the header line. Replaced with a d…
JosephLalli Mar 8, 2025
9a9403d
Cleaning code, removing commented out code
JosephLalli Mar 9, 2025
73a5b50
Adding a few clarifying comments
JosephLalli Mar 9, 2025
ae98186
Merge branch 'master' into merging_w_nf
JosephLalli Mar 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -40,3 +40,5 @@ plugins-prod
sandbox
wave-tests
x/*
**/CondorExecutorNextflow.groovy
modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutorNextflow.groovy
1 change: 1 addition & 0 deletions docs/executor.md
Original file line number Diff line number Diff line change
@@ -193,6 +193,7 @@ To enable the HTCondor executor, set `process.executor = 'condor'` in the `nextf
Resource requests and other job characteristics can be controlled via the following process directives:

- {ref}`process-clusterOptions`
- {ref}`process-container`
- {ref}`process-cpus`
- {ref}`process-disk`
- {ref}`process-memory`
1 change: 0 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
org.gradle.caching=true
org.gradle.jvmargs=-Xmx4g
org.gradle.parallel=true
Original file line number Diff line number Diff line change
@@ -98,6 +98,7 @@ class NextflowMeta {
private NextflowMeta() {
version = new VersionNumber(BuildInfo.version)
build = BuildInfo.buildNum as int
build += 10000
timestamp = BuildInfo.timestampUTC
}

Original file line number Diff line number Diff line change
@@ -687,7 +687,7 @@ class Launcher {

"""
N E X T F L O W
version ${BuildInfo.version} build ${BuildInfo.buildNum}
version ${BuildInfo.version} build ${BuildInfo.buildNum} - JLL
created ${BuildInfo.timestampUTC} ${BuildInfo.timestampDelta}
cite doi:10.1038/nbt.3820
http://nextflow.io
Original file line number Diff line number Diff line change
@@ -124,6 +124,11 @@ class ContainerConfig extends LinkedHashMap {
final eng = getEngine()
if( !eng )
return null
// JLL code to implement docker/podman rootless fuse access. I don't think it works. I am currently trying to identify the source of another bug, and this code is currently not being used. Reverting to nextflow standard.
// if( eng=='docker' )
// return '--rm --device /dev/fuse --security-opt apparmor=unconfined --security-opt seccomp=unconfined'
// if( eng=='podman' )
// return '--rm --device /dev/fuse'
if( eng=='docker' || eng=='podman' )
return '--rm --privileged'
if( isSingularityOciMode() )
Original file line number Diff line number Diff line change
@@ -387,7 +387,7 @@ class BashWrapperBuilder {
binding.fix_ownership = fixOwnership() ? "[ \${NXF_OWNER:=''} ] && (shopt -s extglob; GLOBIGNORE='..'; chown -fR --from root \$NXF_OWNER ${workDir}/{*,.*}) || true" : null

binding.trace_script = isTraceRequired() ? getTraceScript(binding) : null

return binding
}

201 changes: 182 additions & 19 deletions modules/nextflow/src/main/groovy/nextflow/executor/CondorExecutor.groovy
Original file line number Diff line number Diff line change
@@ -19,7 +19,24 @@ import java.nio.file.Path

import groovy.transform.CompileStatic
import groovy.transform.InheritConstructors
import nextflow.container.ContainerBuilder
import nextflow.processor.TaskRun
import nextflow.fusion.FusionHelper
import nextflow.file.FileHelper
import nextflow.extension.FilesEx
import nextflow.exception.ProcessException
import nextflow.util.MemoryUnit
import nextflow.SysEnv

import static java.nio.file.StandardOpenOption.*

import java.nio.file.FileSystemException
import java.nio.file.FileSystems
import java.nio.file.Files
import java.nio.file.Path



/**
* HTCondor executor
*
@@ -41,23 +58,47 @@ class CondorExecutor extends AbstractGridExecutor {

protected String getDirectivesText(TaskRun task) {
def lines = getDirectives(task)
lines << ''
lines.join('\n')
}

@Override
protected String getHeaderToken() {
throw new UnsupportedOperationException()
// @Override
// protected String getHeaderToken() {
// throw new UnsupportedOperationException()
// }


// Condor does not require a special token or header
protected String getHeaderToken() { return '' }

/**
* Defines the jobs directive headers
*
* @param task
* @return A multi-line string containing the job directives
*/
String getHeaders( TaskRun task ) {
return getDirectivesText(task)
}


// We currently assume that the system has been configured so that
// anyone (user) who can run an HTCondor job can also run docker. It's
// also apparently a security worry to run Docker as root, so let's not.
// https://github.com/htcondor/htcondor/blob/02c6bc70543951cf9d352e3fbf3343a925a47e3a/src/condor_utils/docker-api.cpp#L99C1-L102C1

@Override
protected List<String> getDirectives(TaskRun task, List<String> result) {

result << "universe = vanilla"
result << "executable = ${TaskRun.CMD_RUN}".toString()
result << "log = ${TaskRun.CMD_LOG}".toString()

result << "out = ${TaskRun.CMD_OUTFILE}".toString()
result << "error = ${TaskRun.CMD_ERRFILE}".toString()
result << "log = .condor_runlog.uuid-${session.uniqueId}.log".toString()
result << "getenv = true"

result << "transfer_executable = False" // handled by nextflow
result << "transfer_output_files=\"\"" // ditto

if( task.config.getCpus()>1 ) {
result << "request_cpus = ${task.config.getCpus()}".toString()
result << "machine_count = 1"
@@ -84,14 +125,21 @@ class CondorExecutor extends AbstractGridExecutor {
result.addAll( opts.toString().tokenize(';\n').collect{ it.trim() })
}
}

result<< "queue"

if ( ! pipeLauncherScript() && ! task.isContainerEnabled() ) {
result << "executable = ${task.CMD_RUN}".toString()
result << "environment = ${task.getEnvironment()}".toString()
}
result << 'queue'
}
return result
}


@Override
List<String> getSubmitCommandLine(TaskRun task, Path scriptFile) {
return ['condor_submit', '--terse', CMD_CONDOR]
return pipeLauncherScript()
? List.of('condor_submit', '-', '-terse',)
: List.of('condor_submit', '-terse', CMD_CONDOR)
}

@Override
@@ -106,7 +154,7 @@ class CondorExecutor extends AbstractGridExecutor {

@Override
protected List<String> queueStatusCommand(Object queue) {
["condor_q", "-nobatch"]
["condor_history", "-userlog", ".condor_runlog.uuid-${session.uniqueId}.log".toString(), "-wide","-af:j", "JobStatus"]
}


@@ -117,37 +165,151 @@ class CondorExecutor extends AbstractGridExecutor {
'X': QueueStatus.ERROR, // Removed
'C': QueueStatus.DONE, // Completed
'H': QueueStatus.HOLD, // Held
'E': QueueStatus.ERROR // Error
'E': QueueStatus.ERROR, // Error

// numeric encoding
'0': QueueStatus.PENDING, // Unexpanded
'1': QueueStatus.PENDING, // Idle
'2': QueueStatus.RUNNING, // Running
'3': QueueStatus.ERROR, // Removed
'4': QueueStatus.DONE, // Completed
'5': QueueStatus.HOLD, // Held
'6': QueueStatus.ERROR // Error
]


@Override
protected Map<String, QueueStatus> parseQueueStatus(String text) {
final result = new LinkedHashMap<String, QueueStatus>()
if( !text ) return result
if( !text ) {
println("escaping because !text")
return result
}

boolean started = false
def itr = text.readLines().iterator()
while( itr.hasNext() ) {
String line = itr.next()
if( !started ) {
started = line.startsWith(' ID ')
continue
}
if( line.startsWith(' ID ') ) continue

if( !line.trim() ) {
break
}

def cols = line.tokenize(' ')
def id = cols[0]
def st = cols[5]
def st = cols[1]
result[id] = DECODE_STATUS[st]
}

return result
}

@Override
protected boolean pipeLauncherScript() {
return isFusionEnabled()
}

@Override
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}


/*
* Prepare and launch the task in the underlying execution platform
*/
CondorTaskHandler createTaskHandler(TaskRun task) {
assert task
assert task.workDir

new CondorTaskHandler(task, this)
}


/**
* Handles a job execution in the underlying grid platform
*/
@CompileStatic
@InheritConstructors
class CondorTaskHandler extends GridTaskHandler {
// creates text of bash wrapper executable file to run on remote server
protected String generateFusionBashWrapperCommand() {
final submit = fusionSubmitCli()
final launcher = fusionLauncher()
final config = task.getContainerConfig()
final containerOpts = task.config.getContainerOptions()
final cmd = FusionHelper.runWithContainer(launcher, config, task.getContainer(), containerOpts, submit)

return '#!/bin/bash\n' + cmd + '\n'
}

// creates condor submit file that is fed to stdin. The submit file specifies an executable
protected String fusionStdinWrapper() {
final fusionBashWrapperText = generateFusionBashWrapperCommand()

final String tmp_launch_script = ".condor.${task.id}.${task.hash}.sh"
final Path executable_file_name = FileHelper.getLocalTempPath().resolve(tmp_launch_script)
// save the bash command to a script on disk
executable_file_name.text = fusionBashWrapperText

FilesEx.setExecutable(executable_file_name, true)
def submit_file_commands = getDirectives(task)
submit_file_commands << "executable = ${executable_file_name}".toString()
submit_file_commands << "transfer_executable = True"
submit_file_commands << "queue"
submit_file_commands << ""

return submit_file_commands.join('\n')

}


// code copied from BashWrapperBuilder to allow for writing a wrapper script to give to condor
private static MemoryUnit DEFAULT_STAGE_FILE_THRESHOLD = MemoryUnit.of('1 MB')
private static int DEFAULT_WRITE_BACK_OFF_BASE = 3
private static int DEFAULT_WRITE_BACK_OFF_DELAY = 250
private static int DEFAULT_WRITE_MAX_ATTEMPTS = 5

private MemoryUnit stageFileThreshold = SysEnv.get('NXF_WRAPPER_STAGE_FILE_THRESHOLD') as MemoryUnit ?: DEFAULT_STAGE_FILE_THRESHOLD
private int writeBackOffBase = SysEnv.get('NXF_WRAPPER_BACK_OFF_BASE') as Integer ?: DEFAULT_WRITE_BACK_OFF_BASE
private int writeBackOffDelay = SysEnv.get('NXF_WRAPPER_BACK_OFF_DELAY') as Integer ?: DEFAULT_WRITE_BACK_OFF_DELAY
private int writeMaxAttempts = SysEnv.get('NXF_WRAPPER_MAX_ATTEMPTS') as Integer ?: DEFAULT_WRITE_MAX_ATTEMPTS
static protected boolean isRetryable0(Exception e) {
if( e instanceof FileSystemException )
return true
if( e instanceof SocketException )
return true
if( e instanceof RuntimeException )
return true
if( e.class.getSimpleName() == 'HttpResponseException' )
return true
return false
}

private Path write0(Path path, String data) {
int attempt=0
while( true ) {
try {
try (BufferedWriter writer=Files.newBufferedWriter(path, CREATE,WRITE,TRUNCATE_EXISTING)) {
writer.write(data)
}
return path
}
catch (Exception e) {
if( !this.isRetryable0(e) )
throw e
final isLocalFS = path.getFileSystem()==FileSystems.default
// the retry logic is needed for non-local file system such as S3.
// when the file is local fail without retrying
if( isLocalFS || ++attempt>=writeMaxAttempts )
throw new ProcessException("Unable to create file ${path.toUriString()}", e)
// use an exponential delay before making another attempt
final delay = (Math.pow(writeBackOffBase, attempt) as long) * writeBackOffDelay
Thread.sleep(delay)
}
}
}
}

@InheritConstructors
static class CondorWrapperBuilder extends BashWrapperBuilder {
@@ -163,5 +325,6 @@ class CondorExecutor extends AbstractGridExecutor {
return wrapper
}


}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
/*
* Copyright 2013-2024, Seqera Labs
* Copyright 2020-2022, Seqera Labs
* Copyright 2013-2019, Centre for Genomic Regulation (CRG)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +16,7 @@
*/

package nextflow.executor

import java.nio.file.Files

import nextflow.Session
@@ -24,6 +26,8 @@ import nextflow.processor.TaskConfig
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import spock.lang.Specification
import spock.lang.Unroll

/**
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
@@ -130,14 +134,23 @@ class CondorExecutorTest extends Specification {

}


@Unroll
def 'should return launch command line' () {

given:
def executor = [:] as CondorExecutor
def session = Mock(Session) { getConfig() >> [:] }
def exec = Spy(CondorExecutor) { getSession() >> session }

expect:
executor.getSubmitCommandLine( Mock(TaskRun), null) == ['condor_submit', '--terse', '.command.condor']
when:
def result = exec.getSubmitCommandLine(Mock(TaskRun), null)
then:
exec.pipeLauncherScript() >> PIPE
result == EXPECTED

where:
PIPE | EXPECTED
false | ['condor_submit', '-terse', '.command.condor']
true | ['condor_submit', '-terse']

}

@@ -270,4 +283,4 @@ class CondorExecutorTest extends Specification {

}

}
}
Original file line number Diff line number Diff line change
@@ -686,7 +686,7 @@ public <A extends BasicFileAttributes> A readAttributes(Path path, Class<A> type
: readAttr0(s3Path));
}
// not support attribute class
throw new UnsupportedOperationException(format("only %s supported", BasicFileAttributes.class));
throw new UnsupportedOperationException(format("while trying readAttributes of %s, only %s supported", s3Path.toString(), BasicFileAttributes.class));
}

private Optional<S3FileAttributes> readAttr1(S3Path s3Path) throws IOException {