Skip to content

Commit 2a69691

Browse files
authored
[WX-1156] internal_path_prefix for TES 4.4 (#7190)
1 parent 29d3810 commit 2a69691

File tree

3 files changed

+81
-25
lines changed

3 files changed

+81
-25
lines changed

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesJobPaths.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,15 @@ case class TesJobPaths private[tes] (override val workflowPaths: TesWorkflowPath
2828
val callInputsDockerRoot = callDockerRoot.resolve("inputs")
2929
val callInputsRoot = callRoot.resolve("inputs")
3030

31+
/*
32+
* tesTaskRoot: This is the root directory that TES will use for files related to this task.
33+
* We provide it to TES as a k/v pair where the key is "internal_path_prefix" (specified in TesWorkflowOptionKeys.scala)
34+
* and the value is a blob path.
35+
* This is not a standard TES feature, but rather related to the Azure TES implementation that Terra uses.
36+
* While passing it outside of terra won't do any harm, we could consider making this optional and/or configurable.
37+
*/
38+
val tesTaskRoot : Path = callExecutionRoot.resolve("tes_task")
39+
3140
// Given an output path, return a path localized to the storage file system
3241
def storageOutput(path: String): String = {
3342
callExecutionRoot.resolve(path).toString

supportedBackends/tes/src/main/scala/cromwell/backend/impl/tes/TesTask.scala

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
package cromwell.backend.impl.tes
2-
32
import common.collections.EnhancedCollections._
43
import common.util.StringUtil._
54
import cromwell.backend.impl.tes.OutputMode.OutputMode
@@ -71,7 +70,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
7170
path = tesPaths.callExecutionDockerRoot.resolve("script").toString,
7271
`type` = Option("FILE")
7372
)
74-
7573
private def writeFunctionFiles: Map[FullyQualifiedName, Seq[WomFile]] =
7674
instantiatedCommand.createdFiles map { f => f.file.value.md5SumShort -> List(f.file) } toMap
7775

@@ -231,11 +229,6 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
231229
workflowExecutionIdentityOption
232230
)
233231

234-
val resources: Resources = TesTask.makeResources(
235-
runtimeAttributes,
236-
preferedWorkflowExecutionIdentity
237-
)
238-
239232
val executors = Seq(Executor(
240233
image = dockerImageUsed,
241234
command = Seq(jobShell, commandScript.path),
@@ -245,6 +238,12 @@ final case class TesTask(jobDescriptor: BackendJobDescriptor,
245238
stdin = None,
246239
env = None
247240
))
241+
242+
val resources: Resources = TesTask.makeResources(
243+
runtimeAttributes,
244+
preferedWorkflowExecutionIdentity,
245+
Option(tesPaths.tesTaskRoot.pathAsString)
246+
)
248247
}
249248

250249
object TesTask {
@@ -254,15 +253,22 @@ object TesTask {
254253
configIdentity.map(_.value).orElse(workflowOptionsIdentity.map(_.value))
255254
}
256255
def makeResources(runtimeAttributes: TesRuntimeAttributes,
257-
workflowExecutionId: Option[String]): Resources = {
258-
259-
// This was added in BT-409 to let us pass information to an Azure
260-
// TES server about which user identity to run tasks as.
261-
// Note that we validate the type of WorkflowExecutionIdentity
262-
// in TesInitializationActor.
263-
val backendParameters = runtimeAttributes.backendParameters ++
256+
workflowExecutionId: Option[String], internalPathPrefix: Option[String]): Resources = {
257+
/*
258+
* workflowExecutionId: This was added in BT-409 to let us pass information to an Azure
259+
* TES server about which user identity to run tasks as.
260+
* Note that we validate the type of WorkflowExecutionIdentity in TesInitializationActor.
261+
*
262+
* internalPathPrefix: Added in WX-1156 to support the azure TES implementation. Specifies
263+
* a working directory that the TES task can use.
264+
*/
265+
val internalPathPrefixKey = "internal_path_prefix"
266+
val backendParameters : Map[String, Option[String]] = runtimeAttributes.backendParameters ++
264267
workflowExecutionId
265268
.map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(_))
269+
.toMap ++
270+
internalPathPrefix
271+
.map(internalPathPrefixKey -> Option(_))
266272
.toMap
267273
val disk :: ram :: _ = Seq(runtimeAttributes.disk, runtimeAttributes.memory) map {
268274
case Some(x) =>

supportedBackends/tes/src/test/scala/cromwell/backend/impl/tes/TesTaskSpec.scala

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,46 +31,87 @@ class TesTaskSpec
3131
false,
3232
Map.empty
3333
)
34+
val internalPathPrefix = Option("mock/path/to/tes/task")
35+
val expectedTuple = "internal_path_prefix" -> internalPathPrefix
3436

3537
it should "create the correct resources when an identity is passed in WorkflowOptions" in {
3638
val wei = Option("abc123")
37-
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
38-
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
39+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
40+
Resources(None, None, None, Option(false), None,
41+
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
42+
expectedTuple))
3943
)
4044
}
4145

