Skip to content

Commit f289382

Browse files
authored
BT-732 Checksum validation for blobs read by engine (#6838)
* Draft support for optional FileHash * Draft getMd5 for BlobPath * Resolve non-parallel IO to fix tests * Checksum validation for BlobPath * Nicer error message * Test for missing Blob hash * Break attr acquisition into separate method * Cleanup, comments * In-progress tests of blob hash command * Remove test * Remove unused import
1 parent 54fed3e commit f289382

File tree

3 files changed

+91
-14
lines changed

3 files changed

+91
-14
lines changed

engine/src/main/scala/cromwell/engine/io/nio/NioFlow.scala

Lines changed: 36 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import cromwell.core.path.Path
1212
import cromwell.engine.io.IoActor._
1313
import cromwell.engine.io.RetryableRequestSupport.{isInfinitelyRetryable, isRetryable}
1414
import cromwell.engine.io.{IoAttempts, IoCommandContext, IoCommandStalenessBackpressuring}
15+
import cromwell.filesystems.blob.BlobPath
1516
import cromwell.filesystems.drs.DrsPath
1617
import cromwell.filesystems.gcs.GcsPath
1718
import cromwell.filesystems.s3.S3Path
@@ -128,21 +129,33 @@ class NioFlow(parallelism: Int,
128129

129130
def readFileAndChecksum: IO[String] = {
130131
for {
131-
fileHash <- getHash(command.file)
132+
fileHash <- getStoredHash(command.file)
132133
uncheckedValue <- readFile
133-
checksumResult <- checkHash(uncheckedValue, fileHash)
134+
checksumResult <- fileHash match {
135+
case Some(hash) => checkHash(uncheckedValue, hash)
136+
// If there is no stored checksum, don't attempt to validate.
137+
// If the missing checksum is itself an error condition, that
138+
// should be detected by the code that gets the FileHash.
139+
case None => IO.pure(ChecksumSkipped())
140+
}
134141
verifiedValue <- checksumResult match {
135142
case _: ChecksumSkipped => IO.pure(uncheckedValue)
136143
case _: ChecksumSuccess => IO.pure(uncheckedValue)
137144
case failure: ChecksumFailure => IO.raiseError(
138145
ChecksumFailedException(
139-
s"Failed checksum for '${command.file}'. Expected '${fileHash.hashType}' hash of '${fileHash.hash}'. Calculated hash '${failure.calculatedHash}'"))
146+
fileHash match {
147+
case Some(hash) => s"Failed checksum for '${command.file}'. Expected '${hash.hashType}' hash of '${hash.hash}'. Calculated hash '${failure.calculatedHash}'"
148+
case None => s"Failed checksum for '${command.file}'. Couldn't find stored file hash." // This should never happen
149+
}
150+
)
151+
)
140152
}
141153
} yield verifiedValue
142154
}
143155

144156
val fileContentIo = command.file match {
145-
case _: DrsPath => readFileAndChecksum
157+
case _: DrsPath => readFileAndChecksum
158+
case _: BlobPath => readFileAndChecksum
146159
case _ => readFile
147160
}
148161
fileContentIo.map(_.replaceAll("\\r\\n", "\\\n"))
@@ -153,19 +166,27 @@ class NioFlow(parallelism: Int,
153166
}
154167

155168
private def hash(hash: IoHashCommand): IO[String] = {
156-
getHash(hash.file).map(_.hash)
169+
// If there is no hash accessible from the file storage system,
170+
// we'll read the file and generate the hash ourselves.
171+
getStoredHash(hash.file).flatMap {
172+
case Some(storedHash) => IO.pure(storedHash)
173+
case None => generateMd5FileHashForPath(hash.file)
174+
}.map(_.hash)
157175
}
158176

159-
private def getHash(file: Path): IO[FileHash] = {
177+
private def getStoredHash(file: Path): IO[Option[FileHash]] = {
160178
file match {
161-
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath)
179+
case gcsPath: GcsPath => getFileHashForGcsPath(gcsPath).map(Option(_))
180+
case blobPath: BlobPath => getFileHashForBlobPath(blobPath)
162181
case drsPath: DrsPath => IO {
182+
// We assume all DRS files have a stored hash; this will throw
183+
// if the file does not.
163184
drsPath.getFileHash
164-
}
185+
}.map(Option(_))
165186
case s3Path: S3Path => IO {
166-
FileHash(HashType.S3Etag, s3Path.eTag)
187+
Option(FileHash(HashType.S3Etag, s3Path.eTag))
167188
}
168-
case path => getMd5FileHashForPath(path)
189+
case _ => IO.pure(None)
169190
}
170191
}
171192

@@ -201,7 +222,11 @@ class NioFlow(parallelism: Int,
201222
gcsPath.objectBlobId.map(id => FileHash(HashType.GcsCrc32c, gcsPath.cloudStorage.get(id).getCrc32c))
202223
}
203224

