Skip to content

Commit 1cd457d

Browse files
Merge branch 'master' into master
2 parents 2020d30 + a03137b commit 1cd457d

File tree

55 files changed

+2530
-159
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2530
-159
lines changed

ReleaseNotes/3.6.0.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Workflow Core 3.6.0
2+
3+
## Scheduled Commands
4+
5+
Introduces the ability to schedule delayed commands to process a workflow or event, by persisting them to storage.
6+
This is the first step toward removing constant polling of the DB. It also filters out duplicate work items on the queue which is the current problem the greylist tries to solve.
7+
Initial implementation is supported by MongoDb, SQL Server, PostgeSQL, MySQL and SQLite.
8+
Additional support from the other persistence providers to follow.

WorkflowCore.sln

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "ReleaseNotes", "ReleaseNote
104104
ReleaseNotes\3.0.0.md = ReleaseNotes\3.0.0.md
105105
ReleaseNotes\3.1.0.md = ReleaseNotes\3.1.0.md
106106
ReleaseNotes\3.3.0.md = ReleaseNotes\3.3.0.md
107+
ReleaseNotes\3.4.0.md = ReleaseNotes\3.4.0.md
108+
ReleaseNotes\3.6.0.md = ReleaseNotes\3.6.0.md
107109
EndProjectSection
108110
EndProject
109111
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample14", "src\samples\WorkflowCore.Sample14\WorkflowCore.Sample14.csproj", "{6BC66637-B42A-4334-ADFB-DBEC9F29D293}"
@@ -150,7 +152,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Tests.QueuePro
150152
EndProject
151153
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Sample19", "src\samples\WorkflowCore.Sample19\WorkflowCore.Sample19.csproj", "{1223ED47-3E5E-4960-B70D-DFAF550F6666}"
152154
EndProject
153-
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowCore.Persistence.RavenDB", "src\providers\WorkflowCore.Persistence.RavenDB\WorkflowCore.Persistence.RavenDB.csproj", "{AF205715-C8B7-42EF-BF14-AFC9E7F27242}"
155+
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowCore.Persistence.RavenDB", "src\providers\WorkflowCore.Persistence.RavenDB\WorkflowCore.Persistence.RavenDB.csproj", "{AF205715-C8B7-42EF-BF14-AFC9E7F27242}"
154156
EndProject
155157
Global
156158
GlobalSection(SolutionConfigurationPlatforms) = preSolution

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.5.3</Version>
8-
<AssemblyVersion>3.5.4.0</AssemblyVersion>
9-
<FileVersion>3.5.4.0</FileVersion>
7+
<Version>3.6.1</Version>
8+
<AssemblyVersion>3.6.1.0</AssemblyVersion>
9+
<FileVersion>3.6.1.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.5.4</PackageVersion>
11+
<PackageVersion>3.6.1</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore.DSL/Services/DefinitionLoader.cs

