Skip to content

Commit

Permalink
Merge pull request #1581 from twitter/oscar/minor-fixes
Browse files Browse the repository at this point in the history
Use Caching for FlowDefExecution
  • Loading branch information
johnynek authored Jul 21, 2016
2 parents ede6974 + ac9e126 commit 868abf5
Showing 1 changed file with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,14 +591,17 @@ object Execution {
* This allows you to run any cascading flowDef as an Execution.
*/
private case class FlowDefExecution(result: (Config, Mode) => FlowDef) extends Execution[Unit] {
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) =
Trampoline(
protected def runStats(conf: Config, mode: Mode, cache: EvalCache)(implicit cec: ConcurrentExecutionContext) = {
lazy val future =
for {
flowDef <- toFuture(Try(result(conf, mode)))
_ = FlowStateMap.validateSources(flowDef, mode)
(id, jobStats) <- cache.runFlowDef(conf, mode, flowDef)
_ = FlowStateMap.clear(flowDef)
} yield ((), Map(id -> ExecutionCounters.fromJobStats(jobStats))))
} yield ((), Map(id -> ExecutionCounters.fromJobStats(jobStats)))

Trampoline(cache.getOrElseInsert(conf, this, future))
}
}

/*
Expand Down

0 comments on commit 868abf5

Please sign in to comment.