Skip to content

Commit ce1d413

Browse files
committed
compensation feature
1 parent 7789d61 commit ce1d413

File tree

11 files changed

+327
-27
lines changed

11 files changed

+327
-27
lines changed

WorkflowCore.sln

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
Microsoft Visual Studio Solution File, Format Version 12.00
33
# Visual Studio 15
4-
VisualStudioVersion = 15.0.27004.2008
4+
VisualStudioVersion = 15.0.27130.2010
55
MinimumVisualStudioVersion = 10.0.40219.1
66
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{EF47161E-E399-451C-BDE8-E92AAD3BD761}"
77
EndProject
@@ -106,6 +106,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample16", "sr
106106
EndProject
107107
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ScratchPad", "test\ScratchPad\ScratchPad.csproj", "{6396453F-4D0E-4CD4-BC89-87E8970F2A80}"
108108
EndProject
109+
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Sample17", "src\samples\WorkflowCore.Sample17\WorkflowCore.Sample17.csproj", "{42F475BC-95F4-42E1-8CCD-7B9C27487E33}"
110+
EndProject
109111
Global
110112
GlobalSection(SolutionConfigurationPlatforms) = preSolution
111113
Debug|Any CPU = Debug|Any CPU
@@ -268,6 +270,10 @@ Global
268270
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Debug|Any CPU.Build.0 = Debug|Any CPU
269271
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.ActiveCfg = Release|Any CPU
270272
{6396453F-4D0E-4CD4-BC89-87E8970F2A80}.Release|Any CPU.Build.0 = Release|Any CPU
273+
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
274+
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Debug|Any CPU.Build.0 = Debug|Any CPU
275+
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.ActiveCfg = Release|Any CPU
276+
{42F475BC-95F4-42E1-8CCD-7B9C27487E33}.Release|Any CPU.Build.0 = Release|Any CPU
271277
EndGlobalSection
272278
GlobalSection(SolutionProperties) = preSolution
273279
HideSolutionNode = FALSE
@@ -315,6 +321,7 @@ Global
315321
{9B7811AC-68D6-4D19-B1E9-65423393ED83} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
316322
{0C9617A9-C8B7-45F6-A54A-261A23AC881B} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
317323
{6396453F-4D0E-4CD4-BC89-87E8970F2A80} = {E6CEAD8D-F565-471E-A0DC-676F54EAEDEB}
324+
{42F475BC-95F4-42E1-8CCD-7B9C27487E33} = {5080DB09-CBE8-4C45-9957-C3BB7651755E}
318325
EndGlobalSection
319326
GlobalSection(ExtensibilityGlobals) = postSolution
320327
SolutionGuid = {DC0FA8D3-6449-4FDA-BB46-ECF58FAD23B4}

src/WorkflowCore/Interface/IStepBuilder.cs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,12 @@ public interface IStepBuilder<TData, TStepBody>
163163
/// <returns></returns>
164164
IParallelStepBuilder<TData, Sequence> Parallel();
165165

166+
/// <summary>
167+
/// Execute a sequence of steps in a container
168+
/// </summary>
169+
/// <returns></returns>
170+
IContainerStepBuilder<TData, Sequence, Sequence> Sequence();
171+
166172
/// <summary>
167173
/// Schedule a block of steps to execute in parallel sometime in the future
168174
/// </summary>
@@ -177,5 +183,28 @@ public interface IStepBuilder<TData, TStepBody>
177183
/// <param name="until">Resolves a condition to stop the recurring task</param>
178184
/// <returns></returns>
179185
IContainerStepBuilder<TData, Recur, TStepBody> Recur(Expression<Func<TData, TimeSpan>> interval, Expression<Func<TData, bool>> until);
186+
187+
188+
/// <summary>
189+
/// Undo step if unhandled exception is thrown by this step
190+
/// </summary>
191+
/// <typeparam name="TStep">The type of the step to execute</typeparam>
192+
/// <param name="stepSetup">Configure additional parameters for this step</param>
193+
/// <returns></returns>
194+
IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody;
195+
196+
/// <summary>
197+
/// Undo step if unhandled exception is thrown by this step
198+
/// </summary>
199+
/// <param name="body"></param>
200+
/// <returns></returns>
201+
IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body);
202+
203+
/// <summary>
204+
/// Undo step if unhandled exception is thrown by this step
205+
/// </summary>
206+
/// <param name="body"></param>
207+
/// <returns></returns>
208+
IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body);
180209
}
181210
}

