Skip to content

Commit

Permalink
Defend against over-large stderr files (broadinstitute#4952)
Browse files Browse the repository at this point in the history
  • Loading branch information
cjllanwarne authored May 10, 2019
1 parent 1b8e5b3 commit 6018d7c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package cromwell.core.path

import java.io.InputStream
import java.io.{BufferedReader, IOException, InputStream, InputStreamReader}
import java.nio.file.{FileAlreadyExistsException, Files}
import java.nio.file.attribute.{PosixFilePermission, PosixFilePermissions}

import better.files.File.OpenOptions
import cromwell.util.TryWithResource.tryWithResource

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.io.Codec
import scala.util.Failure

/**
* Implements methods beyond those implemented in NioPathMethods and BetterFileMethods
Expand Down Expand Up @@ -98,4 +100,40 @@ trait EvenBetterPathMethods {
locally(ec)
write(content)(openOptions, Codec.UTF8)
}

/*
 * Opens this path as a UTF-8 character stream, hands the buffered reader to f and returns
 * whatever f produces. The reader is managed by tryWithResource, so the input stream will be
 * closed when this method returns and f cannot leak an open stream.
 * Any failure is rethrown as an IOException that names the path and keeps the original cause.
 */
def withReader[A](f: BufferedReader => A)(implicit ec: ExecutionContext): A = {
  // InputStreamReader turns the byte stream into characters; BufferedReader is there for efficiency.
  val openReader = () => new BufferedReader(new InputStreamReader(this.mediaInputStream, Codec.UTF8.name))
  val outcome = tryWithResource(openReader)(f)

  outcome.recoverWith({
    case cause => Failure(new IOException(s"Could not read from ${this.pathAsString}: ${cause.getMessage}", cause))
  }).get
}

/**
  * Returns an Array[Byte] from a Path. Limit the array size to "limit" byte if defined.
  *
  * The content is read directly from the underlying byte stream. Going through a character
  * reader and narrowing each decoded char with `toByte` would corrupt every non-ASCII character
  * and would make the limit count characters instead of bytes.
  *
  * @param limit          maximum number of bytes to return; None reads the whole content
  * @param failOnOverflow when true, content longer than the limit is an error instead of being truncated
  * @return the content, truncated to at most `limit` bytes when a limit is defined
  * @throws IOException if failOnOverflow is true and the file is larger than limit, or if reading fails
  */
def limitFileContent(limit: Option[Int], failOnOverflow: Boolean)(implicit ec: ExecutionContext): Array[Byte] = {
  // Keeps the implicit referenced even if nothing below happens to consume it.
  locally(ec)

  val attempt = tryWithResource(() => new java.io.BufferedInputStream(this.mediaInputStream)) { stream =>
    // InputStream.read() yields 0-255 for a byte and -1 at end of stream, so toByte is lossless here.
    val bytesIterator = Iterator.continually(stream.read()).takeWhile(_ != -1).map(_.toByte)
    // Take 1 more than the limit so that we can look at the size and know if it's overflowing.
    // The limit is clamped first so that the "+ 1" cannot wrap around for Int.MaxValue.
    val bytesArray = limit.map(l => bytesIterator.take(math.min(l, Int.MaxValue - 1) + 1)).getOrElse(bytesIterator).toArray

    limit match {
      case Some(l) if failOnOverflow && bytesArray.length > l =>
        throw new IOException(s"File $this is larger than $l Bytes. Maximum read limits can be adjusted in the configuration under system.input-read-limits.")
      case Some(l) => bytesArray.take(l)
      case _ => bytesArray
    }
  }

  // Same error wrapping as withReader, so callers keep seeing the same exception shape.
  attempt.recoverWith({
    case failure => Failure(new IOException(s"Could not read from ${this.pathAsString}: ${failure.getMessage}", failure))
  }).get
}

/**
  * Reads at most the first limitBytes bytes of a file and makes a String. Prepend with an annotation at the start
  * (to say that this is the first n bytes). The annotation is always added, whatever the actual size of the file.
  *
  * The bytes are decoded explicitly as UTF-8, like the other read and write helpers of this trait, instead of
  * relying on the platform default charset.
  *
  * @param limitBytes maximum number of bytes read from the file
  * @return the annotation followed by the decoded content
  */
def annotatedContentAsStringWithLimit(limitBytes: Int)(implicit ec: ExecutionContext): String =
  s"[First $limitBytes bytes]:" + new String(limitFileContent(Option(limitBytes), failOnOverflow = false), Codec.UTF8.charSet)
}
37 changes: 3 additions & 34 deletions engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import cromwell.util.TryWithResource._

