Skip to content

Commit c2d0ace

Browse files
authored
Merge pull request danielgerlag#277 from knoxi/features/issue-261
add support to get multiple workflow instances by a list of ids
2 parents f0eb716 + ded7645 commit c2d0ace

File tree

8 files changed

+225
-30
lines changed

8 files changed

+225
-30
lines changed

src/WorkflowCore/Interface/IPersistenceProvider.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ namespace WorkflowCore.Interface
77
{
88
/// <remarks>
99
/// The implemention of this interface will be responsible for
10-
/// persisiting running workflow instances to a durable store
10+
/// persisting running workflow instances to a durable store
1111
/// </remarks>
1212
public interface IPersistenceProvider
1313
{
@@ -22,6 +22,8 @@ public interface IPersistenceProvider
2222

2323
Task<WorkflowInstance> GetWorkflowInstance(string Id);
2424

25+
Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids);
26+
2527
Task<string> CreateEventSubscription(EventSubscription subscription);
2628

2729
Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf);

src/WorkflowCore/Services/DefaultProviders/MemoryPersistenceProvider.cs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,22 @@
88

99
namespace WorkflowCore.Services
1010
{
11-
11+
1212
public interface ISingletonMemoryProvider : IPersistenceProvider
1313
{
1414
}
15-
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
15+
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
1616

1717
/// <summary>
1818
/// In-memory implementation of IPersistenceProvider for demo and testing purposes
1919
/// </summary>
2020
public class MemoryPersistenceProvider : ISingletonMemoryProvider
21-
{
21+
{
2222
private readonly List<WorkflowInstance> _instances = new List<WorkflowInstance>();
2323
private readonly List<EventSubscription> _subscriptions = new List<EventSubscription>();
2424
private readonly List<Event> _events = new List<Event>();
2525
private readonly List<ExecutionError> _errors = new List<ExecutionError>();
26-
26+
2727
public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
2828
{
2929
lock (_instances)
@@ -61,6 +61,19 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
6161
}
6262
}
6363

64+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
65+
{
66+
if (ids == null)
67+
{
68+
return new List<WorkflowInstance>();
69+
}
70+
71+
lock (_instances)
72+
{
73+
return _instances.Where(x => ids.Contains(x.Id));
74+
}
75+
}
76+
6477
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
6578
{
6679
lock (_instances)
@@ -121,7 +134,7 @@ public async Task TerminateSubscription(string eventSubscriptionId)
121134
}
122135

123136
public void EnsureStoreExists()
124-
{
137+
{
125138
}
126139

127140
public async Task<string> CreateEvent(Event newEvent)
@@ -133,7 +146,7 @@ public async Task<string> CreateEvent(Event newEvent)
133146
return newEvent.Id;
134147
}
135148
}
136-
149+
137150
public async Task MarkEventProcessed(string id)
138151
{
139152
lock (_events)
@@ -197,5 +210,5 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors)
197210
}
198211
}
199212

200-
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
213+
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
201214
}

src/WorkflowCore/Services/DefaultProviders/TransientMemoryPersistenceProvider.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)
3636

3737
public Task<WorkflowInstance> GetWorkflowInstance(string Id) => _innerService.GetWorkflowInstance(Id);
3838

39+
public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids) => _innerService.GetWorkflowInstances(ids);
40+
3941
public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);
4042

4143
public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);

src/providers/WorkflowCore.Persistence.EntityFramework/Services/EntityFrameworkPersistenceProvider.cs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFacto
2626
_canCreateDB = canCreateDB;
2727
_canMigrateDB = canMigrateDB;
2828
}
29-
29+
3030
public async Task<string> CreateEventSubscription(EventSubscription subscription)
3131
{
3232
using (var db = ConstructDbContext())
@@ -96,7 +96,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
9696
return result;
9797
}
9898
}
99-
99+
100100
public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
101101
{
102102
using (var db = ConstructDbContext())
@@ -115,6 +115,26 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
115115
}
116116
}
117117

