Skip to content

Commit 5819629

Browse files
authored
Async index (danielgerlag#272)
1 parent 7177e4f commit 5819629

File tree

23 files changed

+249
-94
lines changed

23 files changed

+249
-94
lines changed

ReleaseNotes/1.9.0.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# Workflow Core 1.9.0
2+
3+
* Indexing is now asynchronous with some simple retry mechanisms to deal with transient errors
4+
* Removed global static state from `MemoryPersistenceProvider`
5+
* Removed dependency on data flow library
6+
* Optimized background queue consumers (workflow, event and index) dispatching task

WorkflowCore.sln

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
9797
ReleaseNotes\1.7.0.md = ReleaseNotes\1.7.0.md
9898
ReleaseNotes\1.8.0.md = ReleaseNotes\1.8.0.md
9999
ReleaseNotes\1.8.1.md = ReleaseNotes\1.8.1.md
100+
ReleaseNotes\1.9.0.md = ReleaseNotes\1.9.0.md
100101
EndProjectSection
101102
EndProject
102103
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"

src/WorkflowCore/Models/WorkflowOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@ public WorkflowOptions(IServiceCollection services)
2424
Services = services;
2525
PollInterval = TimeSpan.FromSeconds(10);
2626
IdleTime = TimeSpan.FromMilliseconds(100);
27-
ErrorRetryInterval = TimeSpan.FromSeconds(60);
27+
ErrorRetryInterval = TimeSpan.FromSeconds(60);
2828

2929
QueueFactory = new Func<IServiceProvider, IQueueProvider>(sp => new SingleNodeQueueProvider());
3030
LockFactory = new Func<IServiceProvider, IDistributedLockProvider>(sp => new SingleNodeLockProvider());
31-
PersistanceFactory = new Func<IServiceProvider, IPersistenceProvider>(sp => new MemoryPersistenceProvider());
31+
PersistanceFactory = new Func<IServiceProvider, IPersistenceProvider>(sp => new TransientMemoryPersistenceProvider(sp.GetService<ISingletonMemoryProvider>()));
3232
SearchIndexFactory = new Func<IServiceProvider, ISearchIndex>(sp => new NullSearchIndex());
3333
EventHubFactory = new Func<IServiceProvider, ILifeCycleEventHub>(sp => new SingleNodeEventHub(sp.GetService<ILoggerFactory>()));
3434
}

src/WorkflowCore/ServiceCollectionExtensions.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
2424

2525
var options = new WorkflowOptions(services);
2626
setupAction?.Invoke(options);
27+
services.AddSingleton<ISingletonMemoryProvider, MemoryPersistenceProvider>();
2728
services.AddTransient<IPersistenceProvider>(options.PersistanceFactory);
2829
services.AddSingleton<IQueueProvider>(options.QueueFactory);
2930
services.AddSingleton<IDistributedLockProvider>(options.LockFactory);
@@ -36,6 +37,7 @@ public static IServiceCollection AddWorkflow(this IServiceCollection services, A
3637

3738
services.AddTransient<IBackgroundTask, WorkflowConsumer>();
3839
services.AddTransient<IBackgroundTask, EventConsumer>();
40+
services.AddTransient<IBackgroundTask, IndexConsumer>();
3941
services.AddTransient<IBackgroundTask, RunnablePoller>();
4042
services.AddTransient<IBackgroundTask>(sp => sp.GetService<ILifeCycleEventPublisher>());
4143

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ internal class EventConsumer : QueueConsumer, IBackgroundTask
1313
private readonly IPersistenceProvider _persistenceStore;
1414
private readonly IDistributedLockProvider _lockProvider;
1515
private readonly IDateTimeProvider _datetimeProvider;
16-
16+
protected override int MaxConcurrentItems => 2;
1717
protected override QueueType Queue => QueueType.Event;
1818

1919
public EventConsumer(IPersistenceProvider persistenceStore, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, WorkflowOptions options, IDateTimeProvider datetimeProvider)
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
using Microsoft.Extensions.Logging;
6+
using Microsoft.Extensions.ObjectPool;
7+
using WorkflowCore.Interface;
8+
using WorkflowCore.Models;
9+
10+
namespace WorkflowCore.Services.BackgroundTasks
11+
{
12+
internal class IndexConsumer : QueueConsumer, IBackgroundTask
13+
{
14+
private readonly ISearchIndex _searchIndex;
15+
private readonly ObjectPool<IPersistenceProvider> _persistenceStorePool;
16+
private readonly ILogger _logger;
17+
private readonly Dictionary<string, int> _errorCounts = new Dictionary<string, int>();
18+
19+
protected override QueueType Queue => QueueType.Index;
20+
protected override bool EnableSecondPasses => true;
21+
22+
public IndexConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, ISearchIndex searchIndex, WorkflowOptions options)
23+
: base(queueProvider, loggerFactory, options)
24+
{
25+
_persistenceStorePool = new DefaultObjectPool<IPersistenceProvider>(persistencePoolPolicy);
26+
_searchIndex = searchIndex;
27+
_logger = loggerFactory.CreateLogger(GetType());
28+
}
29+
30+
protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
31+
{
32+
try
33+
{
34+
var workflow = await FetchWorkflow(itemId);
35+
await _searchIndex.IndexWorkflow(workflow);
36+
lock (_errorCounts)
37+
{
38+
_errorCounts.Remove(itemId);
39+
}
40+
}
41+
catch (Exception e)
42+
{
43+
Logger.LogWarning(default(EventId), $"Error indexing workfow - {itemId} - {e.Message}");
44+
var errCount = 0;
45+
lock (_errorCounts)
46+
{
47+
if (!_errorCounts.ContainsKey(itemId))
48+
_errorCounts.Add(itemId, 0);
49+
50+
_errorCounts[itemId]++;
51+
errCount = _errorCounts[itemId];
52+
}
53+
54+
if (errCount < 5)
55+
{
56+
await QueueProvider.QueueWork(itemId, Queue);
57+
return;
58+
}
59+
if (errCount < 20)
60+
{
61+
await Task.Delay(TimeSpan.FromSeconds(10));
62+
await QueueProvider.QueueWork(itemId, Queue);
63+
return;
64+
}
65+
66+
lock (_errorCounts)
67+
{
68+
_errorCounts.Remove(itemId);
69+
}
70+
71+
Logger.LogError(default(EventId), $"Unable to index workfow - {itemId} - {e.Message}");
72+
}
73+
}
74+
75+
private async Task<WorkflowInstance> FetchWorkflow(string id)
76+
{
77+
var store = _persistenceStorePool.Get();
78+
try
79+
{
80+
return await store.GetWorkflowInstance(id);
81+
}
82+
finally
83+
{
84+
_persistenceStorePool.Return(store);
85+
}
86+
}
87+
88+
}
89+
}

