Skip to content

Commit b08f2c0

Browse files
committed
ef migrations
1 parent 583a250 commit b08f2c0

18 files changed

+1902
-137
lines changed

src/Directory.Build.props

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
<PackageLicenseUrl>https://github.com/danielgerlag/workflow-core/blob/master/LICENSE.md</PackageLicenseUrl>
55
<RepositoryType>git</RepositoryType>
66
<RepositoryUrl>https://github.com/danielgerlag/workflow-core.git</RepositoryUrl>
7-
<Version>3.5.7</Version>
8-
<AssemblyVersion>3.5.7.0</AssemblyVersion>
9-
<FileVersion>3.5.7.0</FileVersion>
7+
<Version>3.6.0</Version>
8+
<AssemblyVersion>3.6.0.0</AssemblyVersion>
9+
<FileVersion>3.6.0.0</FileVersion>
1010
<PackageIconUrl>https://github.com/danielgerlag/workflow-core/raw/master/src/logo.png</PackageIconUrl>
11-
<PackageVersion>3.5.7</PackageVersion>
11+
<PackageVersion>3.6.0</PackageVersion>
1212
</PropertyGroup>
1313
</Project>

src/WorkflowCore/Services/BackgroundTasks/RunnablePoller.cs

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,13 +69,20 @@ private async Task PollWorkflows()
6969
{
7070
if (_persistenceStore.SupportsScheduledCommands)
7171
{
72-
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
72+
try
7373
{
74-
CommandName = ScheduledCommand.ProcessWorkflow,
75-
Data = item,
76-
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
77-
});
78-
continue;
74+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
75+
{
76+
CommandName = ScheduledCommand.ProcessWorkflow,
77+
Data = item,
78+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
79+
});
80+
continue;
81+
}
82+
catch (Exception ex)
83+
{
84+
_logger.LogError(ex, ex.Message);
85+
}
7986
}
8087
if (_greylist.Contains($"wf:{item}"))
8188
{
@@ -113,13 +120,20 @@ private async Task PollEvents()
113120
{
114121
if (_persistenceStore.SupportsScheduledCommands)
115122
{
116-
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
123+
try
117124
{
118-
CommandName = ScheduledCommand.ProcessEvent,
119-
Data = item,
120-
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
121-
});
122-
continue;
125+
await _persistenceStore.ScheduleCommand(new ScheduledCommand()
126+
{
127+
CommandName = ScheduledCommand.ProcessEvent,
128+
Data = item,
129+
ExecuteTime = _dateTimeProvider.UtcNow.Ticks
130+
});
131+
continue;
132+
}
133+
catch (Exception ex)
134+
{
135+
_logger.LogError(ex, ex.Message);
136+
}
123137
}
124138
if (_greylist.Contains($"evt:{item}"))
125139
{

src/providers/WorkflowCore.Persistence.EntityFramework/ExtensionMethods.cs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ internal static PersistedEvent ToPersistable(this Event instance)
123123
return result;
124124
}
125125

126+
internal static PersistedScheduledCommand ToPersistable(this ScheduledCommand instance)
127+
{
128+
var result = new PersistedScheduledCommand();
129+
result.CommandName = instance.CommandName;
130+
result.Data = instance.Data;
131+
result.ExecuteTime = instance.ExecuteTime;
132+
133+
return result;
134+
}
135+
126136
internal static WorkflowInstance ToWorkflowInstance(this PersistedWorkflow instance)
127137
{
128138
WorkflowInstance result = new WorkflowInstance();
@@ -219,5 +229,15 @@ internal static Event ToEvent(this PersistedEvent instance)
219229

220230
return result;
221231
}
232+
233+
internal static ScheduledCommand ToScheduledCommand(this PersistedScheduledCommand instance)
234+
{
235+
var result = new ScheduledCommand();
236+
result.CommandName = instance.CommandName;
237+
result.Data = instance.Data;
238+
result.ExecuteTime = instance.ExecuteTime;
239+
240+
return result;
241+
}
222242
}
223243
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
using System;
2+
using System.ComponentModel.DataAnnotations;
3+
using System.Linq;
4+
5+
namespace WorkflowCore.Persistence.EntityFramework.Models
6+
{
7+
public class PersistedScheduledCommand
8+
{
9+
[Key]
10+
public long PersistenceId { get; set; }
11+
12+
[MaxLength(200)]
13+
public string CommandName { get; set; }
14+
15+
[MaxLength(500)]
16+
public string Data { get; set; }
17+
18+
public long ExecuteTime { get; set; }
19+
}
20+
}

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public class EntityFrameworkPersistenceProvider : IPersistenceProvider
1717
private readonly bool _canMigrateDB;
1818
private readonly IWorkflowDbContextFactory _contextFactory;
1919