204-
private def getMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
225+
private def getFileHashForBlobPath(blobPath: BlobPath): IO[Option[FileHash]] = delayedIoFromTry {
226+
blobPath.md5HexString.map(md5 => md5.map(FileHash(HashType.Md5, _)))
227+
}
228+
229+
private def generateMd5FileHashForPath(path: Path): IO[FileHash] = delayedIoFromTry {
205230
tryWithResource(() => path.newInputStream) { inputStream =>
206231
FileHash(HashType.Md5, org.apache.commons.codec.digest.DigestUtils.md5Hex(inputStream))
207232
}

engine/src/test/scala/cromwell/engine/io/nio/NioFlowSpec.scala

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ import org.mockito.Mockito.{times, verify, when}
2020
import org.scalatest.flatspec.AsyncFlatSpecLike
2121
import org.scalatest.matchers.should.Matchers
2222
import common.mock.MockSugar
23+
import cromwell.filesystems.blob.BlobPath
2324

2425
import java.nio.file.NoSuchFileException
2526
import java.util.UUID
2627
import scala.concurrent.ExecutionContext
2728
import scala.concurrent.duration._
2829
import scala.language.postfixOps
29-
import scala.util.Failure
30+
import scala.util.{Failure, Success, Try}
3031
import scala.util.control.NoStackTrace
3132

3233
class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with MockSugar {
@@ -127,6 +128,23 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
127128
}
128129
}
129130

131+
it should "get hash from a BlobPath when stored hash exists" in {
132+
val testPath = mock[BlobPath]
133+
val hashString = "2d01d5d9c24034d54fe4fba0ede5182d" // echo "hello there" | md5sum
134+
testPath.md5HexString returns Try(Option(hashString))
135+
136+
val context = DefaultCommandContext(hashCommand(testPath).get, replyTo)
137+
val testSource = Source.single(context)
138+
139+
val stream = testSource.via(flow).toMat(readSink)(Keep.right)
140+
141+
stream.run() map {
142+
case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == hashString)
143+
case (ack, _) =>
144+
fail(s"read returned an unexpected message:\n$ack\n\n")
145+
}
146+
}
147+
130148
it should "fail if DrsPath hash doesn't match checksum" in {
131149
val testPath = mock[DrsPath]
132150
when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext])).thenReturn("hello".getBytes)
@@ -171,6 +189,25 @@ class NioFlowSpec extends TestKitSuite with AsyncFlatSpecLike with Matchers with
171189
}
172190
}
173191

192+
it should "succeed if a BlobPath is missing a stored hash" in {
193+
val testPath = mock[BlobPath]
194+
when(testPath.limitFileContent(any[Option[Int]], any[Boolean])(any[ExecutionContext]))
195+
.thenReturn("hello there".getBytes)
196+
when(testPath.md5HexString)
197+
.thenReturn(Success(None))
198+
199+
val context = DefaultCommandContext(contentAsStringCommand(testPath, Option(100), failOnOverflow = true).get, replyTo)
200+
val testSource = Source.single(context)
201+
202+
val stream = testSource.via(flow).toMat(readSink)(Keep.right)
203+
204+
stream.run() map {
205+
case (success: IoSuccess[_], _) => assert(success.result.asInstanceOf[String] == "hello there")
206+
case (ack, _) =>
207+
fail(s"read returned an unexpected message:\n$ack\n\n")
208+
}
209+
}
210+
174211
it should "copy Nio paths" in {
175212
val testPath = DefaultPathBuilder.createTempFile()
176213
val testCopyPath = testPath.sibling(UUID.randomUUID().toString)

filesystems/blob/src/main/scala/cromwell/filesystems/blob/BlobPathBuilder.scala

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
package cromwell.filesystems.blob
22

33
import com.azure.core.credential.AzureSasCredential
4-
import com.azure.storage.blob.nio.AzureFileSystem
4+
import com.azure.storage.blob.nio.{AzureBlobFileAttributes, AzureFileSystem}
55
import com.google.common.net.UrlEscapers
66
import cromwell.core.path.{NioPath, Path, PathBuilder}
77
import cromwell.filesystems.blob.BlobPathBuilder._
88

99
import java.net.{MalformedURLException, URI}
10-
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems}
10+
import java.nio.file.{FileSystem, FileSystemNotFoundException, FileSystems, Files}
1111
import scala.jdk.CollectionConverters._
1212
import scala.language.postfixOps
1313
import scala.util.{Failure, Try}
@@ -90,4 +90,19 @@ case class BlobPath private[blob](nioPath: NioPath, endpoint: String, container:
9090
override def pathAsString: String = List(endpoint, container, nioPath.toString()).mkString("/")
9191

9292
override def pathWithoutScheme: String = parseURI(endpoint).getHost + "/" + container + "/" + nioPath.toString()
93+
94+
def blobFileAttributes: Try[AzureBlobFileAttributes] =
95+
Try(Files.readAttributes(nioPath, classOf[AzureBlobFileAttributes]))
96+
97+
def md5HexString: Try[Option[String]] = {
98+
blobFileAttributes.map(h =>
99+
Option(h.blobHttpHeaders().getContentMd5) match {
100+
case None => None
101+
case Some(arr) if arr.isEmpty => None
102+
// Convert the bytes to a hex-encoded string. Note that this value
103+
// is rendered in base64 in the Azure web portal.
104+
case Some(bytes) => Option(bytes.map("%02x".format(_)).mkString)
105+
}
106+
)
107+
}
93108
}

0 commit comments

Comments
 (0)