src/WorkflowCore/Models/ExecutionPointer.cs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,20 @@ public class ExecutionPointer
4040
public string PredecessorId { get; set; }
4141

4242
public object Outcome { get; set; }
43+
44+
public PointerStatus Status { get; set; }
45+
46+
public List<string> SuccessorIds { get; set; } = new List<string>();
47+
}
48+
49+
public enum PointerStatus
50+
{
51+
Legacy = 0,
52+
Pending = 1,
53+
Running = 2,
54+
Complete = 3,
55+
Sleeping = 4,
56+
WaitingForEvent = 5,
57+
Failed = 6
4358
}
4459
}

src/WorkflowCore/Models/WorkflowStep.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ public abstract class WorkflowStep
2525

2626
public WorkflowErrorHandling? ErrorBehavior { get; set; }
2727

28-
public TimeSpan? RetryInterval { get; set; }
28+
public TimeSpan? RetryInterval { get; set; }
29+
30+
public int? CompensationStepId { get; set; }
2931

3032
public virtual ExecutionPipelineDirective InitForExecution(WorkflowExecutorResult executorResult, WorkflowDefinition defintion, WorkflowInstance workflow, ExecutionPointer executionPointer)
3133
{

src/WorkflowCore/Services/FluentBuilders/StepBuilder.cs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,15 @@ public IContainerStepBuilder<TData, When, OutcomeSwitch> When(Expression<Func<TD
333333
return stepBuilder;
334334
}
335335

336+
public IContainerStepBuilder<TData, Sequence, Sequence> Sequence()
337+
{
338+
var newStep = new WorkflowStep<Sequence>();
339+
WorkflowBuilder.AddStep(newStep);
340+
var stepBuilder = new StepBuilder<TData, Sequence>(WorkflowBuilder, newStep);
341+
Step.Outcomes.Add(new StepOutcome() { NextStep = newStep.Id });
342+
return stepBuilder;
343+
}
344+
336345
public IParallelStepBuilder<TData, Sequence> Parallel()
337346
{
338347
var newStep = new WorkflowStep<Sequence>();
@@ -387,5 +396,42 @@ public IStepBuilder<TData, TStepBody> Do(Action<IWorkflowBuilder<TData>> builder
387396

388397
return this;
389398
}
399+
400+
public IStepBuilder<TData, TStepBody> CompensateWith<TStep>(Action<IStepBuilder<TData, TStep>> stepSetup = null) where TStep : IStepBody
401+
{
402+
WorkflowStep<TStep> newStep = new WorkflowStep<TStep>();
403+
WorkflowBuilder.AddStep(newStep);
404+
var stepBuilder = new StepBuilder<TData, TStep>(WorkflowBuilder, newStep);
405+
406+
if (stepSetup != null)
407+
{
408+
stepSetup.Invoke(stepBuilder);
409+
}
410+
411+
newStep.Name = newStep.Name ?? typeof(TStep).Name;
412+
Step.CompensationStepId = newStep.Id;
413+
414+
return this;
415+
}
416+
417+
public IStepBuilder<TData, TStepBody> CompensateWith(Func<IStepExecutionContext, ExecutionResult> body)
418+
{
419+
WorkflowStepInline newStep = new WorkflowStepInline();
420+
newStep.Body = body;
421+
WorkflowBuilder.AddStep(newStep);
422+
var stepBuilder = new StepBuilder<TData, InlineStepBody>(WorkflowBuilder, newStep);
423+
Step.CompensationStepId = newStep.Id;
424+
return this;
425+
}
426+
427+
public IStepBuilder<TData, TStepBody> CompensateWith(Action<IStepExecutionContext> body)
428+
{
429+
var newStep = new WorkflowStep<ActionStepBody>();
430+
WorkflowBuilder.AddStep(newStep);
431+
var stepBuilder = new StepBuilder<TData, ActionStepBody>(WorkflowBuilder, newStep);
432+
stepBuilder.Input(x => x.Body, x => body);
433+
Step.CompensationStepId = newStep.Id;
434+
return this;
435+
}
390436
}
391437
}

