Skip to content

Commit 583a250

Browse files
committed
scheduled commands
1 parent 5fbaa59 commit 583a250

File tree

19 files changed

+381
-10
lines changed

19 files changed

+381
-10
lines changed

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace WorkflowCore.Interface
88
{
9-
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
9+
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
1010
{
1111

1212
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using WorkflowCore.Models;
7+
8+
namespace WorkflowCore.Interface
9+
{
10+
public interface IScheduledCommandRepository
11+
{
12+
bool SupportsScheduledCommands { get; }
13+
14+
Task ScheduleCommand(ScheduledCommand command);
15+
16+
Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default);
17+
}
18+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Models
6+
{
7+
public class ScheduledCommand
8+
{
9+
public const string ProcessWorkflow = "ProcessWorkflow";
10+
public const string ProcessEvent = "ProcessEvent";
11+
12+
public string CommandName { get; set; }
13+
public string Data { get; set; }
14+
public long ExecuteTime { get; set; }
15+
}
16+
}

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Linq;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using Microsoft.Extensions.Logging;
56
using WorkflowCore.Interface;
67
using WorkflowCore.Models;
@@ -48,17 +49,34 @@ public void Stop()
4849
/// Poll the persistence store for stashed unpublished events
4950
/// </summary>
5051
private async void PollRunnables(object target)
52+
{
53+
await PollWorkflows();
54+
await PollEvents();
55+
await PollCommands();
56+
}
57+
58+
private async Task PollWorkflows()
5159
{
5260
try
5361
{
5462
if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken()))
5563
{
5664
try
5765
{
58-
_logger.LogInformation("Polling for runnable workflows");
66+
_logger.LogInformation("Polling for runnable workflows");
5967
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
6068
foreach (var item in runnables)
6169
{
70+
if (_persistenceStore.SupportsScheduledCommands)
71+
{
72+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
73+
{
74+
CommandName = ScheduledCommand.ProcessWorkflow,
75+
Data = item,
76+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
77+
});
78+
continue;
79+
}
6280
if (_greylist.Contains($"wf:{item}"))
6381
{
6482
_logger.LogDebug($"Got greylisted workflow {item}");
@@ -79,17 +97,30 @@ private async void PollRunnables(object target)
7997
{
8098
_logger.LogError(ex, ex.Message);
8199
}
100+
}
82101

102+
private async Task PollEvents()
103+
{
83104
try
84105
{
85106
if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken()))
86107
{
87108
try
88109
{
89-
_logger.LogInformation("Polling for unprocessed events");
110+
_logger.LogInformation("Polling for unprocessed events");
90111
var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now);
91112
foreach (var item in events.ToList())
92113
{
114+
if (_persistenceStore.SupportsScheduledCommands)
115+
{
116+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
117+
{
118+
CommandName = ScheduledCommand.ProcessEvent,
119+
Data = item,
120+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
121+
});
122+
continue;
123+
}
93124
if (_greylist.Contains($"evt:{item}"))
94125
{
95126
_logger.LogDebug($"Got greylisted event {item}");
@@ -111,5 +142,39 @@ private async void PollRunnables(object target)
111142
_logger.LogError(ex, ex.Message);
112143
}
113144
}
145+
146+
private async Task PollCommands()
147+
{
148+
try
149+
{
150+
if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken()))
151+
{
152+
try
153+
{
154+
_logger.LogInformation("Polling for scheduled commands");
155+
await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) =>
156+
{
157+
switch (command.CommandName)
158+
{
159+
case ScheduledCommand.ProcessWorkflow:
160+
await _queueProvider.QueueWork(command.Data, QueueType.Workflow);
161+
break;
162+
case ScheduledCommand.ProcessEvent:
163+
await _queueProvider.QueueWork(command.Data, QueueType.Event);
164+
break;
165+
}
166+
});
167+
}
168+
finally
169+
{
170+
await _lockProvider.ReleaseLock("poll-commands");
171+
}
172+
}
173+
}
174+
catch (Exception ex)
175+
{
176+
_logger.LogError(ex, ex.Message);
177+
}
178+
}
114179
}
115180
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6969

7070
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
7171

72-
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
73-
74-
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
72+
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7573
{
76-
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
74+
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
75+
if (workflow.NextExecution.Value < readAheadTicks)
76+
{
77+
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
78+
}
79+
else
80+
{
81+
if (_persistenceStore.SupportsScheduledCommands)
82+
{
83+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
84+
{
85+
CommandName = ScheduledCommand.ProcessWorkflow,
86+
Data = workflow.Id,
87+
ExecuteTime = workflow.NextExecution.Value
88+
});
89+
}
90+
}
7791
}
7892
}
7993
}

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider
2424
private readonly List<Event> _events = new List<Event>();
2525
private readonly List<ExecutionError> _errors = new List<ExecutionError>();
2626

27+
public bool SupportsScheduledCommands => false;
28+
2729
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
2830
{
2931
lock (_instances)
@@ -255,6 +257,16 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors, Cancellation
255257
_errors.AddRange(errors);
256258
}
257259
}
260+
261+
public Task ScheduleCommand(ScheduledCommand command)
262+
{
263+
throw new NotImplementedException();
264+
}
265+
266+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
267+
{
268+
throw new NotImplementedException();
269+
}
258270
}
259271

