Skip to content

Event order #300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -39,17 +40,17 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
{
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
var success = true;
var toQueue = new List<string>();
var complete = true;

foreach (var sub in subs.ToList())
{
success = success && await SeedSubscription(evt, sub, cancellationToken);
}
complete = complete && await SeedSubscription(evt, sub, toQueue, cancellationToken);

if (success)
{
if (complete)
await _persistenceStore.MarkEventProcessed(itemId);
}

foreach (var eventId in toQueue)
await QueueProvider.QueueWork(eventId, QueueType.Event);
}
}
finally
Expand All @@ -58,8 +59,24 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
}
}

private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
{
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, List<string> toQueue, CancellationToken cancellationToken)
{
foreach (var eventId in await _persistenceStore.GetEvents(sub.EventName, sub.EventKey, sub.SubscribeAsOf))
{
if (eventId == evt.Id)
continue;

var siblingEvent = await _persistenceStore.GetEvent(eventId);
if ((!siblingEvent.IsProcessed) && (siblingEvent.EventTime < evt.EventTime))
{
await QueueProvider.QueueWork(eventId, QueueType.Event);
return false;
}

if (!siblingEvent.IsProcessed)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you do this? It seems to me that future events will be queued by the runnable poller when their time will come... or am I missing something?

toQueue.Add(siblingEvent.Id);
}

if (!await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
{
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
using System;
using System.Collections.Generic;
using System.Text;
using WorkflowCore.Interface;
using WorkflowCore.Models;
using Xunit;
using FluentAssertions;
using System.Linq;
using WorkflowCore.Testing;

namespace WorkflowCore.IntegrationTests.Scenarios
{
public class EventOrderScenario : WorkflowTest<EventOrderScenario.EventWorkflow, EventOrderScenario.MyDataClass>
{
public class MyDataClass
{
public int Value1 { get; set; }
public int Value2 { get; set; }
public int Value3 { get; set; }
public int Value4 { get; set; }
public int Value5 { get; set; }
}

public class EventWorkflow : IWorkflow<MyDataClass>
{
public string Id => "EventOrder";
public int Version => 1;
public void Build(IWorkflowBuilder<MyDataClass> builder)
{
builder
.StartWith(context => ExecutionResult.Next())
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 1, 0))
.Output(data => data.Value1, step => step.EventData)
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 2, 0))
.Output(data => data.Value2, step => step.EventData)
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 3, 0))
.Output(data => data.Value3, step => step.EventData)
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 4, 0))
.Output(data => data.Value4, step => step.EventData)
.WaitFor("OrderedEvent", data => string.Empty, data => new DateTime(2000, 1, 1, 0, 5, 0))
.Output(data => data.Value5, step => step.EventData);
}
}

public EventOrderScenario()
{
Setup();
}

[Fact]
public void Scenario()
{
Host.PublishEvent("OrderedEvent", string.Empty, 1, new DateTime(2000, 1, 1, 0, 1, 1));
Host.PublishEvent("OrderedEvent", string.Empty, 2, new DateTime(2000, 1, 1, 0, 2, 1));
Host.PublishEvent("OrderedEvent", string.Empty, 3, new DateTime(2000, 1, 1, 0, 3, 1));
Host.PublishEvent("OrderedEvent", string.Empty, 4, new DateTime(2000, 1, 1, 0, 4, 1));
Host.PublishEvent("OrderedEvent", string.Empty, 5, new DateTime(2000, 1, 1, 0, 5, 1));

var workflowId = StartWorkflow(new MyDataClass());

WaitForWorkflowToComplete(workflowId, TimeSpan.FromSeconds(30));

GetStatus(workflowId).Should().Be(WorkflowStatus.Complete);
UnhandledStepErrors.Count.Should().Be(0);
GetData(workflowId).Value1.Should().Be(1);
GetData(workflowId).Value2.Should().Be(2);
GetData(workflowId).Value3.Should().Be(3);
GetData(workflowId).Value4.Should().Be(4);
GetData(workflowId).Value5.Should().Be(5);
}
}
}