Skip to content

Commit 1515aa8

Browse files
authored
WX-1595 GCP Batch backend refactor to include the PAPI request manager (#7412)
The main goal is to refactor Batch backend to include [PipelinesApiRequestManager](https://github.com/broadinstitute/cromwell/blob/5448b85bf334e0970665a69549e796199acc8bd7/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestManager.scala) and [PipelinesApiRequestWorker](https://github.com/broadinstitute/cromwell/blob/5448b85bf334e0970665a69549e796199acc8bd7/supportedBackends/google/pipelines/common/src/main/scala/cromwell/backend/google/pipelines/common/api/PipelinesApiRequestWorker.scala). This also fixes a few missing details from the initial Batch integration (#7177), for example: 1. Missing metrics are now published. 2. The job status is queried before deleting it to try preventing the deletion of jobs that are in a final state (PAPI can abort jobs but Batch deletes them instead).
1 parent 408951f commit 1515aa8

39 files changed

+2276
-470
lines changed

core/src/main/resources/reference.conf

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -625,6 +625,10 @@ load-control {
625625
# Google requests to the Pipelines API are also queued and batched
626626
papi-requests = 10000
627627

628+
## Backend specific ##
629+
# Google requests to the Batch API are also queued and batched
630+
batch-requests = 10000
631+
628632
## Misc. ##
629633
# How often each actor should update its perceived load
630634
monitoring-frequency = 5 seconds

core/src/main/scala/cromwell/core/LoadConfig.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ object LoadConfig {
1919
val MetadataWriteThreshold = conf.as[Int]("metadata-write")
2020
val MonitoringFrequency = conf.as[FiniteDuration]("monitoring-frequency")
2121
val PAPIThreshold = conf.as[Int]("papi-requests")
22+
val BatchThreshold = conf.as[Int]("batch-requests")
2223
}

core/src/test/scala/cromwell/core/LoadConfigSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,6 @@ class LoadConfigSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers
2222
LoadConfig.IoNormalWindowMaximum shouldBe 60.seconds
2323
LoadConfig.MonitoringFrequency shouldBe 5.seconds
2424
LoadConfig.PAPIThreshold shouldBe 10000
25+
LoadConfig.BatchThreshold shouldBe 10000
2526
}
2627
}

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,26 +2,22 @@ package cromwell.backend.google.batch
22

33
import akka.actor.{ActorRef, Props}
44
import com.google.api.client.util.ExponentialBackOff
5+
import com.google.api.gax.rpc.FixedHeaderProvider
6+
import com.google.cloud.batch.v1.BatchServiceSettings
7+
import com.google.common.collect.ImmutableMap
58
import com.typesafe.scalalogging.StrictLogging
9+
import cromwell.backend._
610
import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{
711
preemptionCountKey,
812
robustBuildAttributes,
913
unexpectedRetryCountKey
1014
}
1115
import cromwell.backend.google.batch.actors._
12-
import cromwell.backend.google.batch.api.{GcpBatchApiRequestHandler, GcpBatchRequestFactoryImpl}
13-
import cromwell.backend.google.batch.models.{GcpBatchConfiguration, GcpBatchConfigurationAttributes}
16+
import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler}
1417
import cromwell.backend.google.batch.callcaching.{BatchBackendCacheHitCopyingActor, BatchBackendFileHashingActor}
18+
import cromwell.backend.google.batch.models.{GcpBatchConfiguration, GcpBatchConfigurationAttributes}
1519
import cromwell.backend.standard._
1620
import cromwell.backend.standard.callcaching.{StandardCacheHitCopyingActor, StandardFileHashingActor}
17-
import cromwell.backend.{
18-
BackendConfigurationDescriptor,
19-
BackendInitializationData,
20-
BackendWorkflowDescriptor,
21-
Gcp,
22-
JobExecutionMap,
23-
Platform
24-
}
2521
import cromwell.cloudsupport.gcp.GoogleConfiguration
2622
import cromwell.core.CallOutputs
2723
import wom.graph.CommandCallNode
@@ -70,7 +66,6 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
7066

7167
override def workflowFinalizationActorParams(workflowDescriptor: BackendWorkflowDescriptor,
7268
ioActor: ActorRef,
73-
// batchConfiguration: GcpBatchConfiguration,
7469
calls: Set[CommandCallNode],
7570
jobExecutionMap: JobExecutionMap,
7671
workflowOutputs: CallOutputs,
@@ -93,10 +88,19 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String,
9388
)
9489