src/WorkflowCore/Services/WorkflowController.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ public async Task<string> StartWorkflow<TData>(string workflowId, int? version,
7575
Id = Guid.NewGuid().ToString(),
7676
StepId = 0,
7777
Active = true,
78+
Status = PointerStatus.Pending,
7879
StepName = Enumerable.First<WorkflowStep>(def.Steps, x => x.Id == 0).Name
7980
});
8081

src/WorkflowCore/Services/WorkflowExecutor.cs

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Wor
4848
{
4949
try
5050
{
51+
pointer.Status = PointerStatus.Running;
5152
switch (step.InitForExecution(wfResult, def, workflow, pointer))
5253
{
5354
case ExecutionPipelineDirective.Defer:
@@ -114,30 +115,7 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Wor
114115
}
115116
catch (Exception ex)
116117
{
117-
pointer.RetryCount++;
118-
_logger.LogError("Workflow {0} raised error on step {1} Message: {2}", workflow.Id, pointer.StepId, ex.Message);
119-
wfResult.Errors.Add(new ExecutionError()
120-
{
121-
WorkflowId = workflow.Id,
122-
ExecutionPointerId = pointer.Id,
123-
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
124-
Message = ex.Message
125-
});
126-
127-
switch (step.ErrorBehavior ?? def.DefaultErrorBehavior)
128-
{
129-
case WorkflowErrorHandling.Retry:
130-
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? options.ErrorRetryInterval);
131-
break;
132-
case WorkflowErrorHandling.Suspend:
133-
workflow.Status = WorkflowStatus.Suspended;
134-
break;
135-
case WorkflowErrorHandling.Terminate:
136-
workflow.Status = WorkflowStatus.Terminated;
137-
break;
138-
}
139-
140-
Host.ReportStepError(workflow, step, ex);
118+
HandleStepException(workflow, options, wfResult, def, pointer, step, ex);
141119
}
142120
}
143121
else
@@ -160,6 +138,59 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow, Wor
160138
return wfResult;
161139
}
162140