118+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
119+
{
120+
if (ids == null)
121+
{
122+
return new List<WorkflowInstance>();
123+
}
124+
125+
using (var db = ConstructDbContext())
126+
{
127+
var uids = ids.Select(i => new Guid(i));
128+
var raw = db.Set<PersistedWorkflow>()
129+
.Include(wf => wf.ExecutionPointers)
130+
.ThenInclude(ep => ep.ExtensionAttributes)
131+
.Include(wf => wf.ExecutionPointers)
132+
.Where(x => uids.Contains(x.InstanceId));
133+
134+
return (await raw.ToListAsync()).Select(i => i.ToWorkflowInstance());
135+
}
136+
}
137+
118138
public async Task PersistWorkflow(WorkflowInstance workflow)
119139
{
120140
using (var db = ConstructDbContext())
@@ -143,7 +163,7 @@ public async Task TerminateSubscription(string eventSubscriptionId)
143163
await db.SaveChangesAsync();
144164
}
145165
}
146-
166+
147167
public virtual void EnsureStoreExists()
148168
{
149169
using (var context = ConstructDbContext())
@@ -283,7 +303,7 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors)
283303
}
284304
}
285305
}
286-
306+
287307
private WorkflowDbContext ConstructDbContext()
288308
{
289309
return _contextFactory.Build();

src/providers/WorkflowCore.Persistence.MongoDB/Services/MongoPersistenceProvider.cs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ public MongoPersistenceProvider(IMongoDatabase database)
2626
static MongoPersistenceProvider()
2727
{
2828
BsonClassMap.RegisterClassMap<WorkflowInstance>(x =>
29-
{
30-
x.MapIdProperty(y => y.Id)
29+
{
30+
x.MapIdProperty(y => y.Id)
3131
.SetIdGenerator(new StringObjectIdGenerator());
3232
x.MapProperty(y => y.Data)
3333
.SetSerializer(new DataObjectSerializer());
@@ -48,9 +48,9 @@ static MongoPersistenceProvider()
4848
.SetIdGenerator(new StringObjectIdGenerator());
4949
x.MapProperty(y => y.EventName);
5050
x.MapProperty(y => y.EventKey);
51-
x.MapProperty(y => y.StepId);
51+
x.MapProperty(y => y.StepId);
5252
x.MapProperty(y => y.WorkflowId);
53-
x.MapProperty(y => y.SubscribeAsOf);
53+
x.MapProperty(y => y.SubscribeAsOf);
5454
});
5555

5656
BsonClassMap.RegisterClassMap<Event>(x =>
@@ -113,7 +113,18 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
113113
var result = await WorkflowInstances.FindAsync(x => x.Id == Id);
114114
return await result.FirstAsync();
115115
}
116-
116+
117+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
118+
{
119+
if (ids == null)
120+
{
121+
return new List<WorkflowInstance>();
122+
}
123+
124+
var result = await WorkflowInstances.FindAsync(x => ids.Contains(x.Id));
125+
return await result.ToListAsync();
126+
}
127+
117128
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
118129
{
119130
IQueryable<WorkflowInstance> result = WorkflowInstances.AsQueryable();
@@ -138,17 +149,17 @@ public async Task<string> CreateEventSubscription(EventSubscription subscription
138149
await EventSubscriptions.InsertOneAsync(subscription);
139150
return subscription.Id;
140151
}
141-
152+
142153
public async Task TerminateSubscription(string eventSubscriptionId)
143154
{
144155
await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId);
145156
}
146157

147158
public void EnsureStoreExists()
148159
{
149-
150-
}
151-
160+
161+
}
162+
152163
public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
153164
{
154165
var query = EventSubscriptions
@@ -192,7 +203,7 @@ public async Task<IEnumerable<string>> GetEvents(string eventName, string eventK
192203
var query = Events
193204
.Find(x => x.EventName == eventName && x.EventKey == eventKey && x.EventTime >= asOf)
194205
.Project(x => x.Id);
195-
206+
196207
return await query.ToListAsync();
197208
}
198209

src/providers/WorkflowCore.Providers.AWS/Services/DynamoPersistenceProvider.cs

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using Microsoft.Extensions.Logging;
55
using System;
66
using System.Collections.Generic;
7+
using System.Linq;
78
using System.Text;
89
using System.Threading.Tasks;
910
using WorkflowCore.Providers.AWS.Interface;
@@ -86,7 +87,7 @@ public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
8687
},
8788
ScanIndexForward = true
8889
};
89-
90+
9091
var response = await _client.QueryAsync(request);
9192

9293
foreach (var item in response.Items)
@@ -117,6 +118,57 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
117118
return response.Item.ToWorkflowInstance();
118119
}
119120