9590
override def backendSingletonActorProps(serviceRegistryActor: ActorRef): Option[Props] = {
96-
val requestHandler = new GcpBatchApiRequestHandler
97-
val requestFactory = new GcpBatchRequestFactoryImpl()(batchConfiguration.batchAttributes.gcsTransferConfiguration)
91+
implicit val requestHandler: RequestHandler = new RequestHandler
92+
93+
val batchSettings = BatchServiceSettings.newBuilder
94+
.setHeaderProvider(FixedHeaderProvider.create(ImmutableMap.of("user-agent", "cromwell")))
95+
.build
96+
9897
Option(
99-
GcpBatchBackendSingletonActor.props(requestFactory, serviceRegistryActor = serviceRegistryActor)(requestHandler)
98+
GcpBatchBackendSingletonActor.props(
99+
qps = batchConfiguration.batchAttributes.qps,
100+
requestWorkers = batchConfiguration.batchAttributes.requestWorkers,
101+
serviceRegistryActor = serviceRegistryActor,
102+
batchRequestExecutor = new BatchRequestExecutor.CloudImpl(batchSettings)
103+
)(requestHandler)
100104
)
101105
}
102106

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiAbortClient.scala

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,47 @@ package cromwell.backend.google.batch.actors
22

33
import akka.actor.{Actor, ActorLogging, ActorRef}
44
import com.google.cloud.batch.v1.JobName
5+
import cromwell.backend.google.batch.api.BatchApiRequestManager.{BatchAbortRequest, BatchApiAbortQueryFailed}
6+
import cromwell.backend.google.batch.api.GcpBatchRequestFactory
57
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
8+
import cromwell.backend.standard.StandardAsyncJob
9+
import cromwell.core.WorkflowId
10+
import cromwell.core.logging.JobLogging
611