Lines changed: 98 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -217,30 +217,116 @@ private void AttachOutputs(StepSourceV1 source, Type dataType, Type stepType, Wo
217217
var sourceExpr = DynamicExpressionParser.ParseLambda(new[] { stepParameter }, typeof(object), output.Value);
218218

219219
var dataParameter = Expression.Parameter(dataType, "data");
220-
Expression targetProperty;
221220

222-
// Check if our datatype has a matching property
223-
var propertyInfo = dataType.GetProperty(output.Key);
224-
if (propertyInfo != null)
221+
222+
if(output.Key.Contains(".") || output.Key.Contains("["))
223+
{
224+
AttachNestedOutput(output, step, source, sourceExpr, dataParameter);
225+
}else
225226
{
226-
targetProperty = Expression.Property(dataParameter, propertyInfo);
227-
var targetExpr = Expression.Lambda(targetProperty, dataParameter);
228-
step.Outputs.Add(new MemberMapParameter(sourceExpr, targetExpr));
227+
AttachDirectlyOutput(output, step, dataType, sourceExpr, dataParameter);
229228
}
230-
else
229+
}
230+
}
231+
232+
private void AttachDirectlyOutput(KeyValuePair<string, string> output, WorkflowStep step, Type dataType, LambdaExpression sourceExpr, ParameterExpression dataParameter)
233+
{
234+
Expression targetProperty;
235+
236+
// Check if our datatype has a matching property
237+
var propertyInfo = dataType.GetProperty(output.Key);
238+
if (propertyInfo != null)
239+
{
240+
targetProperty = Expression.Property(dataParameter, propertyInfo);
241+
var targetExpr = Expression.Lambda(targetProperty, dataParameter);
242+
step.Outputs.Add(new MemberMapParameter(sourceExpr, targetExpr));
243+
}
244+
else
245+
{
246+
// If we did not find a matching property try to find a Indexer with string parameter
247+
propertyInfo = dataType.GetProperty("Item");
248+
targetProperty = Expression.Property(dataParameter, propertyInfo, Expression.Constant(output.Key));
249+
250+
Action<IStepBody, object> acn = (pStep, pData) =>
251+
{
252+
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ;
253+
propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key });
254+
};
255+
256+
step.Outputs.Add(new ActionParameter<IStepBody, object>(acn));
257+
}
258+
259+
}
260+
261+
private void AttachNestedOutput( KeyValuePair<string, string> output, WorkflowStep step, StepSourceV1 source, LambdaExpression sourceExpr, ParameterExpression dataParameter)
262+
{
263+
PropertyInfo propertyInfo = null;
264+
String[] paths = output.Key.Split('.');
265+
266+
Expression targetProperty = dataParameter;
267+
268+
bool hasAddOutput = false;
269+
270+
foreach (String propertyName in paths)
271+
{
272+
if (hasAddOutput)
273+
{
274+
throw new ArgumentException($"Unknown property for output {output.Key} on {source.Id}");
275+
}
276+
277+
if (targetProperty == null)
278+
{
279+
break;
280+
}
281+
282+
if (propertyName.Contains("["))
231283
{
232-
// If we did not find a matching property try to find a Indexer with string parameter
233-
propertyInfo = dataType.GetProperty("Item");
234-
targetProperty = Expression.Property(dataParameter, propertyInfo, Expression.Constant(output.Key));
284+
String[] items = propertyName.Split('[');
285+
286+
if (items.Length != 2)
287+
{
288+
throw new ArgumentException($"Unknown property for output {output.Key} on {source.Id}");
289+
}
290+
291+
items[1] = items[1].Trim().TrimEnd(']').Trim().Trim('"');
292+
293+
MemberExpression memberExpression = Expression.Property(targetProperty, items[0]);
294+
295+
if (memberExpression == null)
296+
{
297+
throw new ArgumentException($"Unknown property for output {output.Key} on {source.Id}");
298+
}
299+
propertyInfo = ((PropertyInfo)memberExpression.Member).PropertyType.GetProperty("Item");
235300

236301
Action<IStepBody, object> acn = (pStep, pData) =>
237302
{
303+
var targetExpr = Expression.Lambda(memberExpression, dataParameter);
304+
object data = targetExpr.Compile().DynamicInvoke(pData);
238305
object resolvedValue = sourceExpr.Compile().DynamicInvoke(pStep); ;
239-
propertyInfo.SetValue(pData, resolvedValue, new object[] { output.Key });
306+
propertyInfo.SetValue(data, resolvedValue, new object[] { items[1] });
240307
};
241308

242309
step.Outputs.Add(new ActionParameter<IStepBody, object>(acn));
310+
hasAddOutput = true;
243311
}
312+
else
313+
{
314+
try
315+
{
316+
targetProperty = Expression.Property(targetProperty, propertyName);
317+
}
318+
catch
319+
{
320+
targetProperty = null;
321+
break;
322+
}
323+
}
324+
}
325+
326+
if (targetProperty != null && !hasAddOutput)
327+
{
328+
var targetExpr = Expression.Lambda(targetProperty, dataParameter);
329+
step.Outputs.Add(new MemberMapParameter(sourceExpr, targetExpr));
244330
}
245331
}
246332

src/WorkflowCore.DSL/WorkflowCore.DSL.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<ItemGroup>
1212
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
1313
<PackageReference Include="SharpYaml" Version="1.6.5" />
14-
<PackageReference Include="System.Linq.Dynamic.Core" Version="1.2.7" />
14+
<PackageReference Include="System.Linq.Dynamic.Core" Version="1.2.13" />
1515
</ItemGroup>
1616

1717
<ItemGroup>

