Skip to content

Commit fdf3921

Browse files
committed
use CSVPrinter to generate DRS localizer manifest (and add a test)
1 parent 6509af9 commit fdf3921

File tree

2 files changed

+67
-11
lines changed

2 files changed

+67
-11
lines changed

supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActor.scala

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ import cromwell.filesystems.drs.DrsPath
1616
import cromwell.filesystems.gcs.GcsPathBuilder.ValidFullGcsPath
1717
import cromwell.filesystems.gcs.{GcsPath, GcsPathBuilder}
1818
import org.apache.commons.codec.digest.DigestUtils
19+
import org.apache.commons.csv.{CSVFormat, CSVPrinter}
20+
import org.apache.commons.io.output.ByteArrayOutputStream
1921
import wom.core.FullyQualifiedName
2022
import wom.expression.FileEvaluation
2123
import wom.values.{GlobFunctions, WomFile, WomGlobFile, WomMaybeListedDirectory, WomMaybePopulatedFile, WomSingleFile, WomUnlistedDirectory}
2224

23-
import java.io.FileNotFoundException
25+
import java.nio.charset.Charset
26+
27+
import java.io.{FileNotFoundException, OutputStreamWriter}
2428
import scala.concurrent.Future
2529
import scala.io.Source
2630
import scala.language.postfixOps
@@ -175,16 +179,6 @@ class PipelinesApiAsyncBackendJobExecutionActor(standardParams: StandardAsyncExe
175179

176180
import mouse.all._
177181

178-
private def generateDrsLocalizerManifest(inputs: List[PipelinesApiInput]): String = {
179-
inputs.collect {
180-
case PipelinesApiFileInput(name, drsPath: DrsPath, relativeHostPath, mount) => {
181-
val drsPathStr = drsPath.pathAsString
182-
val containerPathStr = PipelinesApiFileInput(name, drsPath, relativeHostPath, mount).containerPath.pathAsString
183-
s"\"$drsPathStr\",\"$containerPathStr\""
184-
}
185-
}.mkString("\n")
186-
}
187-
188182
override def uploadDrsLocalizationManifest(createPipelineParameters: CreatePipelineParameters, cloudPath: Path): Future[Unit] = {
189183
val content = generateDrsLocalizerManifest(createPipelineParameters.inputOutputParameters.fileInputParameters)
190184
asyncIo.writeAsync(cloudPath, content, Seq(CloudStorageOptions.withMimeType("text/plain")))
@@ -412,4 +406,17 @@ object PipelinesApiAsyncBackendJobExecutionActor {
412406
}
413407
} combineAll
414408
}
409+
410+
private [v2beta] def generateDrsLocalizerManifest(inputs: List[PipelinesApiInput]): String = {
411+
val outputStream = new ByteArrayOutputStream()
412+
val csvPrinter = new CSVPrinter(new OutputStreamWriter(outputStream), CSVFormat.DEFAULT)
413+
val drsFileInputs = inputs collect {
414+
case drsInput@PipelinesApiFileInput(_, drsPath: DrsPath, _, _) => (drsInput, drsPath)
415+
}
416+
drsFileInputs foreach { case (drsInput, drsPath) =>
417+
csvPrinter.printRecord(drsPath.pathAsString, drsInput.containerPath.pathAsString)
418+
}
419+
csvPrinter.close(true)
420+
outputStream.toString(Charset.defaultCharset())
421+
}
415422
}

supportedBackends/google/pipelines/v2beta/src/test/scala/cromwell/backend/google/pipelines/v2beta/PipelinesApiAsyncBackendJobExecutionActorSpec.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,22 @@ package cromwell.backend.google.pipelines.v2beta
22

33
import java.nio.file.Paths
44
import cats.data.NonEmptyList
5+
import cloud.nio.impl.drs.DrsCloudNioFileProvider.DrsReadInterpreter
6+
import cloud.nio.impl.drs.{DrsCloudNioFileSystemProvider, GoogleDrsCredentials}
7+
import com.google.cloud.NoCredentials
8+
import com.typesafe.config.{Config, ConfigFactory}
59
import common.assertion.CromwellTimeoutSpec
610
import common.mock.MockSugar
711
import cromwell.backend.google.pipelines.common.PipelinesApiFileInput
12+
import cromwell.backend.google.pipelines.common.io.{DiskType, PipelinesApiWorkingDisk}
813
import cromwell.core.path.DefaultPathBuilder
14+
import cromwell.filesystems.drs.DrsPathBuilder
915
import org.mockito.Mockito._
1016
import org.scalatest.flatspec.AnyFlatSpec
1117
import org.scalatest.matchers.should.Matchers
1218

19+
import scala.concurrent.duration.DurationInt
20+
1321
class PipelinesApiAsyncBackendJobExecutionActorSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers
1422
with MockSugar {
1523
behavior of "PipelinesParameterConversions"
@@ -44,4 +52,45 @@ class PipelinesApiAsyncBackendJobExecutionActorSpec extends AnyFlatSpec with Cro
4452

4553
PipelinesApiAsyncBackendJobExecutionActor.groupParametersByGcsBucket(inputs) shouldEqual expected
4654
}
55+
56+
it should "generate a CSV manifest for DRS inputs, ignoring non-DRS inputs" in {
57+
def makeDrsPathBuilder: DrsPathBuilder = {
58+
val marthaConfig: Config = ConfigFactory.parseString(
59+
"""martha {
60+
| url = "http://martha-url"
61+
|}
62+
|""".stripMargin
63+
)
64+
65+
val fakeCredentials = NoCredentials.getInstance
66+
67+
val drsReadInterpreter: DrsReadInterpreter = (_, _) =>
68+
throw new UnsupportedOperationException("PipelinesApiAsyncBackendJobExecutionActorSpec doesn't need to use drs read interpreter.")
69+
70+
DrsPathBuilder(
71+
new DrsCloudNioFileSystemProvider(marthaConfig, GoogleDrsCredentials(fakeCredentials, 1.minutes), drsReadInterpreter),
72+
None,
73+
)
74+
}
75+
76+
val mount = PipelinesApiWorkingDisk(DiskType.LOCAL, 1)
77+
78+
def makeDrsInput(name: String, drsUri: String, containerPath: String): PipelinesApiFileInput = {
79+
val drsPath = makeDrsPathBuilder.build(drsUri).get
80+
val containerRelativePath = DefaultPathBuilder.get(containerPath)
81+
PipelinesApiFileInput(name, drsPath, containerRelativePath, mount)
82+
}
83+
84+
val nonDrsInput: PipelinesApiFileInput = PipelinesApiFileInput("nnn",
85+
DefaultPathBuilder.get("/local/nnn.bai"), DefaultPathBuilder.get("/path/to/nnn.bai"), mount)
86+
87+
val inputs = List(
88+
makeDrsInput("aaa", "drs://drs.example.org/aaa", "path/to/aaa.bai"),
89+
nonDrsInput,
90+
makeDrsInput("bbb", "drs://drs.example.org/bbb", "path/to/bbb.bai")
91+
)
92+
93+
PipelinesApiAsyncBackendJobExecutionActor.generateDrsLocalizerManifest(inputs) shouldEqual
94+
"drs://drs.example.org/aaa,/cromwell_root/path/to/aaa.bai\r\ndrs://drs.example.org/bbb,/cromwell_root/path/to/bbb.bai\r\n"
95+
}
4796
}

0 commit comments

Comments
 (0)