4246
it should "create the correct resources when an empty identity is passed in WorkflowOptions" in {
4347
val wei = Option("")
44-
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
45-
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("")))
48+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
49+
Resources(None, None, None, Option(false), None,
50+
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option(""),
51+
expectedTuple))
4652
)
4753
}
4854

4955
it should "create the correct resources when no identity is passed in WorkflowOptions" in {
5056
val wei = None
51-
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
52-
Resources(None, None, None, Option(false), None, Option(Map.empty[String, Option[String]])
53-
)
57+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
58+
Resources(None, None, None, Option(false), None, Option(Map(expectedTuple)))
5459
}
5560

5661
it should "create the correct resources when an identity is passed in via backend config" in {
5762
val weic = Option(WorkflowExecutionIdentityConfig("abc123"))
5863
val weio = Option(WorkflowExecutionIdentityOption("def456"))
5964
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
60-
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
61-
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123")))
65+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
66+
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
67+
expectedTuple))
6268
)
6369
}
6470

6571
it should "create the correct resources when no identity is passed in via backend config" in {
6672
val weic = None
6773
val weio = Option(WorkflowExecutionIdentityOption("def456"))
6874
val wei = TesTask.getPreferredWorkflowExecutionIdentity(weic, weio)
69-
TesTask.makeResources(runtimeAttributes, wei) shouldEqual
70-
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456")))
75+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
76+
Resources(None, None, None, Option(false), None, Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("def456"),
77+
expectedTuple))
7178
)
7279
}
7380

81+
it should "correctly set the internal path prefix when provided as a backend parameter" in {
82+
val wei = Option("abc123")
83+
val internalPathPrefix = Option("mock/path/to/tes/task")
84+
TesTask.makeResources(runtimeAttributes, wei, internalPathPrefix) shouldEqual
85+
Resources(None, None, None, Option(false), None,
86+
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
87+
"internal_path_prefix" -> internalPathPrefix)
88+
))
89+
}
90+
91+
it should "correctly resolve the path to .../tes_task and add the k/v pair to backend parameters" in {
92+
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))
93+
val workflowDescriptor = buildWdlWorkflowDescriptor(TestWorkflows.HelloWorld,
94+
labels = Labels("foo" -> "bar"))
95+
val jobDescriptor = jobDescriptorFromSingleCallWorkflow(workflowDescriptor,
96+
Map.empty,
97+
emptyWorkflowOptions,
98+
Set.empty)
99+
val tesPaths = TesJobPaths(jobDescriptor.key,
100+
jobDescriptor.workflowDescriptor,
101+
TestConfig.emptyConfig)
102+
103+
val expectedKey = "internal_path_prefix"
104+
val expectedValue = Option(tesPaths.tesTaskRoot.pathAsString)
105+
106+
//Assert path correctly ends up in the resources
107+
val wei = Option("abc123")
108+
TesTask.makeResources(runtimeAttributes, wei, expectedValue) shouldEqual
109+
Resources(None, None, None, Option(false), None,
110+
Option(Map(TesWorkflowOptionKeys.WorkflowExecutionIdentity -> Option("abc123"),
111+
expectedKey -> expectedValue))
112+
)
113+
}
114+
74115
it should "copy labels to tags" in {
75116
val jobLogger = mock[JobLogger]
76117
val emptyWorkflowOptions = WorkflowOptions(JsObject(Map.empty[String, JsValue]))

0 commit comments

Comments
 (0)