-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WDL Draft 2 workflow source + imports => namespace cache #3971
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,34 +1,43 @@ | ||
package languages.wdl.draft2 | ||
|
||
import java.security.MessageDigest | ||
import java.util.concurrent.{Callable, TimeUnit} | ||
|
||
import cats.data.EitherT.fromEither | ||
import cats.effect.IO | ||
import cats.instances.either._ | ||
import cats.instances.list._ | ||
import cats.syntax.functor._ | ||
import cats.data.EitherT.fromEither | ||
import cats.effect.IO | ||
import cats.syntax.traverse._ | ||
import com.google.common.cache.{Cache, CacheBuilder} | ||
import com.typesafe.config.Config | ||
import common.Checked | ||
import common.validation.Validation._ | ||
import common.validation.Checked._ | ||
import common.validation.ErrorOr.{ErrorOr, _} | ||
import common.validation.ErrorOr._ | ||
import common.validation.Parse.Parse | ||
import common.validation.Validation._ | ||
import cromwell.core._ | ||
import cromwell.languages.util.ImportResolver.{ImportResolutionRequest, ImportResolver} | ||
import cromwell.languages.util.{ImportResolver, LanguageFactoryUtil} | ||
import cromwell.languages.{LanguageFactory, ValidatedWomNamespace} | ||
import languages.wdl.draft2.WdlDraft2LanguageFactory._ | ||
import mouse.all._ | ||
import net.ceedubs.ficus.Ficus._ | ||
import wdl.draft2.model.{Draft2ImportResolver, WdlNamespace, WdlNamespaceWithWorkflow} | ||
import wdl.shared.transforms.wdlom2wom.WdlSharedInputParsing | ||
import wdl.transforms.draft2.wdlom2wom.WdlDraft2WomBundleMakers._ | ||
import wom.core.{WorkflowJson, WorkflowOptionsJson, WorkflowSource} | ||
import wom.graph.GraphNodePort.OutputPort | ||
import wdl.transforms.draft2.wdlom2wom.WdlDraft2WomExecutableMakers._ | ||
import wom.core.{WorkflowJson, WorkflowOptionsJson, WorkflowSource} | ||
import wom.executable.WomBundle | ||
import wom.expression.IoFunctionSet | ||
import wom.transforms.WomExecutableMaker.ops._ | ||
import wom.graph.GraphNodePort.OutputPort | ||
import wom.transforms.WomBundleMaker.ops._ | ||
import wom.values.WomValue | ||
import languages.wdl.draft2.WdlDraft2LanguageFactory._ | ||
import wom.transforms.WomExecutableMaker.ops._ | ||
import wom.values._ | ||
|
||
class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends LanguageFactory { | ||
import scala.concurrent.duration.FiniteDuration | ||
|
||
class WdlDraft2LanguageFactory(override val config: Config) extends LanguageFactory { | ||
|
||
override val languageName: String = "WDL" | ||
override val languageVersionName: String = "draft-2" | ||
|
@@ -54,19 +63,25 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La | |
list.sequence[Checked, Unit].void | ||
} | ||
|
||
import common.validation.Validation._ | ||
|
||
lazy val wdlNamespaceValidation: ErrorOr[WdlNamespaceWithWorkflow] = source match { | ||
case w: WorkflowSourceFilesWithDependenciesZip => | ||
for { | ||
importsDir <- LanguageFactoryUtil.validateImportsDirectory(w.importsZip) | ||
wf <- WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr | ||
_ = importsDir.delete(swallowIOExceptions = true) | ||
} yield wf | ||
case _: WorkflowSourceFilesWithoutImports => | ||
WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr | ||
/**
  * Builds the key under which this submission's namespace is stored in the namespace cache.
  *
  * The key is `workflowSource.md5Sum` (presumably the MD5 of the workflow text; verify against its
  * definition) followed by the hex-encoded MD5 digest of the imports zip, or by nothing when no zip
  * was supplied.
  */
def workflowHashKey: String = {
  val sourceDigest = workflowSource.md5Sum
  // Hex-encode the digest bytes. Decoding raw digest bytes into a String through the platform charset is
  // lossy (unmappable byte sequences collapse to the same replacement character), so two different zips
  // could otherwise end up with the same cache key.
  // Review note: digesting the zip bytes is assumed to be cheaper than stringifying and parsing their content.
  val importsDigest = source.importsZipFileOption.fold("") { bytes =>
    MessageDigest.getInstance("MD5").digest(bytes).map(b => "%02x".format(b)).mkString
  }
  sourceDigest + importsDigest
}
|
||
/**
  * Creates the task that parses `workflowSource` into a draft-2 namespace.
  *
  * The task is used either directly or as the loader passed to the namespace cache.
  * When the submission carries a dependencies zip, the imports directory is validated first and is
  * deleted again once loading has been attempted.
  *
  * @return a `Callable` whose `call` yields the loaded namespace or the accumulated errors
  */
def validationCallable: Callable[ErrorOr[WdlNamespaceWithWorkflow]] = new Callable[ErrorOr[WdlNamespaceWithWorkflow]] {
  // Shared by both branches: load the namespace using the draft-2 view of the import resolvers.
  private def loadNamespace: ErrorOr[WdlNamespaceWithWorkflow] =
    WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr

  def call: ErrorOr[WdlNamespaceWithWorkflow] = source match {
    case w: WorkflowSourceFilesWithDependenciesZip =>
      for {
        importsDir <- LanguageFactoryUtil.validateImportsDirectory(w.importsZip)
        // Capture the load result first, then delete the directory whether or not loading succeeded,
        // so that a failed parse does not leave the imports directory behind.
        loaded = loadNamespace
        _ = importsDir.delete(swallowIOExceptions = true)
        wf <- loaded
      } yield wf
    case _: WorkflowSourceFilesWithoutImports =>
      loadNamespace
  }
}
|
||
/**
  * The validated draft-2 namespace for this submission.
  *
  * When a namespace cache is configured the result is looked up under `workflowHashKey`, with
  * `validationCallable` as the loader; otherwise the callable is invoked directly.
  */
lazy val wdlNamespaceValidation: ErrorOr[WdlNamespaceWithWorkflow] = namespaceCache match {
  case Some(cache) => cache.get(workflowHashKey, validationCallable)
  case None => validationCallable.call
}
|
||
def evaluateImports(wdlNamespace: WdlNamespace): Map[String, String] = { | ||
// Descend the namespace looking for imports and construct `MetadataEvent`s for them. | ||
def collectImportEvents: Map[String, String] = { | ||
|
@@ -87,7 +102,7 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La | |
wdlNamespace <- wdlNamespaceValidation.toEither | ||
_ <- validateWorkflowNameLengths(wdlNamespace) | ||
importedUris = evaluateImports(wdlNamespace) | ||
womExecutable <- wdlNamespace.toWomExecutable(Option(source.inputsJson), ioFunctions, standardConfig.strictValidation) | ||
womExecutable <- wdlNamespace.toWomExecutable(Option(source.inputsJson), ioFunctions, strictValidation) | ||
validatedWomNamespaceBeforeMetadata <- LanguageFactoryUtil.validateWomNamespace(womExecutable, ioFunctions) | ||
_ <- checkTypes(wdlNamespace, validatedWomNamespaceBeforeMetadata.womValueInputs) | ||
} yield validatedWomNamespaceBeforeMetadata.copy(importedFileContent = importedUris) | ||
|
@@ -116,12 +131,32 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La | |
|
||
/**
  * Turns an already-built WOM bundle and its inputs into a validated WOM namespace.
  *
  * Steps, in order: run `enabledCheck`, build the executable from the bundle and the inputs through
  * `WdlSharedInputParsing.buildWomExecutable`, then validate it with `LanguageFactoryUtil.validateWomNamespace`.
  *
  * @param womBundle   the bundle to build the executable from
  * @param inputs      the workflow inputs JSON, wrapped in `Option` before use
  * @param ioFunctions the IO functions passed to both the build and the validation step
  * @return the validated namespace, or the errors from the first failing step
  */
override def createExecutable(womBundle: WomBundle, inputs: WorkflowJson, ioFunctions: IoFunctionSet): Checked[ValidatedWomNamespace] = for { | ||
_ <- enabledCheck | ||
// NOTE(review): the next two generators look like the removed/added pair of a diff
// (`standardConfig.strictValidation` vs `strictValidation`) - confirm that only one should remain.
executable <- WdlSharedInputParsing.buildWomExecutable(womBundle, Option(inputs), ioFunctions, standardConfig.strictValidation) | ||
executable <- WdlSharedInputParsing.buildWomExecutable(womBundle, Option(inputs), ioFunctions, strictValidation) | ||
validatedNamespace <- LanguageFactoryUtil.validateWomNamespace(executable, ioFunctions) | ||
} yield validatedNamespace | ||
|
||
// Always answers false, so this factory never claims content by inspection.
// WDL draft 2 is instead set as the default in reference.conf, so most people will get it
// when nothing else looks parsable.
override def looksParsable(content: String): Boolean = false | ||
|
||
/**
  * Namespace-cache settings read from this factory's configuration.
  *
  * WDL draft 2 namespace caching is opt-in: settings are produced only when the factory itself is
  * enabled, a `caching` stanza exists, and that stanza sets `enabled` to true. Missing values fall
  * back to a 20 minute ttl, a concurrency of 2 and a size of 1000.
  */
private[draft2] lazy val cacheConfig: Option[CacheConfig] = {
  // Reads the individual settings in the same order as before: ttl, concurrency, then size.
  def fromCachingStanza(caching: Config): CacheConfig = {
    val ttl = caching.as[Option[FiniteDuration]]("ttl").getOrElse(FiniteDuration.apply(20, TimeUnit.MINUTES))
    val concurrency = caching.as[Option[Int]]("concurrency").getOrElse(2)
    val size = caching.as[Option[Long]]("size").getOrElse(1000L)
    CacheConfig(concurrency = concurrency, size = size, ttl = ttl)
  }

  if (enabled) {
    config.as[Option[Config]]("caching")
      .filter(_.as[Option[Boolean]]("enabled").contains(true))
      .map(fromCachingStanza)
  } else {
    None
  }
}
|
||
/**
  * The namespace cache, present only when `cacheConfig` yields settings.
  *
  * The cache is built with the configured concurrency level and maximum size, and entries expire
  * once they have gone unaccessed for the configured ttl.
  */
private[draft2] lazy val namespaceCache: Option[Cache[WorkflowSource, ErrorOr[WdlNamespaceWithWorkflow]]] =
  for (settings <- cacheConfig) yield {
    val timeToLive = settings.ttl
    CacheBuilder.newBuilder()
      .concurrencyLevel(settings.concurrency)
      .expireAfterAccess(timeToLive.length, timeToLive.unit)
      .maximumSize(settings.size)
      .build[WorkflowSource, ErrorOr[WdlNamespaceWithWorkflow]]()
  }
} | ||
|
||
object WdlDraft2LanguageFactory { | ||
|
@@ -132,4 +167,6 @@ object WdlDraft2LanguageFactory { | |
|
||
// The default `ImportResolver.HttpResolver()` passed through `resolverConverter`.
val httpResolver = resolverConverter(ImportResolver.HttpResolver()) | ||
// Same as `httpResolver`, but the underlying `ImportResolver.HttpResolver` is created with the given headers.
def httpResolverWithHeaders(headers: Map[String, String]) = resolverConverter(ImportResolver.HttpResolver(headers = headers)) | ||
|
||
/**
  * Settings for the draft-2 namespace cache.
  *
  * @param concurrency value passed to the Guava cache builder's `concurrencyLevel`
  * @param size        value passed to the cache builder's `maximumSize`
  * @param ttl         how long an entry may go unaccessed before it expires (`expireAfterAccess`)
  */
private[draft2] final case class CacheConfig(concurrency: Int, size: Long, ttl: FiniteDuration)
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does concurrency mean in cache terms?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's literally "Guava cache concurrency": this parameter is passed to the cache builder's
.concurrencyLevel()
method.