Skip to content

Commit b7584e5

Browse files
authored
Event order (danielgerlag#300)
1 parent 49b4621 commit b7584e5

File tree

2 files changed

+98
-9
lines changed

2 files changed

+98
-9
lines changed

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Linq;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -39,17 +40,17 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
3940
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
4041
{
4142
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
42-
var success = true;
43+
var toQueue = new List<string>();
44+
var complete = true;
4345

4446
foreach (var sub in subs.ToList())
45-
{
46-
success = success && await SeedSubscription(evt, sub, cancellationToken);
47-
}
47+
complete = complete && await SeedSubscription(evt, sub, toQueue, cancellationToken);
4848

49-
if (success)
50-
{
49+
if (complete)
5150
await _persistenceStore.MarkEventProcessed(itemId);
52-
}
51+
52+
foreach (var eventId in toQueue)
53+
await QueueProvider.QueueWork(eventId, QueueType.Event);
5354
}
5455
}
5556
finally
@@ -58,8 +59,24 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5859
}
5960
}
6061

61-
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
62-
{
62+
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, List<string> toQueue, CancellationToken cancellationToken)
63+
{
64+
foreach (var eventId in await _persistenceStore.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf))
65+
{
66+
if (eventId == evt.Id)
67+
continue;
68+
69+
var siblingEvent = await _persistenceStore.GetEvent(eventId);
70+
if ((!siblingEvent.IsProcessed) && (siblingEvent.EventTime < evt.EventTime))
71+
{
72+
await QueueProvider.QueueWork(eventId, QueueType.Event);
73+
return false;
74+
}
75+
76+
if (!siblingEvent.IsProcessed)
77+
toQueue.Add(siblingEvent.Id);
78+
}
79+
6380
if (!await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
6481
{
6582
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using WorkflowCore.Interface;
5+
using WorkflowCore.Models;
6+
using Xunit;
7+
using FluentAssertions;
8+
using System.Linq;
9+
using WorkflowCore.Testing;
10+
11+
namespace WorkflowCore.IntegrationTests.Scenarios
12+
{
13+
public class EventOrderScenario : WorkflowTest<EventOrderScenario.EventWorkflow, EventOrderScenario.MyDataClass>
14+
{
15+
public class MyDataClass
16+
{
17+
public int Value1 { get; set; }
18+
public int Value2 { get; set; }
19+
public int Value3 { get; set; }
20+
public int Value4 { get; set; }
21+
public int Value5 { get; set; }
22+
}
23+
24+
public class EventWorkflow : IWorkflow<MyDataClass>
25+
{
26+
public string Id => "EventOrder";
27+
public int Version => 1;
28+
public void Build(IWorkflowBuilder<MyDataClass> builder)
29+
{
30+
builder
31+
.StartWith(context => ExecutionResult.Next())
32+
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 1, 0))
33+
.Output(data => data.Value1, step => step.EventData)
34+
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 2, 0))
35+
.Output(data => data.Value2, step => step.EventData)
36+
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 3, 0))
37+
.Output(data => data.Value3, step => step.EventData)
38+
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 4, 0))
39+
.Output(data => data.Value4, step => step.EventData)
40+
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 5, 0))
41+
.Output(data => data.Value5, step => step.EventData);
42+
}
43+
}
44+
45+
public EventOrderScenario()
46+
{
47+
Setup();
48+
}
49+
50+
[Fact]
51+
public void Scenario()
52+
{
53+
Host.PublishEvent("OrderedEvent", string.Empty, 1, new DateTime(2000, 1, 1, 0, 1, 1));
54+
Host.PublishEvent("OrderedEvent", string.Empty, 2, new DateTime(2000, 1, 1, 0, 2, 1));
55+
Host.PublishEvent("OrderedEvent", string.Empty, 3, new DateTime(2000, 1, 1, 0, 3, 1));
56+
Host.PublishEvent("OrderedEvent", string.Empty, 4, new DateTime(2000, 1, 1, 0, 4, 1));
57+
Host.PublishEvent("OrderedEvent", string.Empty, 5, new DateTime(2000, 1, 1, 0, 5, 1));
58+
59+
var workflowId = StartWorkflow(new MyDataClass());
60+
61+
WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));
62+
63+
GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
64+
UnhandledStepErrors.Count.Should().Be(0);
65+
GetData(workflowId).Value1.Should().Be(1);
66+
GetData(workflowId).Value2.Should().Be(2);
67+
GetData(workflowId).Value3.Should().Be(3);
68+
GetData(workflowId).Value4.Should().Be(4);
69+
GetData(workflowId).Value5.Should().Be(5);
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)