260272
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class TransientMemoryPersistenceProvider : IPersistenceProvider
1111
{
1212
private readonly ISingletonMemoryProvider _innerService;
1313

14+
public bool SupportsScheduledCommands => false;
15+
1416
public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
1517
{
1618
_innerService = innerService;
@@ -56,5 +58,15 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
5658
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
5759

5860
public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
61+
62+
public Task ScheduleCommand(ScheduledCommand command)
63+
{
64+
throw new NotImplementedException();
65+
}
66+
67+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
68+
{
69+
throw new NotImplementedException();
70+
}
5971
}
6072
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.7.0" />
2727
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
2828
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
29+
<InternalsVisibleTo Include="WorkflowCore.IntegrationTests" />
2930
</ItemGroup>
3031

3132
<ItemGroup>

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class EntityFrameworkPersistenceProvider : IPersistenceProvider
1717
private readonly bool _canMigrateDB;
1818
private readonly IWorkflowDbContextFactory _contextFactory;
1919

20+
public bool SupportsScheduledCommands => false;
21+
2022
public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFactory, bool canCreateDB, bool canMigrateDB)
2123
{
2224
_contextFactory = contextFactory;
@@ -365,5 +367,15 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
365367
await db.SaveChangesAsync(cancellationToken);
366368
}
367369
}
370+
371+
public Task ScheduleCommand(ScheduledCommand command)
372+
{
373+
throw new NotImplementedException();
374+
}
375+
376+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
377+
{
378+
throw new NotImplementedException();
379+
}
368380
}
369381
}

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ static MongoPersistenceProvider()
8383
BsonClassMap.RegisterClassMap<ControlPersistenceData>(x => x.AutoMap());
8484
BsonClassMap.RegisterClassMap<SchedulePersistenceData>(x => x.AutoMap());
8585
BsonClassMap.RegisterClassMap<IteratorPersistenceData>(x => x.AutoMap());
86+
BsonClassMap.RegisterClassMap<ScheduledCommand>(x => x.AutoMap())
87+
.SetIgnoreExtraElements(true);
8688
}
8789

8890
static bool indexesCreated = false;
@@ -111,6 +113,17 @@ static void CreateIndexes(MongoPersistenceProvider instance)
111113
.Ascending(x => x.EventKey),
112114
new CreateIndexOptions { Background = true, Name = "idx_namekey" }));
113115

116+
instance.ScheduledCommands.Indexes.CreateOne(new CreateIndexModel<ScheduledCommand>(
117+
Builders<ScheduledCommand>.IndexKeys
118+
.Descending(x => x.ExecuteTime),
119+
new CreateIndexOptions { Background = true, Name = "idx_exectime" }));
120+
121+
instance.ScheduledCommands.Indexes.CreateOne(new CreateIndexModel<ScheduledCommand>(
122+
Builders<ScheduledCommand>.IndexKeys
123+
.Ascending(x => x.CommandName)
124+
.Ascending(x => x.Data),
125+
new CreateIndexOptions { Background = true, Unique = true, Name = "idx_key" }));
126+
114127
indexesCreated = true;
115128
}
116129
}
@@ -123,6 +136,8 @@ static void CreateIndexes(MongoPersistenceProvider instance)
123136

124137
private IMongoCollection<ExecutionError> ExecutionErrors => _database.GetCollection<ExecutionError>("wfc.execution_errors");
125138

139+
private IMongoCollection<ScheduledCommand> ScheduledCommands => _database.GetCollection<ScheduledCommand>("wfc.scheduled_commands");
140+
126141
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken cancellationToken = default)
127142
{
128143
await WorkflowInstances.InsertOneAsync(workflow, cancellationToken: cancellationToken);
@@ -291,5 +306,41 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors, Cancellation
291306
if (errors.Any())
292307
await ExecutionErrors.InsertManyAsync(errors, cancellationToken: cancellationToken);
293308
}
309+
310+
public bool SupportsScheduledCommands => true;
311+
312+
public async Task ScheduleCommand(ScheduledCommand command)
313+
{
314+
try
315+
{
316+
await ScheduledCommands.InsertOneAsync(command);
317+
}
318+
catch (MongoBulkWriteException ex)
319+
{
320+
if (ex.WriteErrors.All(x => x.Category == ServerErrorCategory.DuplicateKey))
321+
return;
322+
throw;
323+
}
324+
}
325+
326+
public async Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
327+
{
328+
var cursor = await ScheduledCommands.FindAsync(x => x.ExecuteTime < asOf.UtcDateTime.Ticks);
329+
while (await cursor.MoveNextAsync(cancellationToken))
330+
{
331+
foreach (var command in cursor.Current)
332+
{
333+
try
334+
{
335+
await action(command);
336+
await ScheduledCommands.DeleteOneAsync(x => x.CommandName == command.CommandName && x.Data == command.Data);
337+
}
338+
catch (Exception)
339+
{
340+
//TODO: add logger
341+
}
342+
}
343+
}
344+
}
294345
}
295346
}

src/providers/WorkflowCore.Persistence.RavenDB/Services/RavendbPersistenceProvider.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ public class RavendbPersistenceProvider : IPersistenceProvider
1818
private readonly IDocumentStore _database;
1919
static bool indexesCreated = false;
2020

21-
public RavendbPersistenceProvider(IDocumentStore database)
21+
public bool SupportsScheduledCommands => false;
22+
23+
public RavendbPersistenceProvider(IDocumentStore database)
2224
{
2325
_database = database;
2426
CreateIndexes(this);
@@ -316,5 +318,14 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors, Cancellation
316318
}
317319
}
318320

319-
}
321+
public Task ScheduleCommand(ScheduledCommand command)
322+
{
323+
throw new NotImplementedException();
324+
}
325+
326+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
327+
{
328+
throw new NotImplementedException();
329+
}
330+
}
320331
}

0 commit comments

Comments
 (0)