Skip to content

Commit b45525a

Browse files
authored
Merge pull request danielgerlag#936 from danielgerlag/scheduled-commands
Scheduled commands
2 parents 5fbaa59 + eb28727 commit b45525a

File tree

41 files changed

+2314
-130
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+2314
-130
lines changed

ReleaseNotes/3.6.0.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Workflow Core 3.6.0
2+
3+
## Scheduled Commands
4+
5+
Introduces the ability to schedule delayed commands to process a workflow or event, by persisting them to storage.
6+
This is the first step toward removing constant polling of the DB. It also filters out duplicate work items on the queue which is the current problem the greylist tries to solve.
7+
Initial implementation is supported by MongoDb, SQL Server, PostgeSQL, MySQL and SQLite.
8+
Additional support from the other persistence providers to follow.

WorkflowCore.sln

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
104104
ReleaseNotes\3.0.0.md = ReleaseNotes\3.0.0.md
105105
ReleaseNotes\3.1.0.md = ReleaseNotes\3.1.0.md
106106
ReleaseNotes\3.3.0.md = ReleaseNotes\3.3.0.md
107+
ReleaseNotes\3.4.0.md = ReleaseNotes\3.4.0.md
108+
ReleaseNotes\3.6.0.md = ReleaseNotes\3.6.0.md
107109
EndProjectSection
108110
EndProject
109111
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.5.7</Version>
8-
<AssemblyVersion>3.5.7.0</AssemblyVersion>
9-
<FileVersion>3.5.7.0</FileVersion>
7+
<Version>3.6.0</Version>
8+
<AssemblyVersion>3.6.0.0</AssemblyVersion>
9+
<FileVersion>3.6.0.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.5.7</PackageVersion>
11+
<PackageVersion>3.6.0</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace WorkflowCore.Interface
88
{
9-
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
9+
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
1010
{
1111

1212
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using WorkflowCore.Models;
7+
8+
namespace WorkflowCore.Interface
9+
{
10+
public interface IScheduledCommandRepository
11+
{
12+
bool SupportsScheduledCommands { get; }
13+
14+
Task ScheduleCommand(ScheduledCommand command);
15+
16+
Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default);
17+
}
18+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Models
6+
{
7+
public class ScheduledCommand
8+
{
9+
public const string ProcessWorkflow = "ProcessWorkflow";
10+
public const string ProcessEvent = "ProcessEvent";
11+
12+
public string CommandName { get; set; }
13+
public string Data { get; set; }
14+
public long ExecuteTime { get; set; }
15+
}
16+
}

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Linq;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using Microsoft.Extensions.Logging;
56
using WorkflowCore.Interface;
67
using WorkflowCore.Models;
@@ -48,17 +49,41 @@ public void Stop()
4849
/// Poll the persistence store for stashed unpublished events
4950
/// </summary>
5051
private async void PollRunnables(object target)
52+
{
53+
await PollWorkflows();
54+
await PollEvents();
55+
await PollCommands();
56+
}
57+
58+
private async Task PollWorkflows()
5159
{
5260
try
5361
{
5462
if (await _lockProvider.AcquireLock("poll runnables", new CancellationToken()))
5563
{
5664
try
5765
{
58-
_logger.LogInformation("Polling for runnable workflows");
66+
_logger.LogInformation("Polling for runnable workflows");
5967
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
6068
foreach (var item in runnables)
6169
{
70+
if (_persistenceStore.SupportsScheduledCommands)
71+
{
72+
try
73+
{
74+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
75+
{
76+
CommandName = ScheduledCommand.ProcessWorkflow,
77+
Data = item,
78+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
79+
});
80+
continue;
81+
}
82+
catch (Exception ex)
83+
{
84+
_logger.LogError(ex, ex.Message);
85+
}
86+
}
6287
if (_greylist.Contains($"wf:{item}"))
6388
{
6489
_logger.LogDebug($"Got greylisted workflow {item}");
@@ -79,17 +104,37 @@ private async void PollRunnables(object target)
79104
{
80105
_logger.LogError(ex, ex.Message);
81106
}
107+
}
82108

109+
private async Task PollEvents()
110+
{
83111
try
84112
{
85113
if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken()))
86114
{
87115
try
88116
{
89-
_logger.LogInformation("Polling for unprocessed events");
117+
_logger.LogInformation("Polling for unprocessed events");
90118
var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now);
91119
foreach (var item in events.ToList())
92120
{
121+
if (_persistenceStore.SupportsScheduledCommands)
122+
{
123+
try
124+
{
125+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
126+
{
127+
CommandName = ScheduledCommand.ProcessEvent,
128+
Data = item,
129+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
130+
});
131+
continue;
132+
}
133+
catch (Exception ex)
134+
{
135+
_logger.LogError(ex, ex.Message);
136+
}
137+
}
93138
if (_greylist.Contains($"evt:{item}"))
94139
{
95140
_logger.LogDebug($"Got greylisted event {item}");
@@ -111,5 +156,39 @@ private async void PollRunnables(object target)
111156
_logger.LogError(ex, ex.Message);
112157
}
113158
}
159+
160+
private async Task PollCommands()
161+
{
162+
try
163+
{
164+
if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken()))
165+
{
166+
try
167+
{
168+
_logger.LogInformation("Polling for scheduled commands");
169+
await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) =>
170+
{
171+
switch (command.CommandName)
172+
{
173+
case ScheduledCommand.ProcessWorkflow:
174+
await _queueProvider.QueueWork(command.Data, QueueType.Workflow);
175+
break;
176+
case ScheduledCommand.ProcessEvent:
177+
await _queueProvider.QueueWork(command.Data, QueueType.Event);
178+
break;
179+
}
180+
});
181+
}
182+
finally
183+
{
184+
await _lockProvider.ReleaseLock("poll-commands");
185+
}
186+
}
187+
}
188+
catch (Exception ex)
189+
{
190+
_logger.LogError(ex, ex.Message);
191+
}
192+
}
114193
}
115194
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6969

7070
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
7171

72-
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
73-
74-
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
72+
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7573
{
76-
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
74+
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
75+
if (workflow.NextExecution.Value < readAheadTicks)
76+
{
77+
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
78+
}
79+
else
80+
{
81+
if (_persistenceStore.SupportsScheduledCommands)
82+
{
83+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
84+
{
85+
CommandName = ScheduledCommand.ProcessWorkflow,
86+
Data = workflow.Id,
87+
ExecuteTime = workflow.NextExecution.Value
88+
});
89+
}
90+
}
7791
}
7892
}
7993
}

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public class MemoryPersistenceProvider : ISingletonMemoryProvider
2424
private readonly List<Event> _events = new List<Event>();
2525
private readonly List<ExecutionError> _errors = new List<ExecutionError>();
2626

