Skip to content

Compensation behavior in nested saga #263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@ public CompensateHandler(IExecutionPointerFactory pointerFactory, ILifeCycleEven

public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionPointer exceptionPointer, WorkflowStep exceptionStep, Exception exception, Queue<ExecutionPointer> bubbleUpQueue)
{
var scope = new Stack<string>(exceptionPointer.Scope);
var scope = new Stack<string>(exceptionPointer.Scope.Reverse());
scope.Push(exceptionPointer.Id);

exceptionPointer.Active = false;
exceptionPointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
exceptionPointer.Status = PointerStatus.Failed;


while (scope.Any())
{
var pointerId = scope.Pop();
Expand All @@ -42,14 +38,19 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP

var resume = true;
var revert = false;

if (scope.Any())

var txnStack = new Stack<string>(scope.Reverse());
while (txnStack.Count > 0)
{
var parentId = scope.Peek();
var parentId = txnStack.Pop();
var parentPointer = workflow.ExecutionPointers.FindById(parentId);
var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId);
resume = parentStep.ResumeChildrenAfterCompensation;
revert = parentStep.RevertChildrenAfterCompensation;
if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation))
{
resume = parentStep.ResumeChildrenAfterCompensation;
revert = parentStep.RevertChildrenAfterCompensation;
break;
}
}

if ((scopeStep.ErrorBehavior ?? WorkflowErrorHandling.Compensate) != WorkflowErrorHandling.Compensate)
Expand All @@ -58,10 +59,12 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
continue;
}

scopePointer.Active = false;
scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
scopePointer.Status = PointerStatus.Failed;

if (scopeStep.CompensationStepId.HasValue)
{
scopePointer.Active = false;
scopePointer.EndTime = _datetimeProvider.Now.ToUniversalTime();
scopePointer.Status = PointerStatus.Compensated;

var compensationPointer = _pointerFactory.BuildCompensationPointer(def, scopePointer, exceptionPointer, scopeStep.CompensationStepId.Value);
Expand Down
5 changes: 2 additions & 3 deletions src/WorkflowCore/Services/ExecutionPointerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ namespace WorkflowCore.Services
{
public class ExecutionPointerFactory : IExecutionPointerFactory
{

public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def)
{
return new ExecutionPointer
Expand Down Expand Up @@ -41,8 +40,8 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe
public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPointer pointer, int childDefinitionId, object branch)
{
var childPointerId = GenerateId();
var childScope = new Stack<string>(pointer.Scope);
childScope.Push(pointer.Id);
var childScope = new List<string>(pointer.Scope);
childScope.Insert(0, pointer.Id);
pointer.Children.Add(childPointerId);

return new ExecutionPointer()
Expand Down
6 changes: 3 additions & 3 deletions src/WorkflowCore/WorkflowCore.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<Description>Workflow Core is a light weight workflow engine targeting .NET Standard.</Description>
<Version>1.8.1</Version>
<AssemblyVersion>1.8.1.0</AssemblyVersion>
<FileVersion>1.8.1.0</FileVersion>
<Version>1.8.2</Version>
<AssemblyVersion>1.8.2.0</AssemblyVersion>
<FileVersion>1.8.2.0</FileVersion>
<PackageReleaseNotes></PackageReleaseNotes>
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
</PropertyGroup>
Expand Down
4 changes: 3 additions & 1 deletion src/extensions/WorkflowCore.Users/Primitives/UserTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@ private void SetupEscalations(IStepExecutionContext context)
Id = Guid.NewGuid().ToString(),
PredecessorId = context.ExecutionPointer.Id,
StepId = esc.Id,
StepName = esc.Name
StepName = esc.Name,
Status = PointerStatus.Pending,
Scope = new List<string>(context.ExecutionPointer.Scope)
});
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/extensions/WorkflowCore.Users/WorkflowCore.Users.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
<GenerateAssemblyCompanyAttribute>false</GenerateAssemblyCompanyAttribute>
<GenerateAssemblyProductAttribute>false</GenerateAssemblyProductAttribute>
<Description>Provides extensions for Workflow Core to enable human workflows.</Description>
<Version>1.8.0</Version>
<AssemblyVersion>1.8.0.0</AssemblyVersion>
<FileVersion>1.8.0.0</FileVersion>
<Version>1.8.2</Version>
<AssemblyVersion>1.8.2.0</AssemblyVersion>
<FileVersion>1.8.2.0</FileVersion>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using Xunit;
using FluentAssertions;
using System.Linq;
using WorkflowCore.Testing;

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class NestedRetrySagaScenario : WorkflowTest<NestedRetrySagaScenario.Workflow, NestedRetrySagaScenario.MyDataClass>
{
public class MyDataClass
{
}

public class Workflow : IWorkflow<MyDataClass>
{
public static int Event1Fired;
public static int Event2Fired;
public static int Event3Fired;
public static int TailEventFired;
public static int Compensation1Fired;
public static int Compensation2Fired;
public static int Compensation3Fired;
public static int Compensation4Fired;

public string Id => "NestedRetrySagaWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyDataClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.CompensateWith(context => Compensation1Fired++)
.Saga(x => x
.StartWith(context => ExecutionResult.Next())
.CompensateWith(context => Compensation2Fired++)
.If(data => true)
.Do(i => i
.StartWith(context =>
{
Event1Fired++;
if (Event1Fired < 3)
throw new Exception();
Event2Fired++;
})
.CompensateWith(context => Compensation3Fired++)
)
.Then(context => Event3Fired++)
.CompensateWith(context => Compensation4Fired++)
)
.OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1))
.Then(context => TailEventFired++);
}
}

