Skip to content

Commit f01c1f3

Browse files
zykondacjllanwarnejgainerdewar
authored
[WM-2291] Callback API contract tests between Cromwell and CBAS (#7251)
Co-authored-by: Chris Llanwarne <cjllanwarne@gmail.com> Co-authored-by: Janet Gainer-Dewar <jdewar@broadinstitute.org>
1 parent da13c87 commit f01c1f3

File tree

11 files changed

+183
-16
lines changed

11 files changed

+183
-16
lines changed

.github/workflows/consumer_contract_tests.yml

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ jobs:
116116
runs-on: ubuntu-latest
117117
needs: [init-github-context]
118118
outputs:
119-
pact-b64: ${{ steps.encode-pact.outputs.pact-b64 }}
119+
pact-b64-drshub: ${{ steps.encode-pact.outputs.pact-b64-drshub }}
120+
pact-b64-cbas: ${{ steps.encode-pact.outputs.pact-b64-cbas }}
120121

121122
steps:
122123
- uses: actions/checkout@v3
@@ -132,9 +133,12 @@ jobs:
132133
- name: Output consumer contract as non-breaking base64 string
133134
id: encode-pact
134135
run: |
136+
set -e
135137
cd pact4s
136-
NON_BREAKING_B64=$(cat target/pacts/cromwell-drshub.json | base64 -w 0)
137-
echo "pact-b64=${NON_BREAKING_B64}" >> $GITHUB_OUTPUT
138+
NON_BREAKING_B64_DRSHUB=$(cat target/pacts/cromwell-drshub.json | base64 -w 0)
139+
NON_BREAKING_B64_CBAS=$(cat target/pacts/cromwell-cbas.json | base64 -w 0)
140+
echo "pact-b64-drshub=${NON_BREAKING_B64_DRSHUB}" >> $GITHUB_OUTPUT
141+
echo "pact-b64-cbas=${NON_BREAKING_B64_CBAS}" >> $GITHUB_OUTPUT
138142
139143
# Prevent untrusted sources from using PRs to publish contracts
140144
# since access to secrets is not allowed.
@@ -143,7 +147,23 @@ jobs:
143147
if: ${{ needs.init-github-context.outputs.fork == 'false' || needs.init-github-context.outputs.fork == ''}}
144148
needs: [init-github-context, cromwell-contract-tests]
145149
steps:
146-
- name: Dispatch to terra-github-workflows
150+
- name: Dispatch drshub to terra-github-workflows
151+
uses: broadinstitute/workflow-dispatch@v4.0.0
152+
with:
153+
run-name: "${{ env.PUBLISH_CONTRACTS_RUN_NAME }}"
154+
workflow: .github/workflows/publish-contracts.yaml
155+
repo: broadinstitute/terra-github-workflows
156+
ref: refs/heads/main
157+
token: ${{ secrets.BROADBOT_GITHUB_TOKEN }} # github token for access to kick off a job in the private repo
158+
inputs: '{
159+
"run-name": "${{ env.PUBLISH_CONTRACTS_RUN_NAME }}",
160+
"pact-b64": "${{ needs.cromwell-contract-tests.outputs.pact-b64-drshub }}",
161+
"repo-owner": "${{ github.repository_owner }}",
162+
"repo-name": "${{ github.event.repository.name }}",
163+
"repo-branch": "${{ needs.init-github-context.outputs.repo-branch }}",
164+
"release-tag": "${{ needs.init-github-context.outputs.repo-version }}"
165+
}'
166+
- name: Dispatch cbas to terra-github-workflows
147167
uses: broadinstitute/workflow-dispatch@v4.0.0
148168
with:
149169
run-name: "${{ env.PUBLISH_CONTRACTS_RUN_NAME }}"
@@ -153,7 +173,7 @@ jobs:
153173
token: ${{ secrets.BROADBOT_GITHUB_TOKEN }} # github token for access to kick off a job in the private repo
154174
inputs: '{
155175
"run-name": "${{ env.PUBLISH_CONTRACTS_RUN_NAME }}",
156-
"pact-b64": "${{ needs.cromwell-contract-tests.outputs.pact-b64 }}",
176+
"pact-b64": "${{ needs.cromwell-contract-tests.outputs.pact-b64-cbas }}",
157177
"repo-owner": "${{ github.repository_owner }}",
158178
"repo-name": "${{ github.event.repository.name }}",
159179
"repo-branch": "${{ needs.init-github-context.outputs.repo-branch }}",