27+
public bool SupportsScheduledCommands => false;
28+
2729
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow, CancellationToken _ = default)
2830
{
2931
lock (_instances)
@@ -255,6 +257,16 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors, Cancellation
255257
_errors.AddRange(errors);
256258
}
257259
}
260+
261+
public Task ScheduleCommand(ScheduledCommand command)
262+
{
263+
throw new NotImplementedException();
264+
}
265+
266+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
267+
{
268+
throw new NotImplementedException();
269+
}
258270
}
259271

260272
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ public class TransientMemoryPersistenceProvider : IPersistenceProvider
1111
{
1212
private readonly ISingletonMemoryProvider _innerService;
1313

14+
public bool SupportsScheduledCommands => false;
15+
1416
public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
1517
{
1618
_innerService = innerService;
@@ -56,5 +58,15 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
5658
public Task<bool> SetSubscriptionToken(string eventSubscriptionId, string token, string workerId, DateTime expiry, CancellationToken _ = default) => _innerService.SetSubscriptionToken(eventSubscriptionId, token, workerId, expiry);
5759

5860
public Task ClearSubscriptionToken(string eventSubscriptionId, string token, CancellationToken _ = default) => _innerService.ClearSubscriptionToken(eventSubscriptionId, token);
61+
62+
public Task ScheduleCommand(ScheduledCommand command)
63+
{
64+
throw new NotImplementedException();
65+
}
66+
67+
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
68+
{
69+
throw new NotImplementedException();
70+
}
5971
}
6072
}

src/WorkflowCore/WorkflowCore.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<PackageReference Include="System.Reflection.TypeExtensions" Version="4.7.0" />
2727
<PackageReference Include="System.Threading.Thread" Version="4.3.0" />
2828
<PackageReference Include="System.Linq.Queryable" Version="4.3.0" />
29+
<InternalsVisibleTo Include="WorkflowCore.IntegrationTests" />
2930
</ItemGroup>
3031

3132
<ItemGroup>

src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ internal static PersistedEvent ToPersistable(this Event instance)
123123
return result;
124124
}
125125

126+
internal static PersistedScheduledCommand ToPersistable(this ScheduledCommand instance)
127+
{
128+
var result = new PersistedScheduledCommand();
129+
result.CommandName = instance.CommandName;
130+
result.Data = instance.Data;
131+
result.ExecuteTime = instance.ExecuteTime;
132+
133+
return result;
134+
}
135+
126136
internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow instance)
127137
{
128138
WorkflowInstance result = new WorkflowInstance();
@@ -219,5 +229,15 @@ internal static Event ToEvent(this PersistedEvent instance)
219229

220230
return result;
221231
}
232+
233+
internal static ScheduledCommand ToScheduledCommand(this PersistedScheduledCommand instance)
234+
{
235+
var result = new ScheduledCommand();
236+
result.CommandName = instance.CommandName;
237+
result.Data = instance.Data;
238+
result.ExecuteTime = instance.ExecuteTime;
239+
240+
return result;
241+
}
222242
}
223243
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.ComponentModel.DataAnnotations;
3+
using System.Linq;
4+
5+
namespace WorkflowCore.Persistence.EntityFramework.Models
6+
{
7+
public class PersistedScheduledCommand
8+
{
9+
[Key]
10+
public long PersistenceId { get; set; }
11+
12+
[MaxLength(200)]
13+
public string CommandName { get; set; }
14+
15+
[MaxLength(500)]
16+
public string Data { get; set; }
17+
18+
public long ExecuteTime { get; set; }
19+
}
20+
}

0 commit comments

Comments
 (0)