src/WorkflowCore/Services/BackgroundTasks/QueueConsumer.cs

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
4-
using System.Threading.Tasks.Dataflow;
55
using Microsoft.Extensions.Logging;
66
using WorkflowCore.Interface;
77
using WorkflowCore.Models;
@@ -12,6 +12,7 @@ internal abstract class QueueConsumer : IBackgroundTask
1212
{
1313
protected abstract QueueType Queue { get; }
1414
protected virtual int MaxConcurrentItems => Math.Max(Environment.ProcessorCount, 2);
15+
protected virtual bool EnableSecondPasses => false;
1516

1617
protected readonly IQueueProvider QueueProvider;
1718
protected readonly ILogger Logger;
@@ -37,7 +38,7 @@ public virtual void Start()
3738

3839
_cancellationTokenSource = new CancellationTokenSource();
3940

40-
DispatchTask = new Task(Execute);
41+
DispatchTask = new Task(Execute, TaskCreationOptions.LongRunning);
4142
DispatchTask.Start();
4243
}
4344

@@ -51,20 +52,16 @@ public virtual void Stop()
5152
private async void Execute()
5253
{
5354
var cancelToken = _cancellationTokenSource.Token;
54-
var opts = new ExecutionDataflowBlockOptions()
55-
{
56-
MaxDegreeOfParallelism = MaxConcurrentItems,
57-
BoundedCapacity = MaxConcurrentItems + 1
58-
};
59-
60-
var actionBlock = new ActionBlock<string>(ExecuteItem, opts);
55+
var activeTasks = new Dictionary<string, Task>();
56+
var secondPasses = new HashSet<string>();
6157

6258
while (!cancelToken.IsCancellationRequested)
6359
{
6460
try
6561
{
66-
if (!SpinWait.SpinUntil(() => actionBlock.InputCount == 0, Options.IdleTime))
62+
if (activeTasks.Count >= MaxConcurrentItems)
6763
{
64+
await Task.Delay(Options.IdleTime);
6865
continue;
6966
}
7067

@@ -76,11 +73,40 @@ private async void Execute()
7673
await Task.Delay(Options.IdleTime, cancelToken);
7774
continue;
7875
}
76+
77+
if (activeTasks.ContainsKey(item))
78+
{
79+
secondPasses.Add(item);
80+
continue;
81+
}
82+
83+
secondPasses.Remove(item);
7984

80-
if (!actionBlock.Post(item))
85+
var task = new Task(async (object data) =>
86+
{
87+
try
88+
{
89+
await ExecuteItem((string)data);
90+
while (EnableSecondPasses && secondPasses.Contains(item))
91+
{
92+
secondPasses.Remove(item);
93+
await ExecuteItem((string)data);
94+
}
95+
}
96+
finally
97+
{
98+
lock (activeTasks)
99+
{
100+
activeTasks.Remove((string)data);
101+
}
102+
}
103+
}, item);
104+
lock (activeTasks)
81105
{
82-
await QueueProvider.QueueWork(item, Queue);
106+
activeTasks.Add(item, task);
83107
}
108+
109+
task.Start();
84110
}
85111
catch (OperationCanceledException)
86112
{
@@ -91,8 +117,7 @@ private async void Execute()
91117
}
92118
}
93119

94-
actionBlock.Complete();
95-
await actionBlock.Completion;
120+
await Task.WhenAll(activeTasks.Values);
96121
}
97122

