@@ -21,7 +21,10 @@ import com.google.api.gax.grpc.GrpcStatusCode
2121import com.google.api.gax.rpc.NotFoundException
2222import com.google.cloud.batch.v1.JobStatus
2323import com.google.cloud.batch.v1.Task
24+ import com.google.cloud.batch.v1.TaskExecution
2425import io.grpc.Status
26+ import nextflow.cloud.google.batch.logging.BatchLogging
27+ import nextflow.exception.ProcessException
2528
2629import java.nio.file.Path
2730
@@ -473,15 +476,18 @@ class GoogleBatchTaskHandlerTest extends Specification {
473476
474477 }
475478
476- TaskStatus makeTaskStatus (TaskStatus.State state , String desc ) {
479+ TaskStatus makeTaskStatus (TaskStatus.State state , String desc , Integer exitCode = null ) {
477480 def builder = TaskStatus . newBuilder()
478481 if (state)
479482 builder. setState(state)
480- if (desc)
481- builder. addStatusEvents(
482- StatusEvent . newBuilder()
483- .setDescription(desc)
484- )
483+ if (desc || exitCode != null ) {
484+ def statusBuilder = StatusEvent . newBuilder()
485+ if (desc)
486+ statusBuilder. setDescription(desc)
487+ if (exitCode != null )
488+ statusBuilder. setTaskExecution(TaskExecution . newBuilder(). setExitCode(exitCode). build())
489+ builder. addStatusEvents(statusBuilder. build())
490+ }
485491 builder. build()
486492 }
487493
@@ -665,4 +671,74 @@ class GoogleBatchTaskHandlerTest extends Specification {
665671 .build()
666672
667673 }
674+
675+ def ' should check if completed from task status' () {
676+ given :
677+ def jobId = ' 1'
678+ def taskId = ' 1'
679+ def client = Mock (BatchClient ){
680+ getTaskInArrayStatus(jobId, taskId) >> makeTaskStatus(STATE ," " , EXIT_CODE )
681+ getTaskStatus(jobId, taskId) >> makeTaskStatus(STATE ," " , EXIT_CODE )
682+ getJobStatus(jobId) >> makeJobStatus(JOB_STATUS ," " )
683+ }
684+ def logging = Mock (BatchLogging )
685+ def executor = Mock (GoogleBatchExecutor ){
686+ getLogging() >> logging
687+ }
688+ def task = new TaskRun ()
689+ task. name = ' hello'
690+ def handler = Spy (new GoogleBatchTaskHandler (jobId : jobId, taskId : taskId, client : client, task : task, isArrayChild : ARRAY_CHILD , status : nextflow.processor.TaskStatus . RUNNING , executor : executor))
691+ when :
692+ def result = handler. checkIfCompleted()
693+ then :
694+ handler. status == TASK_STATUS
695+ handler. task. exitStatus == EXIT_STATUS
696+ result == RESULT
697+
698+ where :
699+ JOB_STATUS | STATE | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT
700+ JobStatus.State . SUCCEEDED | TaskStatus.State . SUCCEEDED | 0 | true | nextflow.processor.TaskStatus . COMPLETED | 0 | true
701+ JobStatus.State . FAILED | TaskStatus.State . FAILED | 1 | true | nextflow.processor.TaskStatus . COMPLETED | 1 | true
702+ JobStatus.State . RUNNING | TaskStatus.State . RUNNING | null | true | nextflow.processor.TaskStatus . RUNNING | Integer . MAX_VALUE | false
703+ JobStatus.State . SUCCEEDED | TaskStatus.State . SUCCEEDED | 0 | false | nextflow.processor.TaskStatus . COMPLETED | 0 | true
704+ JobStatus.State . FAILED | TaskStatus.State . FAILED | 1 | false | nextflow.processor.TaskStatus . COMPLETED | 1 | true
705+ JobStatus.State . RUNNING | TaskStatus.State . RUNNING | null | false | nextflow.processor.TaskStatus . RUNNING | Integer . MAX_VALUE | false
706+
707+ }
708+
709+ def ' should check if completed from read file' () {
710+ given :
711+ def jobId = ' 1'
712+ def taskId = ' 1'
713+ def client = Mock (BatchClient ){
714+ getTaskInArrayStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE , DESC , EXIT_CODE ): null }
715+ getTaskStatus(jobId, taskId) >> { TASK_STATE ? makeTaskStatus(TASK_STATE , DESC , EXIT_CODE ): null }
716+ getJobStatus(jobId ) >> makeJobStatus(JobStatus.State . FAILED ,DESC )
717+ }
718+ def logging = Mock (BatchLogging )
719+ def executor = Mock (GoogleBatchExecutor ){
720+ getLogging() >> logging
721+ }
722+ def task = new TaskRun ()
723+ task. name = ' hello'
724+ def handler = Spy (new GoogleBatchTaskHandler (jobId : jobId, taskId : taskId, client : client, task : task, isArrayChild : ARRAY_CHILD , status : nextflow.processor.TaskStatus . RUNNING , executor : executor))
725+ when :
726+ def result = handler. checkIfCompleted()
727+ then :
728+ 1 * handler. readExitFile() >> EXIT_STATUS
729+ handler. status == TASK_STATUS
730+ handler. task. exitStatus == EXIT_STATUS
731+ handler. task. error?. message == TASK_ERROR
732+ result == RESULT
733+
734+ where :
735+ TASK_STATE | DESC | EXIT_CODE | ARRAY_CHILD | TASK_STATUS | EXIT_STATUS | RESULT | TASK_ERROR
736+ TaskStatus.State . FAILED | " Error" | null | true | nextflow.processor.TaskStatus . COMPLETED | 0 | true | null
737+ null | " Error" | null | true | nextflow.processor.TaskStatus . COMPLETED | 1 | true | null
738+ TaskStatus.State . FAILED | ' Task failed due to Spot VM preemption with exit code 50001.' | 50001 | true | nextflow.processor.TaskStatus . COMPLETED | Integer . MAX_VALUE | true | ' Task failed due to Spot VM preemption with exit code 50001.'
739+ TaskStatus.State . FAILED | " Error" | null | false | nextflow.processor.TaskStatus . COMPLETED | 0 | true | null
740+ null | " Error" | null | false | nextflow.processor.TaskStatus . COMPLETED | 1 | true | null
741+ TaskStatus.State . FAILED | ' Task failed due to Spot VM preemption with exit code 50001.' | 50001 | false | nextflow.processor.TaskStatus . COMPLETED | Integer . MAX_VALUE | true | ' Task failed due to Spot VM preemption with exit code 50001.'
742+ }
743+
668744}
0 commit comments