Skip to content

Commit 791287d

Browse files
authored
Compensation behavior in nested saga (danielgerlag#263)
1 parent 0bc1631 commit 791287d

File tree

7 files changed

+270
-23
lines changed

7 files changed

+270
-23
lines changed

src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,9 @@ public CompensateHandler(IExecutionPointerFactory pointerFactory, ILifeCycleEven
2727

2828
public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer exceptionPointer, WorkflowStep exceptionStep, Exception exception, Queue<ExecutionPointer> bubbleUpQueue)
2929
{
30-
var scope = new Stack<string>(exceptionPointer.Scope);
30+
var scope = new Stack<string>(exceptionPointer.Scope.Reverse());
3131
scope.Push(exceptionPointer.Id);
32-
33-
exceptionPointer.Active = false;
34-
exceptionPointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
35-
exceptionPointer.Status = PointerStatus.Failed;
36-
32+
3733
while (scope.Any())
3834
{
3935
var pointerId = scope.Pop();
@@ -42,14 +38,19 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
4238

4339
var resume = true;
4440
var revert = false;
45-
46-
if (scope.Any())
41+
42+
var txnStack = new Stack<string>(scope.Reverse());
43+
while (txnStack.Count > 0)
4744
{
48-
var parentId = scope.Peek();
45+
var parentId = txnStack.Pop();
4946
var parentPointer = workflow.ExecutionPointers.FindById(parentId);
5047
var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId);
51-
resume = parentStep.ResumeChildrenAfterCompensation;
52-
revert = parentStep.RevertChildrenAfterCompensation;
48+
if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation))
49+
{
50+
resume = parentStep.ResumeChildrenAfterCompensation;
51+
revert = parentStep.RevertChildrenAfterCompensation;
52+
break;
53+
}
5354
}
5455

5556
if ((scopeStep.ErrorBehavior ?? WorkflowErrorHandling.Compensate) != WorkflowErrorHandling.Compensate)
@@ -58,10 +59,12 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
5859
continue;
5960
}
6061

62+
scopePointer.Active = false;
63+
scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
64+
scopePointer.Status = PointerStatus.Failed;
65+
6166
if (scopeStep.CompensationStepId.HasValue)
6267
{
63-
scopePointer.Active = false;
64-
scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
6568
scopePointer.Status = PointerStatus.Compensated;
6669

6770
var compensationPointer = _pointerFactory.BuildCompensationPointer(def, scopePointer, exceptionPointer, scopeStep.CompensationStepId.Value);

src/WorkflowCore/Services/ExecutionPointerFactory.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ namespace WorkflowCore.Services
99
{
1010
public class ExecutionPointerFactory : IExecutionPointerFactory
1111
{
12-
1312
public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def)
1413
{
1514
return new ExecutionPointer
@@ -41,8 +40,8 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe
4140
public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch)
4241
{
4342
var childPointerId = GenerateId();
44-
var childScope = new Stack<string>(pointer.Scope);
45-
childScope.Push(pointer.Id);
43+
var childScope = new List<string>(pointer.Scope);
44+
childScope.Insert(0, pointer.Id);
4645
pointer.Children.Add(childPointerId);
4746

4847
return new ExecutionPointer()

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@
1515
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1616
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
1717
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
18-
<Version>1.8.1</Version>
19-
<AssemblyVersion>1.8.1.0</AssemblyVersion>
20-
<FileVersion>1.8.1.0</FileVersion>
18+
<Version>1.8.2</Version>
19+
<AssemblyVersion>1.8.2.0</AssemblyVersion>
20+
<FileVersion>1.8.2.0</FileVersion>
2121
<PackageReleaseNotes></PackageReleaseNotes>
2222
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
2323
</PropertyGroup>

src/extensions/WorkflowCore.Users/Primitives/UserTask.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ private void SetupEscalations(IStepExecutionContext context)
8080
Id = Guid.NewGuid().ToString(),
8181
PredecessorId = context.ExecutionPointer.Id,
8282
StepId = esc.Id,
83-
StepName = esc.Name
83+
StepName = esc.Name,
84+
Status = PointerStatus.Pending,
85+
Scope = new List<string>(context.ExecutionPointer.Scope)
8486
});
8587
}
8688
}

src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
1919
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
2020
<Description>Provides extensions for Workflow Core to enable human workflows.</Description>
21-
<Version>1.8.0</Version>
22-
<AssemblyVersion>1.8.0.0</AssemblyVersion>
23-
<FileVersion>1.8.0.0</FileVersion>
21+
<Version>1.8.2</Version>
22+
<AssemblyVersion>1.8.2.0</AssemblyVersion>
23+
<FileVersion>1.8.2.0</FileVersion>
2424
</PropertyGroup>
2525