98123
private async Task ExecuteItem(string itemId)
@@ -107,7 +132,7 @@ private async Task ExecuteItem(string itemId)
107132
}
108133
catch (Exception ex)
109134
{
110-
Logger.LogError($"Error executing item {itemId} - {ex.Message}");
135+
Logger.LogError(default(EventId), ex, $"Error executing item {itemId} - {ex.Message}");
111136
}
112137
}
113138
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,17 @@ internal class WorkflowConsumer : QueueConsumer, IBackgroundTask
1212
{
1313
private readonly IDistributedLockProvider _lockProvider;
1414
private readonly IDateTimeProvider _datetimeProvider;
15-
private readonly ISearchIndex _searchIndex;
1615
private readonly ObjectPool<IPersistenceProvider> _persistenceStorePool;
1716
private readonly ObjectPool<IWorkflowExecutor> _executorPool;
1817

1918
protected override QueueType Queue => QueueType.Workflow;
2019

21-
public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IPooledObjectPolicy<IWorkflowExecutor> executorPoolPolicy, IDateTimeProvider datetimeProvider, ISearchIndex searchIndex, WorkflowOptions options)
20+
public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoolPolicy, IQueueProvider queueProvider, ILoggerFactory loggerFactory, IServiceProvider serviceProvider, IWorkflowRegistry registry, IDistributedLockProvider lockProvider, IPooledObjectPolicy<IWorkflowExecutor> executorPoolPolicy, IDateTimeProvider datetimeProvider, WorkflowOptions options)
2221
: base(queueProvider, loggerFactory, options)
2322
{
2423
_persistenceStorePool = new DefaultObjectPool<IPersistenceProvider>(persistencePoolPolicy);
2524
_executorPool = new DefaultObjectPool<IWorkflowExecutor>(executorPoolPolicy);
2625
_lockProvider = lockProvider;
27-
_searchIndex = searchIndex;
2826
_datetimeProvider = datetimeProvider;
2927
}
3028

@@ -52,7 +50,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5250
{
5351
_executorPool.Return(executor);
5452
await persistenceStore.PersistWorkflow(workflow);
55-
await _searchIndex.IndexWorkflow(workflow);
53+
await QueueProvider.QueueWork(itemId, QueueType.Index);
5654
}
5755
}
5856
}

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,34 +8,22 @@
88

99
namespace WorkflowCore.Services
1010
{
11+
12+
public interface ISingletonMemoryProvider : IPersistenceProvider
13+
{
14+
}
1115
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
1216

1317
/// <summary>
1418
/// In-memory implementation of IPersistenceProvider for demo and testing purposes
1519
/// </summary>
16-
public class MemoryPersistenceProvider : IPersistenceProvider
17-
{
18-
private static readonly ConcurrentDictionary<string, Tuple<List<WorkflowInstance>, List<EventSubscription>, List<Event>, List<ExecutionError>>> Environments =
19-
new ConcurrentDictionary<string, Tuple<List<WorkflowInstance>, List<EventSubscription>, List<Event>, List<ExecutionError>>>();
20-
private readonly List<WorkflowInstance> _instances;
21-
private readonly List<EventSubscription> _subscriptions;
22-
private readonly List<Event> _events;
23-
private readonly List<ExecutionError> _errors;
24-
25-
public MemoryPersistenceProvider()
26-
: this("")
27-
{
28-
}
29-
30-
public MemoryPersistenceProvider(string environmentKey)
31-
{
32-
var environment = Environments.GetOrAdd(environmentKey, _ => Tuple.Create(new List<WorkflowInstance>(), new List<EventSubscription>(), new List<Event>(), new List<ExecutionError>()));
33-
_instances = environment.Item1;
34-
_subscriptions = environment.Item2;
35-
_events = environment.Item3;
36-
_errors = environment.Item4;
37-
}
38-
20+
public class MemoryPersistenceProvider : ISingletonMemoryProvider
21+
{
22+
private readonly List<WorkflowInstance> _instances = new List<WorkflowInstance>();
23+
private readonly List<EventSubscription> _subscriptions = new List<EventSubscription>();
24+
private readonly List<Event> _events = new List<Event>();
25+
private readonly List<ExecutionError> _errors = new List<ExecutionError>();
26+
3927
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
4028
{
4129
lock (_instances)

src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@ public class SingleNodeQueueProvider : IQueueProvider
2121
[QueueType.Index] = new BlockingCollection<string>()
2222
};
2323

24-
public bool IsDequeueBlocking => false;
24+
public bool IsDequeueBlocking => true;
2525

26-
public async Task QueueWork(string id, QueueType queue)
26+
public Task QueueWork(string id, QueueType queue)
2727
{
2828
_queues[queue].Add(id);
29+
return Task.CompletedTask;
2930
}
3031

31-
public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
32-
{
33-
if (_queues[queue].TryTake(out string id))
34-
return id;
32+
public Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
33+
{
34+
if (_queues[queue].TryTake(out string id, 100, cancellationToken))
35+
return Task.FromResult(id);
3536

36-
return null;
37+
return Task.FromResult<string>(null);
3738
}
3839

3940
public Task Start()

0 commit comments

Comments
 (0)