build.sbt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,7 +399,9 @@ lazy val `cromwell-drs-localizer` = project
399399

400400
lazy val pact4s = project.in(file("pact4s"))
401401
.settings(pact4sSettings)
402+
.dependsOn(engine)
402403
.dependsOn(services)
404+
.dependsOn(engine % "test->test")
403405
.disablePlugins(sbtassembly.AssemblyPlugin)
404406

405407
lazy val server = project

cloud-nio/cloud-nio-spi/src/main/scala/cloud/nio/spi/CloudNioPath.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ class CloudNioPath(filesystem: CloudNioFileSystem, private[spi] val unixPath: Un
176176
): WatchKey = throw new UnsupportedOperationException
177177

178178
override def iterator(): java.util.Iterator[Path] =
179-
if (unixPath.isEmpty || unixPath.isRoot) {
179+
if (unixPath.izEmpty || unixPath.isRoot) {
180180
java.util.Collections.emptyIterator()
181181
} else {
182182
unixPath.split().to(LazyList).map(part => newPath(UnixPath.getPath(part)).asInstanceOf[Path]).iterator.asJava

cloud-nio/cloud-nio-spi/src/main/scala/cloud/nio/spi/UnixPath.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ final private[spi] case class UnixPath(path: String) extends CharSequence {
6767

6868
def isAbsolute: Boolean = UnixPath.isAbsolute(path)
6969

70-
def isEmpty: Boolean = path.isEmpty
70+
// Named this way because isEmpty is a name collision new in 17.
71+
// The initial compile error is that it needs an override.
72+
// Adding the override results in a second error saying it overrides nothing!
73+
// So, we just renamed it.
74+
def izEmpty: Boolean = path.isEmpty
7175

7276
def hasTrailingSeparator: Boolean = UnixPath.hasTrailingSeparator(path)
7377

engine/src/main/scala/cromwell/engine/workflow/lifecycle/finalization/WorkflowCallbackActor.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import akka.http.scaladsl.model.headers.RawHeader
1111
import akka.routing.Broadcast
1212
import akka.util.ByteString
1313
import cats.data.Validated.{Invalid, Valid}
14-
import cats.implicits.toTraverseOps
14+
import cats.implicits.{catsSyntaxValidatedId, toTraverseOps}
1515
import com.typesafe.config.Config
1616
import com.typesafe.scalalogging.LazyLogging
1717
import common.validation.ErrorOr
@@ -50,6 +50,10 @@ object WorkflowCallbackConfig extends LazyLogging {
5050
override def getAccessToken: ErrorOr.ErrorOr[String] = AzureCredentials.getAccessToken()
5151
}
5252

53+
case class StaticTokenAuth(token: String) extends AuthMethod {
54+
override def getAccessToken: ErrorOr.ErrorOr[String] = token.validNel
55+
}
56+
5357
private lazy val defaultNumThreads = 5
5458
private lazy val defaultRetryBackoff = SimpleExponentialBackoff(3.seconds, 5.minutes, 1.1)
5559
private lazy val defaultMaxRetries = 10

pact4s/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pact4s is used for contract testing.
88
val pact4sDependencies = Seq(
99
pact4sScalaTest,
1010
pact4sCirce,
11+
pact4sSpray
1112
http4sEmberClient,
1213
http4sDsl,
1314
http4sEmberServer,
@@ -47,4 +48,5 @@ docker run --rm -v $PWD:/working \
4748

4849
The generated contracts can be found in the `./target/pacts` folder
4950
- `cromwell-drshub.json`
51+
- `cromwell-cbas.json`
5052

pact4s/src/main/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/Helper.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ object PactHelper {
108108
requestBody: DslPart,
109109
status: Int,
110110
responseHeaders: Seq[(String, String)],
111-
responsBody: DslPart
111+
responseBody: DslPart
112112
): PactDslResponse =
113113
builder
114114
.`given`(state, scala.jdk.CollectionConverters.MapHasAsJava(stateParams).asJava)
@@ -120,7 +120,26 @@ object PactHelper {
120120
.willRespondWith()
121121
.status(status)
122122
.headers(scala.jdk.CollectionConverters.MapHasAsJava(responseHeaders.toMap).asJava)
123-
.body(responsBody)
123+
.body(responseBody)
124+
125+
def buildInteraction[A](builder: PactDslWithProvider,
126+
state: String,
127+
uponReceiving: String,
128+
method: String,
129+
path: String,
130+
requestHeaders: Seq[(String, String)],
131+
requestBody: A,
132+
status: Int
133+
)(implicit ev: PactBodyJsonEncoder[A]): PactDslResponse =
134+
builder
135+
.`given`(state)
136+
.uponReceiving(uponReceiving)
137+
.method(method)
138+
.path(path)
139+
.headers(scala.jdk.CollectionConverters.MapHasAsJava(requestHeaders.toMap).asJava)
140+
.body(ev.toJsonString(requestBody))
141+
.willRespondWith()
142+
.status(status)
124143

125144
def buildInteraction(builder: PactDslResponse,
126145
state: String,
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package org.broadinstitute.dsde.workbench.cromwell.consumer
2+
3+
import akka.testkit._
4+
import au.com.dius.pact.consumer.dsl._
5+
import au.com.dius.pact.consumer.{ConsumerPactBuilder, PactTestExecutionContext}
6+
import au.com.dius.pact.core.model.RequestResponsePact
7+
import cromwell.core.retry.SimpleExponentialBackoff
8+
import cromwell.core.{CallOutputs, TestKitSuite, WorkflowId, WorkflowSucceeded}
9+
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackActor.PerformCallbackCommand
10+
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackConfig.StaticTokenAuth
11+
import cromwell.engine.workflow.lifecycle.finalization.WorkflowCallbackJsonSupport._
12+
import cromwell.engine.workflow.lifecycle.finalization.{CallbackMessage, WorkflowCallbackActor, WorkflowCallbackConfig}
13+
import cromwell.services.metadata.MetadataKey
14+
import cromwell.services.metadata.MetadataService.PutMetadataAction
15+
import cromwell.util.GracefulShutdownHelper
16+
import org.broadinstitute.dsde.workbench.cromwell.consumer.PactHelper._
17+
import org.scalatest.flatspec.AnyFlatSpecLike
18+
import org.scalatest.matchers.should.Matchers
19+
import pact4s.scalatest.RequestResponsePactForger
20+
import pact4s.sprayjson.implicits._
21+
import wom.graph.GraphNodePort.GraphNodeOutputPort
22+
import wom.graph.WomIdentifier
23+
import wom.types.WomStringType
24+
import wom.values._
25+
26+
import java.net.URI
27+
import java.util.UUID
28+
import scala.concurrent.ExecutionContextExecutor
29+
import scala.concurrent.duration._
30+
31+
class CbasCallbackSpec extends TestKitSuite with AnyFlatSpecLike with Matchers with RequestResponsePactForger {
32+
33+
implicit val ec: ExecutionContextExecutor = system.dispatcher
34+
35+
// Akka test setup
36+
private val msgWait = 10.second.dilated
37+
private val serviceRegistryActor = TestProbe("testServiceRegistryActor")
38+
private val deathWatch = TestProbe("deathWatch")
39+
40+
private val callbackEndpoint = "/api/batch/v1/runs/results"
41+
private val bearerToken = "my-token"
42+
private val workflowId = WorkflowId(UUID.fromString("12345678-1234-1234-1111-111111111111"))
43+
private val basicOutputs = CallOutputs(
44+
Map(
45+
GraphNodeOutputPort(WomIdentifier("foo", "wf.foo"), WomStringType, null) -> WomString("bar"),
46+
GraphNodeOutputPort(WomIdentifier("hello", "wf.hello.hello"), WomStringType, null) -> WomString("Hello")
47+
)
48+
)
49+
50+
// This is the message that we expect Cromwell to send CBAS in this test
51+
private val expectedCallbackMessage = CallbackMessage(
52+
workflowId.toString,
53+
"Succeeded",
54+
Map(("wf.foo", WomString("bar")), ("wf.hello.hello", WomString("Hello"))),
55+
List.empty
56+
)
57+
58+
// Define the folder that the pact contracts get written to upon completion of this test suite.
59+
override val pactTestExecutionContext: PactTestExecutionContext =
60+
new PactTestExecutionContext(
61+
"./target/pacts"
62+
)
63+
64+
val consumerPactBuilder: ConsumerPactBuilder = ConsumerPactBuilder
65+
.consumer("cromwell")
66+
67+
val pactProvider: PactDslWithProvider = consumerPactBuilder
68+
.hasPactWith("cbas")
69+
70+
var pactUpdateCompletedRunDslResponse: PactDslResponse = buildInteraction(
71+
pactProvider,
72+
state = "post completed workflow results",
73+
uponReceiving = "Request to post workflow results",
74+
method = "POST",
75+
path = callbackEndpoint,
76+
requestHeaders = Seq("Authorization" -> "Bearer %s".format(bearerToken), "Content-type" -> "application/json"),
77+
requestBody = expectedCallbackMessage,
78+
status = 200
79+
)
80+
override val pact: RequestResponsePact = pactUpdateCompletedRunDslResponse.toPact
81+
82+
it should "send the right callback to the right URI" in {
83+
// Create actor
84+
val callbackConfig = WorkflowCallbackConfig.empty
85+
.copy(enabled = true)
86+
.copy(retryBackoff = SimpleExponentialBackoff(100.millis, 200.millis, 1.1))
87+
.copy(authMethod = Option(StaticTokenAuth(bearerToken)))
88+
.copy(defaultUri = Option(new URI(mockServer.getUrl + callbackEndpoint)))
89+
90+
val props = WorkflowCallbackActor.props(
91+
serviceRegistryActor.ref,
92+
callbackConfig
93+
)
94+
val workflowCallbackActor = system.actorOf(props, "testWorkflowCallbackActorPact")
95+
96+
// Send a command to trigger callback
97+
val cmd = PerformCallbackCommand(
98+
workflowId = workflowId,
99+
uri = None,
100+
terminalState = WorkflowSucceeded,
101+
workflowOutputs = basicOutputs,
102+
List.empty
103+
)
104+
workflowCallbackActor ! cmd
105+
106+
// Confirm the callback was successful
107+
serviceRegistryActor.expectMsgPF(msgWait) {
108+
case PutMetadataAction(List(resultEvent, _, _), _) =>
109+
resultEvent.key shouldBe MetadataKey(workflowId, None, "workflowCallback", "successful")
110+
case _ =>
111+
}
112+
113+
// Shut the actor down
114+
deathWatch.watch(workflowCallbackActor)
115+
workflowCallbackActor ! GracefulShutdownHelper.ShutdownCommand
116+
deathWatch.expectTerminated(workflowCallbackActor, msgWait)
117+
}
118+
}

pact4s/src/test/scala/org/broadinstitute/dsde/workbench/cromwell/consumer/DrsHubClientSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ class DrsHubClientSpec extends AnyFlatSpec with Matchers with RequestResponsePac
138138
requestBody = resourceRequestDsl,
139139
status = 200,
140140
responseHeaders = Seq(),
141-
responsBody = resourceMetadataResponseDsl
141+
responseBody = resourceMetadataResponseDsl
142142
)
143143

144144
pactDslResponse = buildInteraction(

project/Dependencies.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -839,14 +839,16 @@ object Dependencies {
839839
val http4sCirce = "org.http4s" %% "http4s-circe" % http4sV
840840
val pact4sScalaTest = "io.github.jbwheatley" %% "pact4s-scalatest" % pact4sV % Test
841841
val pact4sCirce = "io.github.jbwheatley" %% "pact4s-circe" % pact4sV
842+
val pact4sSpray = "io.github.jbwheatley" %% "pact4s-spray-json" % pact4sV
842843

843844
val pact4sDependencies = Seq(
844845
pact4sScalaTest,
845846
pact4sCirce,
847+
pact4sSpray,
846848
http4sEmberClient,
847849
http4sDsl,
848850
http4sEmberServer,
849851
http4sCirce,
850852
scalaTest,
851-
)
853+
) ++ akkaDependencies
852854
}

0 commit comments

Comments
 (0)