141+
private void HandleStepException(WorkflowInstance workflow, WorkflowOptions options, WorkflowExecutorResult wfResult, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, Exception ex)
142+
{
143+
pointer.RetryCount++;
144+
pointer.Status = PointerStatus.Failed;
145+
_logger.LogError("Workflow {0} raised error on step {1} Message: {2}", workflow.Id, pointer.StepId, ex.Message);
146+
wfResult.Errors.Add(new ExecutionError()
147+
{
148+
WorkflowId = workflow.Id,
149+
ExecutionPointerId = pointer.Id,
150+
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
151+
Message = ex.Message
152+
});
153+
154+
if (step.CompensationStepId.HasValue)
155+
{
156+
pointer.Active = false;
157+
pointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
158+
pointer.Status = PointerStatus.Failed;
159+
160+
var nextId = Guid.NewGuid().ToString();
161+
workflow.ExecutionPointers.Add(new ExecutionPointer()
162+
{
163+
Id = nextId,
164+
PredecessorId = pointer.Id,
165+
StepId = step.CompensationStepId.Value,
166+
Active = true,
167+
ContextItem = pointer.ContextItem,
168+
Status = PointerStatus.Pending,
169+
StepName = def.Steps.First(x => x.Id == step.CompensationStepId.Value).Name
170+
});
171+
172+
pointer.SuccessorIds.Add(nextId);
173+
}
174+
else
175+
{
176+
177+
switch (step.ErrorBehavior ?? def.DefaultErrorBehavior)
178+
{
179+
case WorkflowErrorHandling.Retry:
180+
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(step.RetryInterval ?? def.DefaultErrorRetryInterval ?? options.ErrorRetryInterval);
181+
break;
182+
case WorkflowErrorHandling.Suspend:
183+
workflow.Status = WorkflowStatus.Suspended;
184+
break;
185+
case WorkflowErrorHandling.Terminate:
186+
workflow.Status = WorkflowStatus.Terminated;
187+
break;
188+
}
189+
}
190+
191+
Host.ReportStepError(workflow, step, ex);
192+
}
193+
163194
private void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer pointer, WorkflowStep step, ExecutionResult result, WorkflowExecutorResult workflowResult)
164195
{
165196
//TODO: refactor this into it's own class
@@ -168,13 +199,15 @@ private void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinitio
168199
if (result.SleepFor.HasValue)
169200
{
170201
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(result.SleepFor.Value);
202+
pointer.Status = PointerStatus.Sleeping;
171203
}
172204

173205
if (!string.IsNullOrEmpty(result.EventName))
174206
{
175207
pointer.EventName = result.EventName;
176208
pointer.EventKey = result.EventKey;
177209
pointer.Active = false;
210+
pointer.Status = PointerStatus.WaitingForEvent;
178211

179212
workflowResult.Subscriptions.Add(new EventSubscription()
180213
{
@@ -190,18 +223,23 @@ private void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinitio
190223
{
191224
pointer.Active = false;
192225
pointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
226+
pointer.Status = PointerStatus.Complete;
193227

194228
foreach (var outcomeTarget in step.Outcomes.Where(x => object.Equals(x.GetValue(workflow.Data), result.OutcomeValue) || x.GetValue(workflow.Data) == null))
195229
{
230+
var nextId = Guid.NewGuid().ToString();
196231
workflow.ExecutionPointers.Add(new ExecutionPointer()
197232
{
198-
Id = Guid.NewGuid().ToString(),
233+
Id = nextId,
199234
PredecessorId = pointer.Id,
200235
StepId = outcomeTarget.NextStep,
201236
Active = true,
202237
ContextItem = pointer.ContextItem,
238+
Status = PointerStatus.Pending,
203239
StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name
204240
});
241+
242+
pointer.SuccessorIds.Add(nextId);
205243
}
206244
}
207245
else
@@ -218,6 +256,7 @@ private void ProcessExecutionResult(WorkflowInstance workflow, WorkflowDefinitio
218256
StepId = childDefId,
219257
Active = true,
220258
ContextItem = branch,
259+
Status = PointerStatus.Pending,
221260
StepName = def.Steps.First(x => x.Id == childDefId).Name
222261
});
223262

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
6+
namespace WorkflowCore.Sample17
7+
{
8+
class CompensatingWorkflow : IWorkflow
9+
{
10+
public string Id => "compensate-sample";
11+
public int Version => 1;
12+
13+
public void Build(IWorkflowBuilder<object> builder)
14+
{
15+
builder
16+
.StartWith(context => Console.WriteLine("Hello"))
17+
.Sequence().Do(seq => seq
18+
.StartWith(context => Console.WriteLine("1"))
19+
.Then(context =>
20+
{
21+
Console.WriteLine("2");
22+
//throw new Exception("boo");
23+
Console.WriteLine("2.5");
24+
})
25+
.CompensateWith(context => Console.WriteLine("fail"))
26+
.Then(context => Console.WriteLine("3")))
27+
//.OnError(Models.WorkflowErrorHandling.)
28+
.Then(context => Console.WriteLine("end"));
29+
}
30+
}
31+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
using System;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using WorkflowCore.Interface;
4+
5+
namespace WorkflowCore.Sample17
6+
{
7+
class Program
8+
{
9+
static void Main(string[] args)
10+
{
11+
var serviceProvider = ConfigureServices();
12+
13+
//start the workflow host
14+
var host = serviceProvider.GetService<IWorkflowHost>();
15+
host.RegisterWorkflow<CompensatingWorkflow>();
16+
host.Start();
17+
18+
Console.WriteLine("Starting workflow...");
19+
var workflowId = host.StartWorkflow("compensate-sample").Result;
20+
21+
Console.ReadLine();
22+
host.Stop();
23+
}
24+
25+
private static IServiceProvider ConfigureServices()
26+
{
27+
//setup dependency injection
28+
IServiceCollection services = new ServiceCollection();
29+
services.AddLogging();
30+
services.AddWorkflow();
31+
32+
var serviceProvider = services.BuildServiceProvider();
33+
return serviceProvider;
34+
}
35+
}
36+
}

0 commit comments

Comments
 (0)