import scala.concurrent.ExecutionContext
import scala.io.Codec
import scala.util.Failure
/** Companion of the NioFlow class. */
object NioFlow {
  /** Callback that ignores both the command context and the failure, and does nothing. */
  def NoopOnRetry(context: IoCommandContext[_])(failure: Throwable): Unit = ()
}
Expand Down Expand Up @@ -98,7 +97,7 @@ class NioFlow(parallelism: Int,

// Builds an IO that reads the command's file into a UTF-8 String, forwarding the command's
// maxBytes and failOnOverflow options to limitFileContent.
private def readAsString(read: IoContentAsStringCommand) = IO {
new String(
// NOTE(review): the next two lines look like the removed and the added side of the diff shown
// back to back (the +/- markers are missing); presumably only one of them belongs to a given revision.
limitFileContent(read.file, read.options.maxBytes, read.options.failOnOverflow),
read.file.limitFileContent(read.options.maxBytes, read.options.failOnOverflow),
StandardCharsets.UTF_8
)
}
Expand Down Expand Up @@ -130,7 +129,7 @@ class NioFlow(parallelism: Int,
}

// Builds an IO that reads every line of the command's file into a List.
private def readLines(exists: IoReadLinesCommand) = IO {
// NOTE(review): the next two lines look like the removed and the added side of the diff shown
// back to back (the +/- markers are missing); presumably only one of them belongs to a given revision.
withReader(exists.file) { reader =>
exists.file.withReader { reader =>
// readLine() returns null once the end of the input is reached, which ends the stream.
Stream.continually(reader.readLine()).takeWhile(_ != null).toList
}
}
Expand All @@ -141,50 +140,20 @@ class NioFlow(parallelism: Int,

// Calls createDirectories() on the parent of the given path and returns its result.
private def createDirectories(path: Path) = {
  val parentDirectory = path.parent
  parentDirectory.createDirectories()
}

/*
 * Opens the file as a UTF-8 character stream, hands the buffered reader to f and returns
 * whatever f produces. The reader is managed by tryWithResource, so the input stream will be
 * closed when this method returns and f cannot leak an open stream.
 * Any failure is rethrown as an IOException that names the file and keeps the original cause.
 */
private def withReader[A](file: Path)(f: BufferedReader => A): A = {
  // InputStreamReader turns the byte stream into characters; BufferedReader is there for efficiency.
  val openReader = () => new BufferedReader(new InputStreamReader(file.mediaInputStream, Codec.UTF8.name))
  val outcome = tryWithResource(openReader)(f)

  outcome.recoverWith({
    case cause => Failure(new IOException(s"Could not read from ${file.pathAsString}: ${cause.getMessage}", cause))
  }).get
}

/**
  * Returns an Array[Byte] from a Path. Limit the array size to "limit" byte if defined.
  *
  * The content is read directly from the file's byte stream. Going through a character reader
  * and narrowing each decoded char with `toByte` would corrupt every non-ASCII character and
  * would make the limit count characters instead of bytes.
  *
  * @param file           the path to read
  * @param limit          maximum number of bytes to return; None reads the whole content
  * @param failOnOverflow when true, content longer than the limit is an error instead of being truncated
  * @return the content, truncated to at most `limit` bytes when a limit is defined
  * @throws IOException if failOnOverflow is true and the file is larger than limit, or if reading fails
  */
private def limitFileContent(file: Path, limit: Option[Int], failOnOverflow: Boolean): Array[Byte] = {
  val attempt = tryWithResource(() => new java.io.BufferedInputStream(file.mediaInputStream)) { stream =>
    // InputStream.read() yields 0-255 for a byte and -1 at end of stream, so toByte is lossless here.
    val bytesIterator = Iterator.continually(stream.read()).takeWhile(_ != -1).map(_.toByte)
    // Take 1 more than the limit so that we can look at the size and know if it's overflowing.
    // The limit is clamped first so that the "+ 1" cannot wrap around for Int.MaxValue.
    val bytesArray = limit.map(l => bytesIterator.take(math.min(l, Int.MaxValue - 1) + 1)).getOrElse(bytesIterator).toArray

    limit match {
      case Some(l) if failOnOverflow && bytesArray.length > l =>
        throw new IOException(s"File $file is larger than $l Bytes. Maximum read limits can be adjusted in the configuration under system.input-read-limits.")
      case Some(l) => bytesArray.take(l)
      case _ => bytesArray
    }
  }

  // Same error wrapping as withReader, so callers keep seeing the same exception shape.
  attempt.recoverWith({
    case failure => Failure(new IOException(s"Could not read from ${file.pathAsString}: ${failure.getMessage}", failure))
  }).get
}

// Resolves the hash of a DRS path from the file attributes reported by its filesystem provider.
// The returned IO fails with an IOException when the provider returns no file attributes, or when
// the attributes carry no file hash.
private def getFileHashForDrsPath(drsPath: DrsPath): IO[String] = {
// The provider is downcast unconditionally.
// NOTE(review): presumably every DrsPath is backed by a DrsCloudNioFileSystemProvider — confirm.
val drsFileSystemProvider = drsPath.drsPath.getFileSystem.provider.asInstanceOf[DrsCloudNioFileSystemProvider]

//Since this does not actually do any IO work it is not wrapped in IO
val fileAttributesOption = drsFileSystemProvider.fileProvider.fileAttributes(drsPath.drsPath.cloudHost, drsPath.drsPath.cloudPath)

fileAttributesOption match {
// NOTE(review): the next two `case` lines (and the lone closing brace further down) look like the
// removed and the added side of the diff shown together; presumably only one form belongs to a given revision.
case Some(fileAttributes) => {
case Some(fileAttributes) =>
// Reading the hash is deferred inside IO; a missing hash becomes a failed IO.
val fileHashIO = IO { fileAttributes.fileHash }

fileHashIO.flatMap({
case Some(fileHash) => IO.pure(fileHash)
case None => IO.raiseError(new IOException(s"Error while resolving DRS path $drsPath. The response from Martha doesn't contain the 'md5' hash for the file."))
})
}
case None => IO.raiseError(new IOException(s"Error getting file hash of DRS path $drsPath. Reason: File attributes class DrsCloudNioRegularFileAttributes wasn't defined in DrsCloudNioFileProvider."))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import cromwell.webservice.EngineStatsActor
import net.ceedubs.ficus.Ficus._
import org.apache.commons.lang3.exception.ExceptionUtils

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.util.Try
Expand Down Expand Up @@ -321,11 +322,13 @@ class WorkflowManagerActor(params: WorkflowManagerActorParams)

private def expandFailureReasons(reasons: Seq[Throwable]): String = {

implicit val ec: ExecutionContext = context.dispatcher

reasons map {
case reason: ThrowableAggregation => expandFailureReasons(reason.throwables.toSeq)
case reason: KnownJobFailureException =>
val stderrMessage = reason.stderrPath map { path =>
val content = Try(path.contentAsString).recover({
val content = Try(path.annotatedContentAsStringWithLimit(300)).recover({
case e => s"Could not retrieve content: ${e.getMessage}"
}).get
s"\nCheck the content of stderr for potential additional information: ${path.pathAsString}.\n $content"
Expand Down

0 comments on commit 6018d7c

Please sign in to comment.