Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WDL Draft 2 workflow source + imports => namespace cache #3971

Merged
merged 1 commit into from
Aug 8, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,23 @@
Cromwell now allows for a user to submit the URL pointing to workflow file to run a workflow using `workflowUrl` parameter. Currently, this is only supported in `Server` mode.
More details on how to use it can be found [here](http://cromwell.readthedocs.io/en/develop/api/RESTAPI/).

### Languages

- Added an opt-in namespace cache for the WDL Draft 2 language factory. Please see the Cromwell example configuration for details. NOTE: if upgrading from a hotfix version of Cromwell
that relied upon this cache, the cache is now opt-in and must be turned on explicitly in config.

### Bug Fixes

#### API
- The `releaseHold` endpoint will now return `404 Not Found` for an unrecognized workflow ID and `400 Bad Request` for a malformed or invalid workflow ID.

#### Languages

- Fixed a bug that allowed values to be "auto-boxed" into a single-element `Array` of that type, which is not allowed in the WDL spec (Closes [#3478](https://github.com/broadinstitute/cromwell/issues/3478)).
- Fixed a bug that allowed values to be "auto-boxed" into a single-element `Array` of that type, which is not allowed in the WDL spec (Closes [#3478](https://github.com/broadinstitute/cromwell/issues/3478)).

#### PAPI version 1

- Restored standard output and error streaming for jobs.

## 34 Release Notes

Expand Down
10 changes: 10 additions & 0 deletions cromwell.examples.conf
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,16 @@ languages {
# config {
# strict-validation: true
# enabled: true
# caching {
# # WDL Draft 2 namespace caching is off by default; this value must be set to true to enable it.
# enabled: false
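# # Guava cache concurrency level: a hint for how many threads may update the cache concurrently (passed to CacheBuilder.concurrencyLevel).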
# concurrency: 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does concurrency mean in cache terms?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's literally "Guava cache concurrency" in that this parameter is passed to the cache builder's .concurrencyLevel() method.

# # How long entries in the cache should live from the time of their last access.
# ttl: 20 minutes
# # Maximum number of entries in the cache (i.e. the number of workflow source + imports => namespace entries).
# size: 1000
# }
# }
}
# draft-3 is the same as 1.0 so files should be able to be submitted to Cromwell as 1.0
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.engine.language

import com.typesafe.config.Config
import cromwell.engine.language.CromwellLanguages._
import cromwell.languages.LanguageFactory

Expand All @@ -18,9 +19,9 @@ final case class CromwellLanguages private(languageConfig: LanguagesConfiguratio
lc.name.toUpperCase -> LanguageVersions(versions, default)
}).toMap

private def makeLanguageFactory(className: String, config: Map[String, Any]) = {
private def makeLanguageFactory(className: String, config: Config) = {
Class.forName(className)
.getConstructor(classOf[Map[String, Any]])
.getConstructor(classOf[Config])
.newInstance(config)
.asInstanceOf[LanguageFactory]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package cromwell.engine.language

import java.util.Map.Entry

import com.typesafe.config.ConfigFactory
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.engine.language.CromwellLanguages.{CromwellLanguageName, CromwellLanguageVersion}

import scala.collection.JavaConverters._

final case class LanguagesConfiguration(languages: List[LanguageVersionConfigurationEntry], default: Option[String])
final case class LanguageVersionConfigurationEntry(name: CromwellLanguageName, versions: Map[CromwellLanguageVersion, LanguageVersionConfigurationEntryFields], default: Option[String])
final case class LanguageVersionConfigurationEntryFields(className: String, config: Map[String, Any])
final case class LanguageVersionConfigurationEntry(name: CromwellLanguageName, versions: Map[CromwellLanguageVersion, LanguageVersionConfig], default: Option[String])
final case class LanguageVersionConfig(className: String, config: Config)

object LanguageConfiguration {
private val LanguagesConfig = ConfigFactory.load.getConfig("languages")
Expand All @@ -28,8 +28,8 @@ object LanguageConfiguration {
val versions = (languageVersionNames.toList map { languageVersionName =>
val configEntry = versionSet.getConfig(s""""$languageVersionName"""")
val className: String = configEntry.getString("language-factory")
val factoryConfig: Map[String, Any] = if (configEntry.hasPath("config")) { configEntry.getObject("config").unwrapped().asScala.toMap } else Map.empty[String, Any]
val fields = LanguageVersionConfigurationEntryFields(className, factoryConfig)
val factoryConfig: Config = if (configEntry.hasPath("config")) configEntry.getConfig("config") else ConfigFactory.empty()
val fields = LanguageVersionConfig(className, factoryConfig)
languageVersionName -> fields
}).toMap

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import better.files.File
import cats.Monad
import cats.data.EitherT.fromEither
import cats.effect.IO
import com.typesafe.config.Config
import common.Checked
import common.validation.Checked._
import common.validation.Parse.{Parse, errorOrParse, goParse, tryParse}
Expand All @@ -17,7 +18,7 @@ import wom.core.{WorkflowJson, WorkflowOptionsJson, WorkflowSource}
import wom.executable.WomBundle
import wom.expression.IoFunctionSet

class CwlV1_0LanguageFactory(override val config: Map[String, Any]) extends LanguageFactory {
class CwlV1_0LanguageFactory(override val config: Config) extends LanguageFactory {

override val languageName: String = "CWL"
override val languageVersionName: String = "v1.0"
Expand Down Expand Up @@ -56,7 +57,7 @@ class CwlV1_0LanguageFactory(override val config: Map[String, Any]) extends Lang
cwlFile <- writeCwlFileToNewTempDir()
_ <- unzipDependencies(cwlFile)
cwl <- CwlDecoder.decodeCwlFile(cwlFile, source.workflowRoot)
executable <- fromEither[IO](cwl.womExecutable(AcceptAllRequirements, Option(source.inputsJson), ioFunctions, standardConfig.strictValidation))
executable <- fromEither[IO](cwl.womExecutable(AcceptAllRequirements, Option(source.inputsJson), ioFunctions, strictValidation))
validatedWomNamespace <- fromEither[IO](LanguageFactoryUtil.validateWomNamespace(executable, ioFunctions))
_ <- CwlDecoder.todoDeleteCwlFileParentDirectory(cwlFile.parent)
} yield validatedWomNamespace
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.languages

import com.typesafe.config.Config
import common.Checked
import common.validation.Parse.Parse
import common.validation.Checked._
Expand All @@ -15,16 +16,16 @@ trait LanguageFactory {
def languageVersionName: String

// Passed in by the constructor:
def config: Map[String, Any]
def config: Config

def enabledCheck: Checked[Unit] = if (standardConfig.enabled) {
().validNelCheck
} else {
import net.ceedubs.ficus.Ficus._

lazy val enabled = !config.as[Option[Boolean]]("enabled").contains(false)
lazy val enabledCheck: Checked[Unit] = if (enabled) ().validNelCheck else
s"The language factory for $languageName ($languageVersionName) is not currently enabled in this Cromwell".invalidNelCheck
}

// Override if you want to accumulate extra options instead of throwing an exception:
lazy val standardConfig: StandardLanguageFactoryConfig = StandardLanguageFactoryConfig.parse(config, allowExtras = false)

lazy val strictValidation: Boolean = !config.as[Option[Boolean]]("strict-validation").contains(false)

def getWomBundle(workflowSource: WorkflowSource,
workflowOptionsJson: WorkflowOptionsJson,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package languages.wdl.biscayne
import cats.data.EitherT.fromEither
import cats.effect.IO
import cats.instances.either._
import com.typesafe.config.Config
import common.Checked
import common.transforms.CheckedAtoB
import common.validation.Parse.Parse
Expand All @@ -20,7 +21,7 @@ import wom.executable.WomBundle
import wom.expression.IoFunctionSet
import wom.transforms.WomExecutableMaker.ops._

class WdlBiscayneLanguageFactory(override val config: Map[String, Any]) extends LanguageFactory {
class WdlBiscayneLanguageFactory(override val config: Config) extends LanguageFactory {

override val languageName: String = "WDL"
override val languageVersionName: String = "Biscayne"
Expand Down Expand Up @@ -54,7 +55,7 @@ class WdlBiscayneLanguageFactory(override val config: Map[String, Any]) extends
override def createExecutable(womBundle: WomBundle, inputsJson: WorkflowJson, ioFunctions: IoFunctionSet): Checked[ValidatedWomNamespace] = {
for {
_ <- enabledCheck
executable <- womBundle.toWomExecutable(Option(inputsJson), ioFunctions, standardConfig.strictValidation)
executable <- womBundle.toWomExecutable(Option(inputsJson), ioFunctions, strictValidation)
validated <- LanguageFactoryUtil.validateWomNamespace(executable, ioFunctions)
} yield validated
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
package languages.wdl.draft2

import java.security.MessageDigest
import java.util.concurrent.{Callable, TimeUnit}

import cats.data.EitherT.fromEither
import cats.effect.IO
import cats.instances.either._
import cats.instances.list._
import cats.syntax.functor._
import cats.data.EitherT.fromEither
import cats.effect.IO
import cats.syntax.traverse._
import com.google.common.cache.{Cache, CacheBuilder}
import com.typesafe.config.Config
import common.Checked
import common.validation.Validation._
import common.validation.Checked._
import common.validation.ErrorOr.{ErrorOr, _}
import common.validation.ErrorOr._
import common.validation.Parse.Parse
import common.validation.Validation._
import cromwell.core._
import cromwell.languages.util.ImportResolver.{ImportResolutionRequest, ImportResolver}
import cromwell.languages.util.{ImportResolver, LanguageFactoryUtil}
import cromwell.languages.{LanguageFactory, ValidatedWomNamespace}
import languages.wdl.draft2.WdlDraft2LanguageFactory._
import mouse.all._
import net.ceedubs.ficus.Ficus._
import wdl.draft2.model.{Draft2ImportResolver, WdlNamespace, WdlNamespaceWithWorkflow}
import wdl.shared.transforms.wdlom2wom.WdlSharedInputParsing
import wdl.transforms.draft2.wdlom2wom.WdlDraft2WomBundleMakers._
import wom.core.{WorkflowJson, WorkflowOptionsJson, WorkflowSource}
import wom.graph.GraphNodePort.OutputPort
import wdl.transforms.draft2.wdlom2wom.WdlDraft2WomExecutableMakers._
import wom.core.{WorkflowJson, WorkflowOptionsJson, WorkflowSource}
import wom.executable.WomBundle
import wom.expression.IoFunctionSet
import wom.transforms.WomExecutableMaker.ops._
import wom.graph.GraphNodePort.OutputPort
import wom.transforms.WomBundleMaker.ops._
import wom.values.WomValue
import languages.wdl.draft2.WdlDraft2LanguageFactory._
import wom.transforms.WomExecutableMaker.ops._
import wom.values._

class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends LanguageFactory {
import scala.concurrent.duration.FiniteDuration

class WdlDraft2LanguageFactory(override val config: Config) extends LanguageFactory {

override val languageName: String = "WDL"
override val languageVersionName: String = "draft-2"
Expand All @@ -54,19 +63,25 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La
list.sequence[Checked, Unit].void
}

import common.validation.Validation._

lazy val wdlNamespaceValidation: ErrorOr[WdlNamespaceWithWorkflow] = source match {
case w: WorkflowSourceFilesWithDependenciesZip =>
for {
importsDir <- LanguageFactoryUtil.validateImportsDirectory(w.importsZip)
wf <- WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr
_ = importsDir.delete(swallowIOExceptions = true)
} yield wf
case _: WorkflowSourceFilesWithoutImports =>
WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr
def workflowHashKey: String = {
workflowSource.md5Sum + (source.importsZipFileOption map { bytes => new String(MessageDigest.getInstance("MD5").digest(bytes)) }).getOrElse("")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so but to double check - are we happy to assert that MD5suming the zip file is less CPU intensive than just parsing the WDL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems a pretty safe assumption that digesting the array of bytes is cheaper than stringifying it and parsing. 🙂

}

// Builds the loader that parses `workflowSource` into a draft 2 namespace. It is wrapped in a
// `Callable` so the same logic can either be handed to the namespace cache's `get(key, loader)`
// or be invoked directly via `call` when no cache is configured.
def validationCallable = new Callable[ErrorOr[WdlNamespaceWithWorkflow]] {
def call: ErrorOr[WdlNamespaceWithWorkflow] = source match {
// Workflow submitted with a zip of imports: validate the imports directory before loading.
case w: WorkflowSourceFilesWithDependenciesZip =>
for {
importsDir <- LanguageFactoryUtil.validateImportsDirectory(w.importsZip)
wf <- WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr
// Best-effort cleanup of the imports directory; IO exceptions are swallowed.
// NOTE(review): this step follows the load in the for-comprehension, so it is presumably
// skipped when the load fails — confirm against ErrorOr's flatMap semantics.
_ = importsDir.delete(swallowIOExceptions = true)
} yield wf
// No imports zip: load the namespace straight from the workflow source.
case _: WorkflowSourceFilesWithoutImports =>
WdlNamespaceWithWorkflow.load(workflowSource, importResolvers map resolverConverter).toErrorOr
}
}

// The namespace validation result. When a namespace cache is configured the result is looked up
// under `workflowHashKey`, with `validationCallable` passed as the loader; otherwise the callable
// is invoked directly.
lazy val wdlNamespaceValidation: ErrorOr[WdlNamespaceWithWorkflow] = namespaceCache.map(_.get(workflowHashKey, validationCallable)).getOrElse(validationCallable.call)

def evaluateImports(wdlNamespace: WdlNamespace): Map[String, String] = {
// Descend the namespace looking for imports and construct `MetadataEvent`s for them.
def collectImportEvents: Map[String, String] = {
Expand All @@ -87,7 +102,7 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La
wdlNamespace <- wdlNamespaceValidation.toEither
_ <- validateWorkflowNameLengths(wdlNamespace)
importedUris = evaluateImports(wdlNamespace)
womExecutable <- wdlNamespace.toWomExecutable(Option(source.inputsJson), ioFunctions, standardConfig.strictValidation)
womExecutable <- wdlNamespace.toWomExecutable(Option(source.inputsJson), ioFunctions, strictValidation)
validatedWomNamespaceBeforeMetadata <- LanguageFactoryUtil.validateWomNamespace(womExecutable, ioFunctions)
_ <- checkTypes(wdlNamespace, validatedWomNamespaceBeforeMetadata.womValueInputs)
} yield validatedWomNamespaceBeforeMetadata.copy(importedFileContent = importedUris)
Expand Down Expand Up @@ -116,12 +131,32 @@ class WdlDraft2LanguageFactory(override val config: Map[String, Any]) extends La

override def createExecutable(womBundle: WomBundle, inputs: WorkflowJson, ioFunctions: IoFunctionSet): Checked[ValidatedWomNamespace] = for {
_ <- enabledCheck
executable <- WdlSharedInputParsing.buildWomExecutable(womBundle, Option(inputs), ioFunctions, standardConfig.strictValidation)
executable <- WdlSharedInputParsing.buildWomExecutable(womBundle, Option(inputs), ioFunctions, strictValidation)
validatedNamespace <- LanguageFactoryUtil.validateWomNamespace(executable, ioFunctions)
} yield validatedNamespace

// Commentary: we'll set this as the default in the reference.conf, so most people will get WDL draft 2 if nothing else looks parsable.
override def looksParsable(content: String): Boolean = false

/**
 * Optional namespace cache settings read from this factory's `config`.
 *
 * Yields `None` unless the factory is `enabled`, a `caching` stanza is present, and that stanza's
 * own `enabled` key is explicitly `true`. Otherwise yields a `CacheConfig` built from the `ttl`,
 * `concurrency` and `size` keys, falling back to 20 minutes, 2 and 1000 respectively when a key
 * is absent.
 */
private[draft2] lazy val cacheConfig: Option[CacheConfig] = {
// WDL draft 2 namespace caching is opt-in.
for {
// `.option(())` is presumably the mouse boolean syntax (Some(()) when true, None when false),
// used here as a guard inside the Option for-comprehension — confirm against `mouse.all._`.
_ <- enabled.option(())
caching <- config.as[Option[Config]]("caching")
// A missing `enabled` key counts as disabled: only an explicit `true` turns the cache on.
_ <- caching.as[Option[Boolean]]("enabled").contains(true).option(())
ttl = caching.as[Option[FiniteDuration]]("ttl").getOrElse(FiniteDuration.apply(20, TimeUnit.MINUTES))
concurrency = caching.as[Option[Int]]("concurrency").getOrElse(2)
size = caching.as[Option[Long]]("size").getOrElse(1000L)
} yield CacheConfig(concurrency = concurrency, size = size, ttl = ttl)
}

/**
 * The Guava cache of namespace validation results, defined only when `cacheConfig` is defined.
 *
 * Entries expire once `ttl` has elapsed since their last access, and the cache holds at most
 * `size` entries. The cached value is the whole `ErrorOr`, so whatever the loader returns is
 * what gets stored.
 * NOTE(review): the key type is `WorkflowSource`, while the lookup passes `workflowHashKey`
 * (a `String`); presumably `WorkflowSource` is a `String` alias — confirm.
 */
private[draft2] lazy val namespaceCache: Option[Cache[WorkflowSource, ErrorOr[WdlNamespaceWithWorkflow]]] = cacheConfig map { c =>
CacheBuilder.newBuilder()
.concurrencyLevel(c.concurrency)
.expireAfterAccess(c.ttl.length, c.ttl.unit)
.maximumSize(c.size)
.build[WorkflowSource, ErrorOr[WdlNamespaceWithWorkflow]]()
}
}

object WdlDraft2LanguageFactory {
Expand All @@ -132,4 +167,6 @@ object WdlDraft2LanguageFactory {

val httpResolver = resolverConverter(ImportResolver.HttpResolver())
def httpResolverWithHeaders(headers: Map[String, String]) = resolverConverter(ImportResolver.HttpResolver(headers = headers))

/**
 * Settings for the draft 2 namespace cache.
 *
 * @param concurrency value passed to the cache builder's `concurrencyLevel`
 * @param size        value passed to the cache builder's `maximumSize`
 * @param ttl         duration passed to the cache builder's `expireAfterAccess`
 */
private [draft2] case class CacheConfig(concurrency: Int, size: Long, ttl: FiniteDuration)
}
Loading