Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -545,9 +545,10 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
if( state in COMPLETED ) {
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - terminated job=$jobId; task=$taskId; state=$state"
// finalize the task
task.exitStatus = readExitFile()
task.exitStatus = getExitCode()
if( state == 'FAILED' ) {
if( task.exitStatus == Integer.MAX_VALUE )
// When no exit code or 500XX codes, get the jobError reason from events
if( task.exitStatus == Integer.MAX_VALUE || task.exitStatus >= 50000)
task.error = getJobError()
task.stdout = executor.logging.stdout(uid, taskId) ?: outputFile
task.stderr = executor.logging.stderr(uid, taskId) ?: errorFile
Expand All @@ -565,6 +566,28 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
return false
}

/**
 * Resolve the exit code of the task.
 *
 * First look at the status events reported by the Batch API for this job/task and pick the
 * exit code of the most recent event that carries task execution details. If the API reports
 * no such event, fall back to the {@code .exitcode} file generated by Nextflow.
 *
 * The rationale is that, in case of error, the exit code returned by the Batch API is more
 * reliable than the one found in the exit file.
 *
 * @return the exit code reported by the Batch API if found, otherwise the value returned by
 *      {@code readExitFile()} (the caller treats {@code Integer.MAX_VALUE} as "no exit code")
 */
private Integer getExitCode(){
    final events = client.getTaskStatus(jobId, taskId)?.getStatusEventsList()
    if( events ) {
        final batchExitCode = events.stream()
            // only events holding task execution details expose an exit code
            .filter(ev -> ev.hasTaskExecution())
            // latest event wins: event time is a protobuf timestamp (seconds + nanos),
            // so break ties on nanos to avoid picking a stale event emitted in the same second
            .max( (ev1, ev2) -> {
                final bySeconds = Long.compare(ev1.getEventTime().getSeconds(), ev2.getEventTime().getSeconds())
                return bySeconds != 0
                    ? bySeconds
                    : Integer.compare(ev1.getEventTime().getNanos(), ev2.getEventTime().getNanos())
            } )
            .map(ev -> ev.getTaskExecution().getExitCode())
            .orElse(null)
        if( batchExitCode != null )
            return batchExitCode
    }
    // fallback to read the exit file
    log.debug("[GOOGLE BATCH] Exit code not found from API. Checking .exitcode file...")
    return readExitFile()
}

protected Throwable getJobError() {
try {
final events = noTaskJobfailure
Expand All @@ -574,7 +597,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}; exit code: ${lastEvent?.taskExecution?.exitCode}"

final error = lastEvent?.description
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find()) ) {
if( error && (EXIT_CODE_REGEX.matcher(error).find() || BATCH_ERROR_REGEX.matcher(error).find())) {
return new ProcessException(error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@ import com.google.api.gax.grpc.GrpcStatusCode
import com.google.api.gax.rpc.NotFoundException
import com.google.cloud.batch.v1.JobStatus
import com.google.cloud.batch.v1.Task
import com.google.cloud.batch.v1.TaskExecution
import io.grpc.Status
import nextflow.cloud.google.batch.logging.BatchLogging
import nextflow.exception.ProcessException

import java.nio.file.Path

Expand Down Expand Up @@ -473,15 +476,18 @@ class GoogleBatchTaskHandlerTest extends Specification {

}

TaskStatus makeTaskStatus(TaskStatus.State state, String desc) {
TaskStatus makeTaskStatus(TaskStatus.State state, String desc, Integer exitCode = null) {
def builder = TaskStatus.newBuilder()
if (state)
builder.setState(state)
if (desc)
builder.addStatusEvents(
StatusEvent.newBuilder()
.setDescription(desc)
)
if (desc || exitCode != null) {
def statusBuilder = StatusEvent.newBuilder()
if (desc)
statusBuilder.setDescription(desc)
if (exitCode != null)
statusBuilder.setTaskExecution(TaskExecution.newBuilder().setExitCode(exitCode).build())
builder.addStatusEvents(statusBuilder.build())
}
builder.build()
}

Expand Down Expand Up @@ -665,4 +671,104 @@ class GoogleBatchTaskHandlerTest extends Specification {
.build()

}

// Data-driven check of `checkIfCompleted()` when the mocked Batch client reports
// the task state and, for terminal states, an exit code in the task status events.
// Each row runs once as an array child and once as a regular task (ARRAY_CHILD).
def 'should check if completed from task status' () {
given:
def jobId = '1'
def taskId = '1'
// both task-status lookups and the job-status lookup are stubbed with the row's values
def client = Mock(BatchClient){
getTaskInArrayStatus(jobId, taskId) >> makeTaskStatus(STATE,"", EXIT_CODE)
getTaskStatus(jobId, taskId) >> makeTaskStatus(STATE,"", EXIT_CODE)
getJobStatus(jobId) >> makeJobStatus(JOB_STATUS,"")
}
def logging = Mock(BatchLogging)
def executor = Mock(GoogleBatchExecutor){
getLogging() >> logging
}
def task = new TaskRun()
task.name = 'hello'
// the handler starts in RUNNING status
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor))
when:
def result = handler.checkIfCompleted()
then:
handler.status == TASK_STATUS
handler.task.exitStatus == EXIT_STATUS
result == RESULT

// RUNNING rows expect the handler to stay RUNNING with exit status Integer.MAX_VALUE
// (presumably the TaskRun default - confirm against TaskRun)
where:
JOB_STATUS | STATE | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT
JobStatus.State.SUCCEEDED | TaskStatus.State.SUCCEEDED | 0 | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true
JobStatus.State.FAILED | TaskStatus.State.FAILED | 1 | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true
JobStatus.State.RUNNING | TaskStatus.State.RUNNING | null | true | nextflow.processor.TaskStatus.RUNNING | Integer.MAX_VALUE | false
JobStatus.State.SUCCEEDED | TaskStatus.State.SUCCEEDED | 0 | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true
JobStatus.State.FAILED | TaskStatus.State.FAILED | 1 | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true
JobStatus.State.RUNNING | TaskStatus.State.RUNNING | null | false | nextflow.processor.TaskStatus.RUNNING | Integer.MAX_VALUE | false

}

// Data-driven check of `checkIfCompleted()` when the mocked Batch client provides no
// exit code (EXIT_CODE is null in every row): the exit status is expected to come from
// exactly one call to `readExitFile()`, and no task error message is expected.
def 'should check if completed from read file' () {
given:
def jobId = '1'
def taskId = '1'
// task-status lookups return null when the row has no TASK_STATE; the job status is always FAILED
def client = Mock(BatchClient){
getTaskInArrayStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null }
getTaskStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null }
getJobStatus(jobId ) >> makeJobStatus(JobStatus.State.FAILED,DESC)
}
def logging = Mock(BatchLogging)
def executor = Mock(GoogleBatchExecutor){
getLogging() >> logging
}
def task = new TaskRun()
task.name = 'hello'
// the handler starts in RUNNING status
def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor))
when:
def result = handler.checkIfCompleted()
then:
// the exit file must be read exactly once; its stubbed value becomes the task exit status
1 * handler.readExitFile() >> EXIT_STATUS
handler.status == TASK_STATUS
handler.task.exitStatus == EXIT_STATUS
handler.task.error?.message == TASK_ERROR
result == RESULT