7-
trait BatchApiAbortClient { this: Actor with ActorLogging with BatchInstrumentation =>
12+
object BatchApiAbortClient {
13+
sealed trait BatchAbortRequestSuccess
14+
case class BatchAbortRequestSuccessful(jobId: String) extends BatchAbortRequestSuccess
815

9-
def abortJob(jobName: JobName, backendSingletonActor: ActorRef): Unit =
10-
backendSingletonActor ! GcpBatchBackendSingletonActor.Action.AbortJob(jobName)
16+
// The operation is no longer running. Maybe it was already cancelled, maybe it finished on its own. We don't know
17+
// the details and for abort they don't really matter.
18+
case class BatchOperationIsAlreadyTerminal(jobId: String) extends BatchAbortRequestSuccess
19+
}
20+
21+
trait BatchApiAbortClient { this: Actor with ActorLogging with JobLogging with BatchInstrumentation =>
22+
import BatchApiAbortClient._
23+
24+
def abortJob(workflowId: WorkflowId,
25+
jobName: JobName,
26+
backendSingletonActor: ActorRef,
27+
requestFactory: GcpBatchRequestFactory
28+
): Unit =
29+
backendSingletonActor ! BatchAbortRequest(
30+
workflowId = workflowId,
31+
requester = self,
32+
httpRequest = requestFactory.abortRequest(jobName),
33+
jobId = StandardAsyncJob(jobName.toString)
34+
)
1135

1236
def abortActorClientReceive: Actor.Receive = {
13-
case GcpBatchBackendSingletonActor.Event.JobAbortRequestSent(job) =>
14-
log.info(s"Job aborted on GCP: ${job.getName}")
37+
case BatchAbortRequestSuccessful(jobId) =>
1538
abortSuccess()
39+
jobLogger.info(s"Successfully requested cancellation of $jobId")
40+
41+
// In this case we could immediately return an aborted handle and spare ourselves a round of polling
42+
case BatchOperationIsAlreadyTerminal(jobId) =>
43+
jobLogger.info(s"Job $jobId has already finished")
1644

17-
case GcpBatchBackendSingletonActor.Event.ActionFailed(jobName, cause) =>
18-
val msg = s"Failed to abort job ($jobName) from GCP"
19-
log.error(cause, msg)
20-
abortFailed()
45+
case BatchApiAbortQueryFailed(query, e) =>
46+
jobLogger.error(s"Could not request cancellation of job ${query.jobId}", e)
2147
}
2248
}

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/BatchApiFetchJobClient.scala

Lines changed: 0 additions & 46 deletions
This file was deleted.
Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,34 @@
11
package cromwell.backend.google.batch.actors
22

33
import akka.actor.{Actor, ActorLogging, ActorRef}
4+
import cromwell.backend.google.batch.api.BatchApiRequestManager.{
5+
BatchApiRunCreationQueryFailed,
6+
SystemBatchApiException
7+
}
8+
import cromwell.backend.google.batch.api.{BatchApiRequestManager, GcpBatchRequestFactory}
49
import cromwell.backend.google.batch.models.GcpBatchRequest
510
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
611
import cromwell.backend.standard.StandardAsyncJob
12+
import cromwell.core.logging.JobLogger
713

814
import scala.concurrent.{Future, Promise}
915
import scala.util.{Failure, Success, Try}
1016

1117
/**
12-
* Handles the flow for submitting a single job to GCP, we can't do anything when that fails
18+
* Handles the flow for submitting a single job to GCP
1319
*/
1420
trait BatchApiRunCreationClient { this: Actor with ActorLogging with BatchInstrumentation =>
1521
private var runCreationClientPromise: Option[Promise[StandardAsyncJob]] = None
1622

1723
// handles messages produced from GcpBatchBackendSingletonActor
1824
def runCreationClientReceive: Actor.Receive = {
19-
case GcpBatchBackendSingletonActor.Event.JobSubmitted(job) =>
20-
log.info(s"Job submitted to GCP: ${job.getName}")
25+
case job: StandardAsyncJob =>
26+
log.info(s"A job was submitted successfully: ${job.jobId}")
2127
runSuccess()
22-
completePromise(Success(StandardAsyncJob(job.getName)))
23-
24-
case GcpBatchBackendSingletonActor.Event.ActionFailed(jobName, cause) =>
25-
val msg = s"Failed to submit job ($jobName) to GCP"
26-
log.error(cause, msg)
27-
runFailed()
28-
completePromise(Failure(cause))
28+
completePromise(Success(job))
29+
case BatchApiRunCreationQueryFailed(query, e) =>
30+
log.error(e, s"Failed to submit job ${query.httpRequest.getJobId}: ${e.getMessage}")
31+
completePromise(Failure(e))
2932
}
3033

3134
private def completePromise(job: Try[StandardAsyncJob]): Unit = {
@@ -35,15 +38,37 @@ trait BatchApiRunCreationClient { this: Actor with ActorLogging with BatchInstru
3538
runCreationClientPromise = None
3639
}
3740

38-
def runBatchJob(request: GcpBatchRequest, backendSingletonActor: ActorRef): Future[StandardAsyncJob] =
41+
def runBatchJob(
42+
request: GcpBatchRequest,
43+
backendSingletonActor: ActorRef,
44+
requestFactory: GcpBatchRequestFactory,
45+
jobLogger: JobLogger
46+
): Future[StandardAsyncJob] =
3947
runCreationClientPromise match {
4048
case Some(p) =>
4149
p.future
4250
case None =>
43-
log.info(s"Asking singleton actor to submit a job: ${request.jobName}")
44-
backendSingletonActor ! GcpBatchBackendSingletonActor.Action.SubmitJob(request)
51+
jobLogger.info(s"Asking singleton actor to submit a job: ${request.jobName}")
52+
53+
backendSingletonActor ! BatchApiRequestManager.BatchRunCreationRequest(
54+
request.workflowId,
55+
self,
56+
requestFactory.submitRequest(request)
57+
)
4558
val newPromise = Promise[StandardAsyncJob]()
4659
runCreationClientPromise = Option(newPromise)
4760
newPromise.future
4861
}
4962
}
63+
64+
object BatchApiRunCreationClient {
65+
66+
/**
67+
* Exception used to represent the fact that a job was aborted before a creation attempt was made.
68+
* Meaning it was in the queue when the abort request was made, so it was just removed from the queue.
69+
*/
70+
case object JobAbortedException
71+
extends SystemBatchApiException(
72+
new Exception("The job was removed from the queue before a Batch creation request was made")
73+
)
74+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package cromwell.backend.google.batch.actors
2+
3+
import akka.actor.{Actor, ActorLogging, ActorRef}
4+
import com.google.cloud.batch.v1.JobName
5+
import cromwell.backend.google.batch.api.BatchApiRequestManager.{BatchApiStatusQueryFailed, BatchStatusPollRequest}
6+
import cromwell.backend.google.batch.api.GcpBatchRequestFactory
7+
import cromwell.backend.google.batch.models.RunStatus
8+
import cromwell.backend.google.batch.monitoring.BatchInstrumentation
9+
import cromwell.backend.standard.StandardAsyncJob
10+
import cromwell.core.WorkflowId
11+
12+
import scala.concurrent.{Future, Promise}
13+
import scala.util.{Failure, Success, Try}
14+
15+
/**
16+
* Allows fetching a job status
17+
*/
18+
trait BatchApiStatusRequestClient { this: Actor with ActorLogging with BatchInstrumentation =>
19+
20+
private var pollingActorClientPromise: Option[Promise[RunStatus]] = None
21+
22+
def pollingActorClientReceive: Actor.Receive = {
23+
case status: RunStatus =>
24+
log.debug(s"Polled status received: $status")
25+
pollSuccess()
26+
completePromise(Success(status))
27+
case BatchApiStatusQueryFailed(query, e) =>
28+
log.error(e, s"Poll status failed for job ${query.jobId}: ${e.getMessage}")
29+
completePromise(Failure(e))
30+
}
31+
32+
private def completePromise(result: Try[RunStatus]): Unit = {
33+
pollingActorClientPromise foreach { _.complete(result) }
34+
pollingActorClientPromise = None
35+
}
36+
37+
def pollStatus(
38+
workflowId: WorkflowId,
39+
jobName: JobName,
40+
backendSingletonActor: ActorRef,
41+
requestFactory: GcpBatchRequestFactory
42+
): Future[RunStatus] =
43+
pollingActorClientPromise match {
44+
case Some(p) => p.future
45+
case None =>
46+
backendSingletonActor ! BatchStatusPollRequest(
47+
workflowId,
48+
self,
49+
requestFactory.queryRequest(jobName),
50+
StandardAsyncJob(jobName.toString)
51+
)
52+
53+
val newPromise = Promise[RunStatus]()
54+
pollingActorClientPromise = Option(newPromise)
55+
newPromise.future
56+
}
57+
}

0 commit comments

Comments
 (0)