Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move foreign file staging after storeDir evaluation #5759

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ class TaskProcessor {
// -- map the inputs to a map and use to delegate closure values interpolation
final secondPass = [:]
int count = makeTaskContextStage1(task, secondPass, values)
makeTaskContextStage2(task, secondPass, count)
final foreignFiles = makeTaskContextStage2(task, secondPass, count)

// verify that `when` guard, when specified, is satisfied
if( !checkWhenGuard(task) )
Expand All @@ -653,6 +653,9 @@ class TaskProcessor {
if( checkStoredOutput(task) )
return

// -- download foreign files
session.filePorter.transfer(foreignFiles)

def hash = createTaskHashKey(task)
checkCachedOrLaunchTask(task, hash, resumable)
}
Expand Down Expand Up @@ -1928,7 +1931,7 @@ class TaskProcessor {
throw new ProcessUnrecoverableException("Not a valid path value: '$str'")
}

protected List<FileHolder> normalizeInputToFiles( Object obj, int count, boolean coerceToPath, FilePorter.Batch batch ) {
protected List<FileHolder> normalizeInputToFiles( Object obj, int count, boolean coerceToPath, FilePorter.Batch foreignFiles ) {

Collection allItems = obj instanceof Collection ? obj : [obj]
def len = allItems.size()
Expand All @@ -1939,7 +1942,7 @@ class TaskProcessor {

if( item instanceof Path || coerceToPath ) {
def path = normalizeToPath(item)
def target = executor.isForeignFile(path) ? batch.addToForeign(path) : path
def target = executor.isForeignFile(path) ? foreignFiles.addToForeign(path) : path
def holder = new FileHolder(target)
files << holder
}
Expand Down Expand Up @@ -2139,20 +2142,20 @@ class TaskProcessor {
return count
}

final protected void makeTaskContextStage2( TaskRun task, Map secondPass, int count ) {
final protected FilePorter.Batch makeTaskContextStage2( TaskRun task, Map secondPass, int count ) {

final ctx = task.context
final allNames = new HashMap<String,Integer>()

final FilePorter.Batch batch = session.filePorter.newBatch(executor.getStageDir())
final FilePorter.Batch foreignFiles = session.filePorter.newBatch(executor.getStageDir())

// -- all file parameters are processed in a second pass
// so that we can use resolve the variables that eventually are in the file name
for( Map.Entry<FileInParam,?> entry : secondPass.entrySet() ) {
final param = entry.getKey()
final val = entry.getValue()
final fileParam = param as FileInParam
final normalized = normalizeInputToFiles(val, count, fileParam.isPathQualifier(), batch)
final normalized = normalizeInputToFiles(val, count, fileParam.isPathQualifier(), foreignFiles)
final resolved = expandWildcards( fileParam.getFilePattern(ctx), normalized )

if( !param.isValidArity(resolved.size()) )
Expand Down Expand Up @@ -2180,9 +2183,7 @@ class TaskProcessor {
def message = "Process `$name` input file name collision -- There are multiple input files for each of the following file names: ${conflicts.keySet().join(', ')}"
throw new ProcessUnrecoverableException(message)
}

// -- download foreign files
session.filePorter.transfer(batch)
return foreignFiles
}

protected void makeTaskContextStage3( TaskRun task, HashCode hash, Path folder ) {
Expand Down