src/WorkflowCore/Interface/Persistence/IPersistenceProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
namespace WorkflowCore.Interface
88
{
9-
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository
9+
public interface IPersistenceProvider : IWorkflowRepository, ISubscriptionRepository, IEventRepository, IScheduledCommandRepository
1010
{
1111

1212
Task PersistErrors(IEnumerable<ExecutionError> errors, CancellationToken cancellationToken = default);
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using WorkflowCore.Models;
7+
8+
namespace WorkflowCore.Interface
9+
{
10+
public interface IScheduledCommandRepository
11+
{
12+
bool SupportsScheduledCommands { get; }
13+
14+
Task ScheduleCommand(ScheduledCommand command);
15+
16+
Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default);
17+
}
18+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Text;
4+
5+
namespace WorkflowCore.Models
6+
{
7+
public class ScheduledCommand
8+
{
9+
public const string ProcessWorkflow = "ProcessWorkflow";
10+
public const string ProcessEvent = "ProcessEvent";
11+
12+
public string CommandName { get; set; }
13+
public string Data { get; set; }
14+
public long ExecuteTime { get; set; }
15+
}
16+
}

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Linq;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using Microsoft.Extensions.Logging;
56
using WorkflowCore.Interface;
67
using WorkflowCore.Models;
@@ -48,6 +49,13 @@ public void Stop()
4849
/// Poll the persistence store for stashed unpublished events
4950
/// </summary>
5051
private async void PollRunnables(object target)
52+
{
53+
await PollWorkflows();
54+
await PollEvents();
55+
await PollCommands();
56+
}
57+
58+
private async Task PollWorkflows()
5159
{
5260
try
5361
{
@@ -56,9 +64,27 @@ private async void PollRunnables(object target)
5664
try
5765
{
5866
_logger.LogDebug("Polling for runnable workflows");
67+
5968
var runnables = await _persistenceStore.GetRunnableInstances(_dateTimeProvider.Now);
6069
foreach (var item in runnables)
6170
{
71+
if (_persistenceStore.SupportsScheduledCommands)
72+
{
73+
try
74+
{
75+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
76+
{
77+
CommandName = ScheduledCommand.ProcessWorkflow,
78+
Data = item,
79+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
80+
});
81+
continue;
82+
}
83+
catch (Exception ex)
84+
{
85+
_logger.LogError(ex, ex.Message);
86+
}
87+
}
6288
if (_greylist.Contains($"wf:{item}"))
6389
{
6490
_logger.LogDebug($"Got greylisted workflow {item}");
@@ -79,17 +105,38 @@ private async void PollRunnables(object target)
79105
{
80106
_logger.LogError(ex, ex.Message);
81107
}
108+
}
82109

110+
private async Task PollEvents()
111+
{
83112
try
84113
{
85114
if (await _lockProvider.AcquireLock("unprocessed events", new CancellationToken()))
86115
{
87116
try
88117
{
89118
_logger.LogDebug("Polling for unprocessed events");
119+
90120
var events = await _persistenceStore.GetRunnableEvents(_dateTimeProvider.Now);
91121
foreach (var item in events.ToList())
92122
{
123+
if (_persistenceStore.SupportsScheduledCommands)
124+
{
125+
try
126+
{
127+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
128+
{
129+
CommandName = ScheduledCommand.ProcessEvent,
130+
Data = item,
131+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
132+
});
133+
continue;
134+
}
135+
catch (Exception ex)
136+
{
137+
_logger.LogError(ex, ex.Message);
138+
}
139+
}
93140
if (_greylist.Contains($"evt:{item}"))
94141
{
95142
_logger.LogDebug($"Got greylisted event {item}");
@@ -111,5 +158,42 @@ private async void PollRunnables(object target)
111158
_logger.LogError(ex, ex.Message);
112159
}
113160
}
161+
162+
private async Task PollCommands()
163+
{
164+
try
165+
{
166+
if (!_persistenceStore.SupportsScheduledCommands)
167+
return;
168+
169+
if (await _lockProvider.AcquireLock("poll-commands", new CancellationToken()))
170+
{
171+
try
172+
{
173+
_logger.LogDebug("Polling for scheduled commands");
174+
await _persistenceStore.ProcessCommands(new DateTimeOffset(_dateTimeProvider.UtcNow), async (command) =>
175+
{
176+
switch (command.CommandName)
177+
{
178+
case ScheduledCommand.ProcessWorkflow:
179+
await _queueProvider.QueueWork(command.Data, QueueType.Workflow);
180+
break;
181+
case ScheduledCommand.ProcessEvent:
182+
await _queueProvider.QueueWork(command.Data, QueueType.Event);
183+
break;
184+
}
185+
});
186+
}
187+
finally
188+
{
189+
await _lockProvider.ReleaseLock("poll-commands");
190+
}
191+
}
192+
}
193+
catch (Exception ex)
194+
{
195+
_logger.LogError(ex, ex.Message);
196+
}
197+
}
114198
}
115199
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,25 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6969

7070
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
7171

72-
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
73-
74-
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
72+
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7573
{
76-
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
74+
var readAheadTicks = _datetimeProvider.UtcNow.Add(Options.PollInterval).Ticks;
75+
if (workflow.NextExecution.Value < readAheadTicks)
76+
{
77+
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
78+
}
79+
else
80+
{
81+
if (_persistenceStore.SupportsScheduledCommands)
82+
{
83+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
84+
{
85+
CommandName = ScheduledCommand.ProcessWorkflow,
86+
Data = workflow.Id,
87+
ExecuteTime = workflow.NextExecution.Value
88+
});
89+
}
90+
}
7791
}
7892
}
7993
}

0 commit comments

Comments
 (0)