@@ -1416,7 +1416,7 @@ trait StandardAsyncExecutionActor
14161416 ): Future [ExecutionHandle ] = {
14171417
14181418 // Returns true if the task has written an RC file that indicates OOM, false otherwise
1419- def memoryRetryRC : Future [Boolean ] = {
1419+ def memoryRetryRC : Future [( Boolean , Option [ Path ]) ] = {
14201420
14211421 def readFile (path : Path , maxBytes : Option [Int ]): Future [String ] =
14221422 asyncIo.contentAsStringAsync(path, maxBytes, failOnOverflow = false )
@@ -1439,22 +1439,33 @@ trait StandardAsyncExecutionActor
14391439 }
14401440
14411441 def checkMemoryRetryStderr (errorKeys : List [String ], maxBytes : Int ): Future [Boolean ] =
1442- readFile(jobPaths.standardPaths.error , Option (maxBytes)) map { errorContent =>
1442+ readFile(jobPaths.memoryRetryError , Option (maxBytes)) map { errorContent =>
14431443 errorKeys.exists(errorContent.contains)
14441444 }
14451445
1446- asyncIo.existsAsync(jobPaths.memoryRetryRC) flatMap {
1447- case true => checkMemoryRetryRC()
1448- case false =>
1449- (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1450- case (Some (keys), Some (limit)) =>
1451- asyncIo.existsAsync(jobPaths.standardPaths.error) flatMap {
1452- case true => checkMemoryRetryStderr(keys, limit)
1453- case false => Future .successful(false )
1454- }
1455- case _ => Future .successful(false )
1456- }
1457- }
1446+ def checkMemoryRetryError (): Future [Boolean ] =
1447+ (memoryRetryErrorKeys, memoryRetryStderrLimit) match {
1448+ case (Some (keys), Some (limit)) =>
1449+ for {
1450+ memoryRetryErrorExists <- asyncIo.existsAsync(jobPaths.memoryRetryError)
1451+ memoryRetryErrorFound <-
1452+ if (memoryRetryErrorExists) checkMemoryRetryStderr(keys, limit) else Future .successful(false )
1453+ } yield memoryRetryErrorFound
1454+ case _ => Future .successful(false )
1455+ }
1456+
1457+ // For backwards behavioral compatibility, check for the old memory retry RC file first. That file used to catch
1458+ // the errors from the standard error file, but now sometimes the error is written to a separate log file.
1459+ // If it exists, check its contents. If it doesn't find an OOM code, check the new memory retry error file.
1460+ for {
1461+ memoryRetryRCExists <- asyncIo.existsAsync(jobPaths.memoryRetryRC)
1462+ memoryRetryRCErrorFound <- if (memoryRetryRCExists) checkMemoryRetryRC() else Future .successful(false )
1463+ memoryRetryErrorFound <- if (memoryRetryRCErrorFound) Future .successful(true ) else checkMemoryRetryError()
1464+ memoryErrorPathOption =
1465+ if (memoryRetryRCErrorFound) Option (jobPaths.standardPaths.error)
1466+ else if (memoryRetryErrorFound) Option (jobPaths.memoryRetryError)
1467+ else None
1468+ } yield (memoryRetryErrorFound, memoryErrorPathOption)
14581469 }
14591470
14601471 val stderr = jobPaths.standardPaths.error
@@ -1465,74 +1476,76 @@ trait StandardAsyncExecutionActor
14651476 // Only check stderr size if we need to, otherwise this results in a lot of unnecessary I/O that
14661477 // may fail due to race conditions on quickly-executing jobs.
14671478 stderrSize <- if (failOnStdErr) asyncIo.sizeAsync(stderr) else Future .successful(0L )
1468- outOfMemoryDetected <- memoryRetryRC
1469- } yield (stderrSize, returnCodeAsString, outOfMemoryDetected)
1470-
1471- stderrSizeAndReturnCodeAndMemoryRetry flatMap { case (stderrSize, returnCodeAsString, outOfMemoryDetected) =>
1472- val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1473-
1474- if (isDone(status)) {
1475- tryReturnCodeAsInt match {
1476- case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1477- val executionHandle = Future .successful(
1478- FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1479- Option (returnCodeAsInt),
1480- None
1479+ (outOfMemoryDetected, outOfMemoryPathOption) <- memoryRetryRC
1480+ } yield (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption)
1481+
1482+ stderrSizeAndReturnCodeAndMemoryRetry flatMap {
1483+ case (stderrSize, returnCodeAsString, outOfMemoryDetected, outOfMemoryPathOption) =>
1484+ val tryReturnCodeAsInt = Try (returnCodeAsString.trim.toInt)
1485+
1486+ if (isDone(status)) {
1487+ tryReturnCodeAsInt match {
1488+ case Success (returnCodeAsInt) if failOnStdErr && stderrSize.intValue > 0 =>
1489+ val executionHandle = Future .successful(
1490+ FailedNonRetryableExecutionHandle (StderrNonEmpty (jobDescriptor.key.tag, stderrSize, stderrAsOption),
1491+ Option (returnCodeAsInt),
1492+ None
1493+ )
14811494 )
1482- )
1483- retryElseFail(executionHandle)
1484- case Success (returnCodeAsInt) if continueOnReturnCode.continueFor( returnCodeAsInt) =>
1485- handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1486- // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1487- // if it was caused by OOM killer, want to handle as OOM and not job abort.
1488- case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1489- val executionHandle = Future .successful (
1490- FailedNonRetryableExecutionHandle (
1491- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log ),
1492- Option (returnCodeAsInt),
1493- None
1495+ retryElseFail(executionHandle )
1496+ case Success (returnCodeAsInt) if continueOnReturnCode.continueFor(returnCodeAsInt) =>
1497+ handleExecutionSuccess(status, oldHandle, returnCodeAsInt)
1498+ // It's important that we check retryWithMoreMemory case before isAbort. RC could be 137 in either case;
1499+ // if it was caused by OOM killer, want to handle as OOM and not job abort.
1500+ case Success (returnCodeAsInt) if outOfMemoryDetected && memoryRetryRequested =>
1501+ val executionHandle = Future .successful(
1502+ FailedNonRetryableExecutionHandle (
1503+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1504+ Option (returnCodeAsInt ),
1505+ None
1506+ )
14941507 )
1495- )
1496- retryElseFail(executionHandle,
1497- MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1498- )
1499- case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1500- Future .successful(AbortedExecutionHandle )
1501- case Success (returnCodeAsInt) =>
1502- val executionHandle = Future .successful(
1503- FailedNonRetryableExecutionHandle (WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1504- Option (returnCodeAsInt),
1505- None
1508+ retryElseFail(executionHandle,
1509+ MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
15061510 )
1507- )
1508- retryElseFail(executionHandle)
1509- case Failure (_) =>
1510- Future .successful(
1511- FailedNonRetryableExecutionHandle (
1512- ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1513- kvPairsToSave = None
1511+ case Success (returnCodeAsInt) if isAbort(returnCodeAsInt) =>
1512+ Future .successful(AbortedExecutionHandle )
1513+ case Success (returnCodeAsInt) =>
1514+ val executionHandle = Future .successful(
1515+ FailedNonRetryableExecutionHandle (
1516+ WrongReturnCode (jobDescriptor.key.tag, returnCodeAsInt, stderrAsOption),
1517+ Option (returnCodeAsInt),
1518+ None
1519+ )
15141520 )
1515- )
1516- }
1517- } else {
1518- tryReturnCodeAsInt match {
1519- case Success (returnCodeAsInt)
1520- if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor(returnCodeAsInt) =>
1521- val executionHandle = Future .successful(
1522- FailedNonRetryableExecutionHandle (
1523- RetryWithMoreMemory (jobDescriptor.key.tag, stderrAsOption, memoryRetryErrorKeys, log),
1524- Option (returnCodeAsInt),
1525- None
1521+ retryElseFail(executionHandle)
1522+ case Failure (_) =>
1523+ Future .successful(
1524+ FailedNonRetryableExecutionHandle (
1525+ ReturnCodeIsNotAnInt (jobDescriptor.key.tag, returnCodeAsString, stderrAsOption),
1526+ kvPairsToSave = None
1527+ )
15261528 )
1527- )
1528- retryElseFail(executionHandle,
1529- MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1530- )
1531- case _ =>
1532- val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1533- retryElseFail(failureStatus)
1529+ }
1530+ } else {
1531+ tryReturnCodeAsInt match {
1532+ case Success (returnCodeAsInt)
1533+ if outOfMemoryDetected && memoryRetryRequested && ! continueOnReturnCode.continueFor(returnCodeAsInt) =>
1534+ val executionHandle = Future .successful(
1535+ FailedNonRetryableExecutionHandle (
1536+ RetryWithMoreMemory (jobDescriptor.key.tag, outOfMemoryPathOption, memoryRetryErrorKeys, log),
1537+ Option (returnCodeAsInt),
1538+ None
1539+ )
1540+ )
1541+ retryElseFail(executionHandle,
1542+ MemoryRetryResult (outOfMemoryDetected, memoryRetryFactor, previousMemoryMultiplier)
1543+ )
1544+ case _ =>
1545+ val failureStatus = handleExecutionFailure(status, tryReturnCodeAsInt.toOption)
1546+ retryElseFail(failureStatus)
1547+ }
15341548 }
1535- }
15361549 } recoverWith { case exception =>
15371550 if (isDone(status)) Future .successful(FailedNonRetryableExecutionHandle (exception, kvPairsToSave = None ))
15381551 else {
0 commit comments