121+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
122+
{
123+
if (ids == null)
124+
{
125+
return new List<WorkflowInstance>();
126+
}
127+
128+
var keys = new KeysAndAttributes() { Keys = new List<Dictionary<string, AttributeValue>>() };
129+
foreach (var id in ids)
130+
{
131+
var key = new Dictionary<string, AttributeValue>()
132+
{
133+
{
134+
"id", new AttributeValue { S = id }
135+
}
136+
};
137+
keys.Keys.Add(key);
138+
}
139+
140+
var request = new BatchGetItemRequest
141+
{
142+
RequestItems = new Dictionary<string, KeysAndAttributes>()
143+
{
144+
{
145+
$"{_tablePrefix}-{WORKFLOW_TABLE}", keys
146+
}
147+
}
148+
};
149+
150+
var result = new List<Dictionary<string, AttributeValue>>();
151+
BatchGetItemResponse response;
152+
do
153+
{
154+
// Making request
155+
response = await _client.BatchGetItemAsync(request);
156+
157+
// Check the response
158+
var responses = response.Responses; // Attribute list in the response.
159+
foreach (var tableResponse in responses)
160+
{
161+
result.AddRange(tableResponse.Value);
162+
}
163+
164+
// Any unprocessed keys? could happen if you exceed ProvisionedThroughput or some other error.
165+
Dictionary<string, KeysAndAttributes> unprocessedKeys = response.UnprocessedKeys;
166+
request.RequestItems = unprocessedKeys;
167+
} while (response.UnprocessedKeys.Count > 0);
168+
169+
return result.Select(i => i.ToWorkflowInstance());
170+
}
171+
120172
public async Task<string> CreateEventSubscription(EventSubscription subscription)
121173
{
122174
subscription.Id = Guid.NewGuid().ToString();
@@ -326,4 +378,4 @@ public void EnsureStoreExists()
326378
_provisioner.ProvisionTables().Wait();
327379
}
328380
}
329-
}
381+
}

src/providers/WorkflowCore.Providers.Redis/Services/RedisPersistenceProvider.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.Linq;
34
using System.Text;
45
using System.Threading.Tasks;
56
using Microsoft.Extensions.Logging;
@@ -75,6 +76,17 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
7576
return JsonConvert.DeserializeObject<WorkflowInstance>(raw, _serializerSettings);
7677
}
7778

79+
public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
80+
{
81+
if (ids == null)
82+
{
83+
return new List<WorkflowInstance>();
84+
}
85+
86+
var raw = await _redis.HashGetAsync($"{_prefix}.{WORKFLOW_SET}", Array.ConvertAll(ids.ToArray(), x => (RedisValue)x));
87+
return raw.Select(r => JsonConvert.DeserializeObject<WorkflowInstance>(r, _serializerSettings));
88+
}
89+
7890
public async Task<string> CreateEventSubscription(EventSubscription subscription)
7991
{
8092
subscription.Id = Guid.NewGuid().ToString();

0 commit comments

Comments
 (0)