2626
<ItemGroup>
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
using Xunit;
7+
using FluentAssertions;
8+
using System.Linq;
9+
using WorkflowCore.Testing;
10+
11+
namespace WorkflowCore.IntegrationTests.Scenarios
12+
{
13+
public class NestedRetrySagaScenario : WorkflowTest<NestedRetrySagaScenario.Workflow, NestedRetrySagaScenario.MyDataClass>
14+
{
15+
public class MyDataClass
16+
{
17+
}
18+
19+
public class Workflow : IWorkflow<MyDataClass>
20+
{
21+
public static int Event1Fired;
22+
public static int Event2Fired;
23+
public static int Event3Fired;
24+
public static int TailEventFired;
25+
public static int Compensation1Fired;
26+
public static int Compensation2Fired;
27+
public static int Compensation3Fired;
28+
public static int Compensation4Fired;
29+
30+
public string Id => "NestedRetrySagaWorkflow";
31+
public int Version => 1;
32+
public void Build(IWorkflowBuilder<MyDataClass> builder)
33+
{
34+
builder
35+
.StartWith(context => ExecutionResult.Next())
36+
.CompensateWith(context => Compensation1Fired++)
37+
.Saga(x => x
38+
.StartWith(context => ExecutionResult.Next())
39+
.CompensateWith(context => Compensation2Fired++)
40+
.If(data => true)
41+
.Do(i => i
42+
.StartWith(context =>
43+
{
44+
Event1Fired++;
45+
if (Event1Fired < 3)
46+
throw new Exception();
47+
Event2Fired++;
48+
})
49+
.CompensateWith(context => Compensation3Fired++)
50+
)
51+
.Then(context => Event3Fired++)
52+
.CompensateWith(context => Compensation4Fired++)
53+
)
54+
.OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1))
55+
.Then(context => TailEventFired++);
56+
}
57+
}
58+
59+
public NestedRetrySagaScenario()
60+
{
61+
Setup();
62+
Workflow.Event1Fired = 0;
63+
Workflow.Event2Fired = 0;
64+
Workflow.Event3Fired = 0;
65+
Workflow.Compensation1Fired = 0;
66+
Workflow.Compensation2Fired = 0;
67+
Workflow.Compensation3Fired = 0;
68+
Workflow.Compensation4Fired = 0;
69+
Workflow.TailEventFired = 0;
70+
}
71+
72+
[Fact]
73+
public void Scenario()
74+
{
75+
var workflowId = StartWorkflow(new MyDataClass());
76+
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
77+
78+
GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
79+
UnhandledStepErrors.Count.Should().Be(2);
80+
Workflow.Event1Fired.Should().Be(3);
81+
Workflow.Event2Fired.Should().Be(1);
82+
Workflow.Event3Fired.Should().Be(1);
83+
Workflow.Compensation1Fired.Should().Be(0);
84+
Workflow.Compensation2Fired.Should().Be(2);
85+
Workflow.Compensation3Fired.Should().Be(2);
86+
Workflow.Compensation4Fired.Should().Be(0);
87+
Workflow.TailEventFired.Should().Be(1);
88+
}
89+
}
90+
}
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using WorkflowCore.Interface;
4+
using WorkflowCore.Models;
5+
using Xunit;
6+
using FluentAssertions;
7+
using System.Linq;
8+
using System.Threading.Tasks;
9+
using WorkflowCore.Testing;
10+
using WorkflowCore.Users.Models;
11+
12+
namespace WorkflowCore.IntegrationTests.Scenarios
13+
{
14+
public class RetrySagaWithUserTaskScenario : WorkflowTest<RetrySagaWithUserTaskScenario.Workflow, RetrySagaWithUserTaskScenario.MyDataClass>
15+
{
16+
public class MyDataClass
17+
{
18+
}
19+
20+
public class Workflow : IWorkflow<MyDataClass>
21+
{
22+
public static int Event1Fired;
23+
public static int Event2Fired;
24+
public static int Event3Fired;
25+
public static int TailEventFired;
26+
public static int Compensation1Fired;
27+
public static int Compensation2Fired;
28+
public static int Compensation3Fired;
29+
public static int Compensation4Fired;
30+
31+
public string Id => "RetrySagaWithUserTaskWorkflow";
32+
public int Version => 1;
33+
public void Build(IWorkflowBuilder<MyDataClass> builder)
34+
{
35+
builder
36+
.StartWith(context => ExecutionResult.Next())
37+
.CompensateWith(context => Compensation1Fired++)
38+
.Saga(x => x
39+
.StartWith(context => ExecutionResult.Next())
40+
.CompensateWith(context => Compensation2Fired++)
41+
.UserTask("prompt", data => "assigner")
42+
.WithOption("a", "Option A")
43+
.Do(wb => wb
44+
.StartWith(context => ExecutionResult.Next())
45+
.Then(context =>
46+
{
47+
Event1Fired++;
48+
if (Event1Fired < 3)
49+
throw new Exception();
50+
Event2Fired++;
51+
})
52+
.CompensateWith(context => Compensation3Fired++)
53+
.Then(context => Event3Fired++)
54+
.CompensateWith(context => Compensation4Fired++)
55+
)
56+
)
57+
.OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1))
58+
.Then(context => TailEventFired++);
59+
}
60+
}
61+
62+
public RetrySagaWithUserTaskScenario()
63+
{
64+
Setup();
65+
Workflow.Event1Fired = 0;
66+
Workflow.Event2Fired = 0;
67+
Workflow.Event3Fired = 0;
68+
Workflow.Compensation1Fired = 0;
69+
Workflow.Compensation2Fired = 0;
70+
Workflow.Compensation3Fired = 0;
71+
Workflow.Compensation4Fired = 0;
72+
Workflow.TailEventFired = 0;
73+
}
74+
75+
[Fact]
76+
public async Task Scenario()
77+
{
78+
var workflowId = StartWorkflow(new MyDataClass());
79+
var instance = await Host.PersistenceStore.GetWorkflowInstance(workflowId);
80+
81+
string oldUserOptionKey = null;
82+
for (var i = 0; i != 3; ++i)
83+
{
84+
var userOptions = await WaitForDifferentUserStepAsync(instance, TimeSpan.FromSeconds(1), oldUserOptionKey);
85+
userOptions.Count.Should().Be(1);
86+
87+
var userOption = userOptions.Single();
88+
userOption.Prompt.Should().Be("prompt");
89+
userOption.AssignedPrincipal.Should().Be("assigner");
90+
userOption.Options.Count.Should().Be(1);
91+
92+
var selectionOption = userOption.Options.Single();
93+
selectionOption.Key.Should().Be("Option A");
94+
selectionOption.Value.Should().Be("a");
95+
await Host.PublishUserAction(userOption.Key, string.Empty, selectionOption.Value);
96+
97+
oldUserOptionKey = userOption.Key;
98+
}
99+
100+
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
101+
102+
GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
103+
UnhandledStepErrors.Count.Should().Be(2);
104+
Workflow.Event1Fired.Should().Be(3);
105+
Workflow.Event2Fired.Should().Be(1);
106+
Workflow.Event3Fired.Should().Be(1);
107+
Workflow.Compensation1Fired.Should().Be(0);
108+
Workflow.Compensation2Fired.Should().Be(2);
109+
Workflow.Compensation3Fired.Should().Be(2);
110+
Workflow.Compensation4Fired.Should().Be(0);
111+
Workflow.TailEventFired.Should().Be(1);
112+
}
113+
114+
private static async Task<IReadOnlyCollection<OpenUserAction>> WaitForDifferentUserStepAsync(
115+
WorkflowInstance instance,
116+
TimeSpan timeout,
117+
string oldUserActionKey = null)
118+
{
119+
var startTime = DateTime.UtcNow;
120+
121+
while (DateTime.UtcNow - startTime <= timeout)
122+
{
123+
var userActions = await WaitForUserStepAsync(instance);
124+
125+
if (oldUserActionKey != null && userActions.Any(x => x.Key == oldUserActionKey))
126+
{
127+
continue;
128+
}
129+
130+
return userActions;
131+
}
132+
133+
return Array.Empty<OpenUserAction>();
134+
}
135+
136+
private static async Task<IReadOnlyCollection<OpenUserAction>> WaitForUserStepAsync(WorkflowInstance instance)
137+
{
138+
var delayCount = 200;
139+
var openActions = instance.GetOpenUserActions()?.ToList();
140+
while ((openActions?.Count ?? 0) == 0)
141+
{
142+
await Task.Delay(TimeSpan.FromMilliseconds(10));
143+
openActions = instance.GetOpenUserActions()?.ToList();
144+
if (delayCount-- == 0)
145+
{
146+
break;
147+
}
148+
}
149+
150+
return openActions;
151+
}
152+
}
153+
}

0 commit comments

Comments
 (0)