From 8db60604e80adef43fe445dc93759ccfa2d71ab7 Mon Sep 17 00:00:00 2001 From: Daniel Gerlag Date: Sun, 25 Mar 2018 11:31:47 -0700 Subject: [PATCH] Parallel sequences --- core/package.json | 2 +- core/spec/scenarios/parallel.spec.ts | 110 ++++++++++++++++++ core/src/fluent-builders.ts | 1 + .../fluent-builders/parallel-step-builder.ts | 29 +++++ .../fluent-builders/return-step-builder.ts | 1 - core/src/fluent-builders/step-builder.ts | 16 ++- core/src/fluent-builders/workflow-builder.ts | 5 + core/src/primitives.ts | 3 +- core/src/primitives/sequence.ts | 29 +++++ es2017-guide.md | 31 +++++ release-notes/2.1.md | 6 +- release-notes/2.2.md | 32 +++++ samples/node.js/javascript/11-parallel.js | 72 ++++++++++++ samples/node.js/javascript/package.json | 2 +- samples/node.js/typescript/11-parallel.ts | 72 ++++++++++++ typescript-guide.md | 30 +++++ 16 files changed, 433 insertions(+), 8 deletions(-) create mode 100644 core/spec/scenarios/parallel.spec.ts create mode 100644 core/src/fluent-builders/parallel-step-builder.ts create mode 100644 core/src/primitives/sequence.ts create mode 100644 release-notes/2.2.md create mode 100644 samples/node.js/javascript/11-parallel.js create mode 100644 samples/node.js/typescript/11-parallel.ts diff --git a/core/package.json b/core/package.json index ac54cb8..2234b28 100644 --- a/core/package.json +++ b/core/package.json @@ -1,6 +1,6 @@ { "name": "workflow-es", - "version": "2.1.0", + "version": "2.2.0", "description": "A lightweight workflow engine for Node.js", "main": "./build/src/index.js", "typings": "./build/src/index.d.ts", diff --git a/core/spec/scenarios/parallel.spec.ts b/core/spec/scenarios/parallel.spec.ts new file mode 100644 index 0000000..7600992 --- /dev/null +++ b/core/spec/scenarios/parallel.spec.ts @@ -0,0 +1,110 @@ +import { WorkflowHost, WorkflowBuilder, WorkflowStatus, WorkflowBase, StepBody, StepExecutionContext, ExecutionResult, WorkflowInstance, configureWorkflow, ConsoleLogger } from "../../src"; +import { MemoryPersistenceProvider } from "../../src/services/memory-persistence-provider"; +import { spinWaitCallback, spinWait } from "../helpers/spin-wait"; + + describe("parallel sequences", () => { + + let workflowScope = { + step0Ticker: 0, + step1Ticker: 0, + step2Ticker: 0, + step3Ticker: 0 + } + + class Step0 extends StepBody { + public run(context: StepExecutionContext): Promise { + workflowScope.step0Ticker++; + return ExecutionResult.next(); + } + } + + class Step1 extends StepBody { + public run(context: StepExecutionContext): Promise { + workflowScope.step1Ticker++; + return ExecutionResult.next(); + } + } + + class Step2 extends StepBody { + public run(context: StepExecutionContext): Promise { + workflowScope.step2Ticker++; + return ExecutionResult.next(); + } + } + + class Step3 extends StepBody { + public run(context: StepExecutionContext): Promise { + workflowScope.step3Ticker++; + return ExecutionResult.next(); + } + } + + class Parallel_Workflow implements WorkflowBase { + public id: string = "parallel-workflow"; + public version: number = 1; + + public build(builder: WorkflowBuilder) { + builder + .startWith(Step0) + .parallel() + .do(branch1 => branch1 + .startWith(Step1) + .waitFor("my-event", data => "0") + ) + .do(branch2 => branch2 + .startWith(Step2) + ) + .join() + .then(Step3); + } + } + + let workflowId = null; + let instance = null; + let persistence = new MemoryPersistenceProvider(); + let config = configureWorkflow(); + config.useLogger(new ConsoleLogger()); + config.usePersistence(persistence); + let host = config.getHost(); + jasmine.DEFAULT_TIMEOUT_INTERVAL = 20000; + + beforeAll(async (done) => { + host.registerWorkflow(Parallel_Workflow); + await host.start(); + + workflowId = await host.startWorkflow("parallel-workflow", 1, null); + + await spinWait(async () => { + let subs = await persistence.getSubscriptions("my-event", "0", new Date()); + return (subs.length > 0); + }); + + expect(workflowScope.step0Ticker).toBe(1); + expect(workflowScope.step1Ticker).toBe(1); + expect(workflowScope.step2Ticker).toBe(1); + expect(workflowScope.step3Ticker).toBe(0); + + await host.publishEvent("my-event", "0", "Pass", new Date()); + + spinWaitCallback(async () => { + instance = await persistence.getWorkflowInstance(workflowId); + return (instance.status != WorkflowStatus.Runnable); + }, done); + }); + + afterAll(() => { + host.stop(); + }); + + it("should be marked as complete", function() { + expect(instance.status).toBe(WorkflowStatus.Complete); + }); + + it("should have taken correct execution path", function() { + expect(workflowScope.step0Ticker).toBe(1); + expect(workflowScope.step1Ticker).toBe(1); + expect(workflowScope.step2Ticker).toBe(1); + expect(workflowScope.step3Ticker).toBe(1); + }); + + }); \ No newline at end of file diff --git a/core/src/fluent-builders.ts b/core/src/fluent-builders.ts index 6ecea42..94aaba3 100644 --- a/core/src/fluent-builders.ts +++ b/core/src/fluent-builders.ts @@ -1,2 +1,3 @@ export * from "./fluent-builders/workflow-builder"; export * from "./fluent-builders/step-builder"; +export * from "./fluent-builders/parallel-step-builder"; \ No newline at end of file diff --git a/core/src/fluent-builders/parallel-step-builder.ts b/core/src/fluent-builders/parallel-step-builder.ts new file mode 100644 index 0000000..365c2fa --- /dev/null +++ b/core/src/fluent-builders/parallel-step-builder.ts @@ -0,0 +1,29 @@ +import { StepBody, InlineStepBody } from "../abstractions"; +import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models"; +import { WorkflowBuilder } from "./workflow-builder"; +import { StepBuilder } from "./step-builder"; +import { Sequence } from "../primitives"; + +export class ParallelStepBuilder { + + private workflowBuilder: WorkflowBuilder; + private referenceBuilder: StepBuilder; + private step: WorkflowStep; + + constructor(workflowBuilder: WorkflowBuilder, step: WorkflowStep, refBuilder: StepBuilder) { + this.workflowBuilder = workflowBuilder; + this.step = step; + this.referenceBuilder = refBuilder; + } + + public do(builder: (then: WorkflowBuilder) => void): ParallelStepBuilder { + let lastStep = this.workflowBuilder.lastStep(); + builder(this.workflowBuilder); + this.step.children.push(lastStep + 1); //TODO: make more elegant + return this; + } + + public join(): StepBuilder { + return this.referenceBuilder; + } +} \ No newline at end of file diff --git a/core/src/fluent-builders/return-step-builder.ts b/core/src/fluent-builders/return-step-builder.ts index 9714517..04c0172 100644 --- a/core/src/fluent-builders/return-step-builder.ts +++ b/core/src/fluent-builders/return-step-builder.ts @@ -1,6 +1,5 @@ import { StepBody, InlineStepBody } from "../abstractions"; import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models"; -import { WaitFor, Foreach, While, If, Delay, Schedule } from "../primitives"; import { WorkflowBuilder } from "./workflow-builder"; import { StepBuilder } from "./step-builder"; diff --git a/core/src/fluent-builders/step-builder.ts b/core/src/fluent-builders/step-builder.ts index 88e9f46..0d343b0 100644 --- a/core/src/fluent-builders/step-builder.ts +++ b/core/src/fluent-builders/step-builder.ts @@ -1,9 +1,10 @@ import { StepBody, InlineStepBody } from "../abstractions"; import { WorkflowDefinition, WorkflowStepBase, WorkflowStep, StepOutcome, StepExecutionContext, ExecutionResult, WorkflowErrorHandling } from "../models"; -import { WaitFor, Foreach, While, If, Delay, Schedule } from "../primitives"; +import { WaitFor, Foreach, While, If, Delay, Schedule, Sequence } from "../primitives"; import { WorkflowBuilder } from "./workflow-builder"; import { ReturnStepBuilder } from "./return-step-builder"; import { OutcomeBuilder } from "./outcome-builder"; +import { ParallelStepBuilder } from "./parallel-step-builder"; export class StepBuilder { @@ -179,6 +180,19 @@ export class StepBuilder { return stepBuilder; } + public parallel(): ParallelStepBuilder { + var newStep = new WorkflowStep(); + newStep.body = Sequence; + this.workflowBuilder.addStep(newStep); + var newBuilder = new StepBuilder(this.workflowBuilder, newStep); + let stepBuilder = new ParallelStepBuilder(this.workflowBuilder, newStep, newBuilder); + let outcome = new StepOutcome(); + outcome.nextStep = newStep.id; + this.step.outcomes.push(outcome); + + return stepBuilder; + } + public schedule(interval: (data :TData) => number): ReturnStepBuilder { let newStep = new WorkflowStep(); newStep.body = Schedule; diff --git a/core/src/fluent-builders/workflow-builder.ts b/core/src/fluent-builders/workflow-builder.ts index bd10f84..4a5beac 100644 --- a/core/src/fluent-builders/workflow-builder.ts +++ b/core/src/fluent-builders/workflow-builder.ts @@ -42,4 +42,9 @@ export class WorkflowBuilder { public getUpstreamSteps(id: number): Array { return this.steps.filter(step => step.outcomes.filter(outcome => outcome.nextStep == id).length > 0); } + + public lastStep(): number { + let last = this.steps.reduce((prev, current) => prev.id > current.id ? prev : current); + return last.id; + } } \ No newline at end of file diff --git a/core/src/primitives.ts b/core/src/primitives.ts index 65fee62..4d06bb7 100644 --- a/core/src/primitives.ts +++ b/core/src/primitives.ts @@ -3,4 +3,5 @@ export * from "./primitives/while"; export * from "./primitives/if"; export * from "./primitives/delay"; export * from "./primitives/schedule"; -export * from "./primitives/waitFor"; \ No newline at end of file +export * from "./primitives/waitFor"; +export * from "./primitives/sequence"; \ No newline at end of file diff --git a/core/src/primitives/sequence.ts b/core/src/primitives/sequence.ts new file mode 100644 index 0000000..fce0bd5 --- /dev/null +++ b/core/src/primitives/sequence.ts @@ -0,0 +1,29 @@ +import { ExecutionResult, StepExecutionContext, ContainerData } from "../models"; +import { StepBody } from "../abstractions"; +import { ContainerStepBody } from "./container-step-body"; + +export class Sequence extends ContainerStepBody { + + public run(context: StepExecutionContext): Promise { + + if (!context.persistenceData) { + let containerData = new ContainerData(); + containerData.childrenActive = true; + return ExecutionResult.branch([null], containerData); + } + + if ((context.persistenceData as ContainerData).childrenActive) { + let complete: boolean = true; + + for(let childId of context.pointer.children) + complete = complete && this.isBranchComplete(context.workflow.executionPointers, childId); + + if (complete) + return ExecutionResult.next(); + else + return ExecutionResult.persist(context.persistenceData); + } + + return ExecutionResult.persist(context.persistenceData); + } +} \ No newline at end of file diff --git a/es2017-guide.md b/es2017-guide.md index a6d6597..e39d60e 100644 --- a/es2017-guide.md +++ b/es2017-guide.md @@ -240,6 +240,37 @@ build(builder) { } ``` +#### Parallel Sequences + +Run several sequences of steps in parallel + +```javascript +class Parallel_Workflow { + + build(builder) { + builder + .startWith(SayHello) + .parallel() + .do(branch1 => branch1 + .startWith(DoSomething) + .then(WaitForSomething) + .then(DoSomethingElse) + ) + .do(branch2 => branch2 + .startWith(DoSomething) + .then(DoSomethingElse) + ) + .do(branch3 => branch3 + .startWith(DoSomething) + .then(DoSomethingElse) + ) + .join() + .then(SayGoodbye); + } +} +``` + + ### Host The workflow host is the service responsible for executing workflows. It does this by polling the persistence provider for workflow instances that are ready to run, executes them and then passes them back to the persistence provider to by stored for the next time they are run. It is also responsible for publishing events to any workflows that may be waiting on one. diff --git a/release-notes/2.1.md b/release-notes/2.1.md index 99ca365..aadf491 100644 --- a/release-notes/2.1.md +++ b/release-notes/2.1.md @@ -1,8 +1,8 @@ # Workflow ES 2.1 -* Fixed typescript 2.4 issue +### Fixed typescript 2.4 issue -* Delay step +### Delay step Put the workflow to sleep for a specifed number of milliseconds. @@ -15,7 +15,7 @@ build(builder) { } ``` -* Schedule step +### Schedule step Schedule a sequence of steps to execution asynchronously in the future. diff --git a/release-notes/2.2.md b/release-notes/2.2.md new file mode 100644 index 0000000..2588c3d --- /dev/null +++ b/release-notes/2.2.md @@ -0,0 +1,32 @@ +# Workflow ES 2.2 + +### Parallel Sequences + +Run several sequences of steps in parallel + +```javascript +class Parallel_Workflow { + + build(builder) { + builder + .startWith(SayHello) + .parallel() + .do(branch1 => branch1 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 1") + .delay(data => 5000) + .then(DoSomething) + ) + .do(branch2 => branch2 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 2") + ) + .do(branch3 => branch3 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 3") + ) + .join() + .then(SayGoodbye); + } +} +``` diff --git a/samples/node.js/javascript/11-parallel.js b/samples/node.js/javascript/11-parallel.js new file mode 100644 index 0000000..01791e1 --- /dev/null +++ b/samples/node.js/javascript/11-parallel.js @@ -0,0 +1,72 @@ +const workflow_es = require("workflow-es"); +const workflow_mongo = require("workflow-es-mongodb"); + + +class SayHello extends workflow_es.StepBody { + run(context) { + console.log("Hello"); + return workflow_es.ExecutionResult.next(); + } +} + +class PrintMessage extends workflow_es.StepBody { + run(context) { + console.log(this.message); + return workflow_es.ExecutionResult.next(); + } +} + +class DoSomething extends workflow_es.StepBody { + run(context) { + console.log("Doing something..."); + return workflow_es.ExecutionResult.next(); + } +} + +class SayGoodbye extends workflow_es.StepBody { + run(context) { + console.log("Bye"); + return workflow_es.ExecutionResult.next(); + } +} + + +class Parallel_Workflow { + constructor() { + this.id = "parallel-sample"; + this.version = 1; + } + build(builder) { + builder + .startWith(SayHello) + .parallel() + .do(branch1 => branch1 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 1") + .delay(data => 5000) + .then(DoSomething) + ) + .do(branch2 => branch2 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 2") + ) + .do(branch3 => branch3 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 3") + ) + .join() + .then(SayGoodbye); + } +} + +async function main() { + var config = workflow_es.configureWorkflow(); + var host = config.getHost(); + + host.registerWorkflow(Parallel_Workflow); + await host.start(); + let id = await host.startWorkflow("parallel-sample", 1); + console.log("Started workflow: " + id); +} + +main(); \ No newline at end of file diff --git a/samples/node.js/javascript/package.json b/samples/node.js/javascript/package.json index 06222b6..0d76b8e 100644 --- a/samples/node.js/javascript/package.json +++ b/samples/node.js/javascript/package.json @@ -3,7 +3,7 @@ "version": "1.0.0", "description": "Workflow samples for node.js", "dependencies": { - "workflow-es": "^2.0.1", + "workflow-es": "^2.2.0", "workflow-es-mongodb": "^2.0.0", "mongodb": "^2.2.11" } diff --git a/samples/node.js/typescript/11-parallel.ts b/samples/node.js/typescript/11-parallel.ts new file mode 100644 index 0000000..e7e1f57 --- /dev/null +++ b/samples/node.js/typescript/11-parallel.ts @@ -0,0 +1,72 @@ +import { WorkflowHost, WorkflowBuilder, WorkflowBase, StepBody, StepExecutionContext, ExecutionResult, WorkflowInstance, configureWorkflow, ConsoleLogger } from "workflow-es"; +import { MongoDBPersistence } from "workflow-es-mongodb"; + +class SayHello extends StepBody { + run(context: StepExecutionContext): Promise { + console.log("Hello"); + return ExecutionResult.next(); + } +} + +class PrintMessage extends StepBody { + + public message: string; + + public run(context: StepExecutionContext): Promise { + console.log(this.message); + return ExecutionResult.next(); + } +} +class DoSomething extends StepBody { + run(context: StepExecutionContext): Promise { + console.log("Doing something..."); + return ExecutionResult.next(); + } +} + +class SayGoodbye extends StepBody { + run(context: StepExecutionContext): Promise { + console.log("Bye"); + return ExecutionResult.next(); + } +} + +class Parallel_Workflow implements WorkflowBase { + public id: string = "parallel-sample"; + public version: number = 1; + + public build(builder: WorkflowBuilder) { + builder + .startWith(SayHello) + .parallel() + .do(branch1 => branch1 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 1") + .delay(data => 5000) + .then(DoSomething) + ) + .do(branch2 => branch2 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 2") + ) + .do(branch3 => branch3 + .startWith(PrintMessage) + .input((step, data) => step.message = "Running in branch 3") + ) + .join() + .then(SayGoodbye); + } +} + +async function main() { + var config = configureWorkflow(); + //config.useLogger(new ConsoleLogger()); + var host = config.getHost(); + + host.registerWorkflow(Parallel_Workflow); + await host.start(); + let id = await host.startWorkflow("parallel-sample", 1, null); + console.log("Started workflow: " + id); +} + +main(); \ No newline at end of file diff --git a/typescript-guide.md b/typescript-guide.md index e77102c..0efa072 100644 --- a/typescript-guide.md +++ b/typescript-guide.md @@ -244,6 +244,36 @@ build(builder) { } ``` +#### Parallel Sequences + +Run several sequences of steps in parallel + +```javascript +class Parallel_Workflow { + + build(builder) { + builder + .startWith(SayHello) + .parallel() + .do(branch1 => branch1 + .startWith(DoSomething) + .then(WaitForSomething) + .then(DoSomethingElse) + ) + .do(branch2 => branch2 + .startWith(DoSomething) + .then(DoSomethingElse) + ) + .do(branch3 => branch3 + .startWith(DoSomething) + .then(DoSomethingElse) + ) + .join() + .then(SayGoodbye); + } +} +``` + ### Host The workflow host is the service responsible for executing workflows. It does this by polling the persistence provider for workflow instances that are ready to run, executes them and then passes them back to the persistence provider to by stored for the next time they are run. It is also responsible for publishing events to any workflows that may be waiting on one.