where:
TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR
TaskStatus.State.FAILED | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null
null | "Error" | null | true | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null
TaskStatus.State.FAILED | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 0 | true | null
null | "Error" | null | false | nextflow.processor.TaskStatus.COMPLETED | 1 | true | null
}

// Data-driven check of `checkIfCompleted()` when the mocked Batch client reports a
// 500xx exit code (here 50001): the exit status must come from the API events, the exit
// file must not be read, and the task error must carry the event description.
def 'should check if completed when 500xx errors' () {
    given:
    def jobId = '1'
    def taskId = '1'
    // task-status lookups return null when the row has no TASK_STATE; the job status is always FAILED
    def client = Mock(BatchClient){
        getTaskInArrayStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null }
        getTaskStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE, DESC, EXIT_CODE): null }
        getJobStatus(jobId ) >> makeJobStatus(JobStatus.State.FAILED,DESC)
    }
    def logging = Mock(BatchLogging)
    def executor = Mock(GoogleBatchExecutor){
        getLogging() >> logging
    }
    def task = new TaskRun()
    task.name = 'hello'
    // the handler starts in RUNNING status
    def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task, isArrayChild: ARRAY_CHILD, status: nextflow.processor.TaskStatus.RUNNING, executor: executor))

    when:
    def result = handler.checkIfCompleted()

    then:
    // the exit file must never be read; a never-invoked interaction needs no stubbed response
    0 * handler.readExitFile()
    handler.status == TASK_STATUS
    handler.task.exitStatus == EXIT_STATUS
    handler.task.error?.message == TASK_ERROR
    result == RESULT

    where:
    TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR
    TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | true | nextflow.processor.TaskStatus.COMPLETED | 50001 | true | 'Task failed due to Spot VM preemption with exit code 50001.'
    TaskStatus.State.FAILED | 'Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus.COMPLETED | 50001 | true | 'Task failed due to Spot VM preemption with exit code 50001.'
}


}