20-
public bool SupportsScheduledCommands => false;
20+
public bool SupportsScheduledCommands => true;
2121

2222
public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFactory, bool canCreateDB, bool canMigrateDB)
2323
{
@@ -368,14 +368,45 @@ public async Task ClearSubscriptionToken(string eventSubscriptionId, string toke
368368
}
369369
}
370370

371-
public Task ScheduleCommand(ScheduledCommand command)
371+
public async Task ScheduleCommand(ScheduledCommand command)
372372
{
373-
throw new NotImplementedException();
373+
try
374+
{
375+
using (var db = ConstructDbContext())
376+
{
377+
var persistable = command.ToPersistable();
378+
var result = db.Set<PersistedScheduledCommand>().Add(persistable);
379+
await db.SaveChangesAsync();
380+
}
381+
}
382+
catch (DbUpdateException)
383+
{
384+
//log
385+
}
374386
}
375387

376-
public Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
388+
public async Task ProcessCommands(DateTimeOffset asOf, Func<ScheduledCommand, Task> action, CancellationToken cancellationToken = default)
377389
{
378-
throw new NotImplementedException();
390+
using (var db = ConstructDbContext())
391+
{
392+
var cursor = db.Set<PersistedScheduledCommand>()
393+
.Where(x => x.ExecuteTime < asOf.UtcDateTime.Ticks)
394+
.AsAsyncEnumerable();
395+
396+
await foreach (var command in cursor)
397+
{
398+
try
399+
{
400+
await action(command.ToScheduledCommand());
401+
db.Set<PersistedScheduledCommand>().Remove(command);
402+
await db.SaveChangesAsync();
403+
}
404+
catch (Exception)
405+
{
406+
//TODO: add logger
407+
}
408+
}
409+
}
379410
}
380411
}
381412
}

src/providers/WorkflowCore.Persistence.EntityFramework/Services/WorkflowDbContext.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ protected override void OnModelCreating(ModelBuilder modelBuilder)
3838
events.HasIndex(x => x.EventTime);
3939
events.HasIndex(x => x.IsProcessed);
4040

41+
var commands = modelBuilder.Entity<PersistedScheduledCommand>();
42+
commands.HasIndex(x => x.ExecuteTime);
43+
commands.HasIndex(x => new { x.CommandName, x.Data}).IsUnique();
44+
4145
ConfigureWorkflowStorage(workflows);
4246
ConfigureExecutionPointerStorage(executionPointers);
4347
ConfigureExecutionErrorStorage(executionErrors);

src/providers/WorkflowCore.Persistence.EntityFramework/WorkflowCore.Persistence.EntityFramework.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<PropertyGroup>
44
<AssemblyTitle>Workflow Core EntityFramework Core Persistence Provider</AssemblyTitle>
55
<Authors>Daniel Gerlag</Authors>
6-
<TargetFramework>netstandard2.0</TargetFramework>
6+
<TargetFramework>netstandard2.1</TargetFramework>
77
<AssemblyName>WorkflowCore.Persistence.EntityFramework</AssemblyName>
88
<PackageId>WorkflowCore.Persistence.EntityFramework</PackageId>
99
<PackageTags>workflow;.NET;Core;state machine;WorkflowCore;EntityFramework;EntityFrameworkCore</PackageTags>

0 commit comments

Comments
 (0)