Skip to content

Commit 417adfa

Browse files
kraefrei, BMurri, and jgainerdewar
authored
WX-876 Surface TES System Logs to Cromwell when TES backend returns task error status (#6980)
* WX-876 Surface TES System Logs to Cromwell when TES backend returns task error status * Address feedback * Address feedback (#6997) * Address additional feedback (#7000) * Fix copy/paste error (#7005) * Address additional feedback * Fix copy/paste error * Trigger CI --------- Co-authored-by: Blair Murri <BMurri@users.noreply.github.com> Co-authored-by: Janet Gainer-Dewar <jdewar@broadinstitute.org>
1 parent 092059f commit 417adfa

File tree

1 file changed

+42
-3
lines changed

1 file changed

+42
-3
lines changed

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesAsyncBackendJobExecutionActor.scala

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package cromwell.backend.impl.tes
22

3+
import common.exception.AggregatedMessageException
34
import java.io.FileNotFoundException
45
import java.nio.file.FileAlreadyExistsException
56
import cats.syntax.apply._
@@ -30,6 +31,7 @@ import scala.util.{Failure, Success}
3031

3132
/**
  * Status of a TES task as modelled by this backend.
  *
  * Implementations are declared alongside this trait (the trait is sealed).
  */
sealed trait TesRunStatus {

  /** Terminal flag for this status; every implementation supplies its own value. */
  def isTerminal: Boolean

  /**
    * System log lines carried by this status.
    *
    * Empty unless an implementation overrides it.
    */
  def sysLogs: Seq[String] = Seq.empty
}
3436

3537
case object Running extends TesRunStatus {
@@ -40,8 +42,14 @@ case object Complete extends TesRunStatus {
4042
def isTerminal = true
4143
}
4244

43-
case object FailedOrError extends TesRunStatus {
45+
/**
  * Terminal status rendered as `SYSTEM_ERROR`.
  *
  * @param sysLogs system log lines attached to this status; empty when not supplied
  */
case class Error(override val sysLogs: Seq[String] = Seq.empty) extends TesRunStatus {

  /** Rendered as `SYSTEM_ERROR` rather than the generated case-class representation. */
  override def toString: String = "SYSTEM_ERROR"

  def isTerminal: Boolean = true
}
49+
50+
/**
  * Terminal status rendered as `EXECUTOR_ERROR`.
  *
  * @param sysLogs system log lines attached to this status; empty when not supplied
  */
case class Failed(override val sysLogs: Seq[String] = Seq.empty) extends TesRunStatus {

  /** Rendered as `EXECUTOR_ERROR` rather than the generated case-class representation. */
  override def toString: String = "EXECUTOR_ERROR"

  def isTerminal: Boolean = true
}
4654

4755
case object Cancelled extends TesRunStatus {
@@ -217,6 +225,21 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
217225
override def requestsAbortAndDiesImmediately: Boolean = false
218226

219227
/**
  * Queries the job's status and, when that status is `Error` or `Failed`, replaces it with a
  * copy carrying the logs returned by `getErrorLogs`. Every other status is returned unchanged.
  *
  * If the log lookup fails, the returned future fails with that exception.
  *
  * @param handle handle of the pending job to poll
  * @return the job status, enriched with system logs for `Error` and `Failed`
  */
override def pollStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] =
  queryStatusAsync(handle) flatMap {
    // The logs carried by the freshly queried status are ignored; they are fetched separately.
    case Error(_) => getErrorLogs(handle) map { logs => Error(logs) }
    case Failed(_) => getErrorLogs(handle) map { logs => Failed(logs) }
    case otherStatus => Future.successful(otherStatus)
  }
241+
242+
private def queryStatusAsync(handle: StandardAsyncPendingExecutionHandle): Future[TesRunStatus] = {
220243
makeRequest[MinimalTaskView](HttpRequest(uri = s"$tesEndpoint/${handle.pendingJob.jobId}?view=MINIMAL")) map {
221244
response =>
222245
val state = response.state
@@ -229,24 +252,40 @@ class TesAsyncBackendJobExecutionActor(override val standardParams: StandardAsyn
229252
jobLogger.info(s"Job ${handle.pendingJob.jobId} was canceled")
230253
Cancelled
231254

232-
case s if s.contains("ERROR") =>
255+
case s if s.contains("EXECUTOR_ERROR") =>
256+
jobLogger.info(s"TES reported a failure for Job ${handle.pendingJob.jobId}: '$s'")
257+
Failed()
258+
259+
case s if s.contains("SYSTEM_ERROR") =>
233260
jobLogger.info(s"TES reported an error for Job ${handle.pendingJob.jobId}: '$s'")
234-
FailedOrError
261+
Error()
235262

236263
case _ => Running
237264
}
238265
}
239266
}
240267

268+
/**
  * Requests the job's task with `view=FULL` and extracts the `system_logs` of the last entry
  * in the task's `logs`.
  *
  * @param handle handle of the pending job whose logs are wanted
  * @return the system log lines, or an empty sequence when `logs`, its last entry, or that
  *         entry's `system_logs` is absent
  */
private def getErrorLogs(handle: StandardAsyncPendingExecutionHandle): Future[Seq[String]] = {
  val fullTaskRequest = HttpRequest(uri = s"$tesEndpoint/${handle.pendingJob.jobId}?view=FULL")

  makeRequest[Task](fullTaskRequest) map { task =>
    val lastEntrySystemLogs = for {
      entries <- task.logs
      lastEntry <- entries.lastOption
      systemLogs <- lastEntry.system_logs
    } yield systemLogs

    lastEntrySystemLogs.getOrElse(Seq.empty[String])
  }
}
273+
241274
/**
  * Maps a failure paired with a pending execution handle to a non-retryable failure handle.
  *
  * NOTE(review): the pattern accepts any `Exception`, yet the log line always states that the
  * job "has not been found" — confirm the wording is intended for every failure cause.
  */
override def customPollStatusFailure: PartialFunction[(ExecutionHandle, Exception), ExecutionHandle] = {
  case (pendingHandle: StandardAsyncPendingExecutionHandle @unchecked, cause: Exception) =>
    val tesJobId = pendingHandle.pendingJob.jobId
    jobLogger.error(s"$tag TES Job $tesJobId has not been found, failing call")
    // The original exception is kept as the failure cause; no key/value pairs are saved.
    FailedNonRetryableExecutionHandle(cause, kvPairsToSave = None)
}
246279

280+
/**
  * Builds a non-retryable failure handle from a status, passing the status's system logs to
  * the `AggregatedMessageException` as its messages.
  *
  * @param status     the status to report; its string form is appended to the failure context
  * @param returnCode return code of the job, if known
  * @return an already-completed future holding the failure handle
  */
private def handleExecutionError(status: TesRunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = {
  val failureContext = s"Task ${jobDescriptor.key.tag} failed for unknown reason: $status"
  val failure = new AggregatedMessageException(failureContext, status.sysLogs)
  Future.successful(FailedNonRetryableExecutionHandle(failure, returnCode, None))
}
284+
247285
/**
  * Chooses the execution handle for a failed job from its status.
  *
  *  - `Error` / `Failed`: delegated to `handleExecutionError`.
  *  - `Cancelled`: reported as aborted.
  *  - anything else: deferred to the parent implementation.
  *
  * @param status     the status to handle
  * @param returnCode return code of the job, if known
  */
override def handleExecutionFailure(status: StandardAsyncRunState, returnCode: Option[Int]) =
  status match {
    case Error(_) | Failed(_) => handleExecutionError(status, returnCode)
    case Cancelled => Future.successful(AbortedExecutionHandle)
    case _ => super.handleExecutionFailure(status, returnCode)
  }

0 commit comments

Comments
 (0)