public NestedRetrySagaScenario()
{
Setup();
Workflow.Event1Fired = 0;
Workflow.Event2Fired = 0;
Workflow.Event3Fired = 0;
Workflow.Compensation1Fired = 0;
Workflow.Compensation2Fired = 0;
Workflow.Compensation3Fired = 0;
Workflow.Compensation4Fired = 0;
Workflow.TailEventFired = 0;
}

[Fact]
public void Scenario()
{
var workflowId = StartWorkflow(new MyDataClass());
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(2);
Workflow.Event1Fired.Should().Be(3);
Workflow.Event2Fired.Should().Be(1);
Workflow.Event3Fired.Should().Be(1);
Workflow.Compensation1Fired.Should().Be(0);
Workflow.Compensation2Fired.Should().Be(2);
Workflow.Compensation3Fired.Should().Be(2);
Workflow.Compensation4Fired.Should().Be(0);
Workflow.TailEventFired.Should().Be(1);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
using System;
using System.Collections.Generic;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using Xunit;
using FluentAssertions;
using System.Linq;
using System.Threading.Tasks;
using WorkflowCore.Testing;
using WorkflowCore.Users.Models;

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class RetrySagaWithUserTaskScenario : WorkflowTest<RetrySagaWithUserTaskScenario.Workflow, RetrySagaWithUserTaskScenario.MyDataClass>
{
public class MyDataClass
{
}

public class Workflow : IWorkflow<MyDataClass>
{
public static int Event1Fired;
public static int Event2Fired;
public static int Event3Fired;
public static int TailEventFired;
public static int Compensation1Fired;
public static int Compensation2Fired;
public static int Compensation3Fired;
public static int Compensation4Fired;

public string Id => "RetrySagaWithUserTaskWorkflow";
public int Version => 1;
public void Build(IWorkflowBuilder<MyDataClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.CompensateWith(context => Compensation1Fired++)
.Saga(x => x
.StartWith(context => ExecutionResult.Next())
.CompensateWith(context => Compensation2Fired++)
.UserTask("prompt", data => "assigner")
.WithOption("a", "Option A")
.Do(wb => wb
.StartWith(context => ExecutionResult.Next())
.Then(context =>
{
Event1Fired++;
if (Event1Fired < 3)
throw new Exception();
Event2Fired++;
})
.CompensateWith(context => Compensation3Fired++)
.Then(context => Event3Fired++)
.CompensateWith(context => Compensation4Fired++)
)
)
.OnError(WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(1))
.Then(context => TailEventFired++);
}
}

public RetrySagaWithUserTaskScenario()
{
Setup();
Workflow.Event1Fired = 0;
Workflow.Event2Fired = 0;
Workflow.Event3Fired = 0;
Workflow.Compensation1Fired = 0;
Workflow.Compensation2Fired = 0;
Workflow.Compensation3Fired = 0;
Workflow.Compensation4Fired = 0;
Workflow.TailEventFired = 0;
}

[Fact]
public async Task Scenario()
{
var workflowId = StartWorkflow(new MyDataClass());
var instance = await Host.PersistenceStore.GetWorkflowInstance(workflowId);

string oldUserOptionKey = null;
for (var i = 0; i != 3; ++i)
{
var userOptions = await WaitForDifferentUserStepAsync(instance, TimeSpan.FromSeconds(1), oldUserOptionKey);
userOptions.Count.Should().Be(1);

var userOption = userOptions.Single();
userOption.Prompt.Should().Be("prompt");
userOption.AssignedPrincipal.Should().Be("assigner");
userOption.Options.Count.Should().Be(1);

var selectionOption = userOption.Options.Single();
selectionOption.Key.Should().Be("Option A");
selectionOption.Value.Should().Be("a");
await Host.PublishUserAction(userOption.Key, string.Empty, selectionOption.Value);

oldUserOptionKey = userOption.Key;
}

WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(2);
Workflow.Event1Fired.Should().Be(3);
Workflow.Event2Fired.Should().Be(1);
Workflow.Event3Fired.Should().Be(1);
Workflow.Compensation1Fired.Should().Be(0);
Workflow.Compensation2Fired.Should().Be(2);
Workflow.Compensation3Fired.Should().Be(2);
Workflow.Compensation4Fired.Should().Be(0);
Workflow.TailEventFired.Should().Be(1);
}

private static async Task<IReadOnlyCollection<OpenUserAction>> WaitForDifferentUserStepAsync(
WorkflowInstance instance,
TimeSpan timeout,
string oldUserActionKey = null)
{
var startTime = DateTime.UtcNow;

while (DateTime.UtcNow - startTime <= timeout)
{
var userActions = await WaitForUserStepAsync(instance);

if (oldUserActionKey != null && userActions.Any(x => x.Key == oldUserActionKey))
{
continue;
}

return userActions;
}

return Array.Empty<OpenUserAction>();
}

private static async Task<IReadOnlyCollection<OpenUserAction>> WaitForUserStepAsync(WorkflowInstance instance)
{
var delayCount = 200;
var openActions = instance.GetOpenUserActions()?.ToList();
while ((openActions?.Count ?? 0) == 0)
{
await Task.Delay(TimeSpan.FromMilliseconds(10));
openActions = instance.GetOpenUserActions()?.ToList();
if (delayCount-- == 0)
{
break;
}
}

return openActions;
}
}
}