Skip to content

add support to get multiple workflow instances by a list of ids #277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/WorkflowCore/Interface/IPersistenceProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace WorkflowCore.Interface
{
/// <remarks>
/// The implemention of this interface will be responsible for
/// persisiting running workflow instances to a durable store
/// persisting running workflow instances to a durable store
/// </remarks>
public interface IPersistenceProvider
{
Expand All @@ -22,6 +22,8 @@ public interface IPersistenceProvider

Task<WorkflowInstance> GetWorkflowInstance(string Id);

Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids);

Task<string> CreateEventSubscription(EventSubscription subscription);

Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@

namespace WorkflowCore.Services
{

public interface ISingletonMemoryProvider : IPersistenceProvider
{
}
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously

/// <summary>
/// In-memory implementation of IPersistenceProvider for demo and testing purposes
/// </summary>
public class MemoryPersistenceProvider : ISingletonMemoryProvider
{
{
private readonly List<WorkflowInstance> _instances = new List<WorkflowInstance>();
private readonly List<EventSubscription> _subscriptions = new List<EventSubscription>();
private readonly List<Event> _events = new List<Event>();
private readonly List<ExecutionError> _errors = new List<ExecutionError>();

public async Task<string> CreateNewWorkflow(WorkflowInstance workflow)
{
lock (_instances)
Expand Down Expand Up @@ -61,6 +61,19 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
}
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}

lock (_instances)
{
return _instances.Where(x => ids.Contains(x.Id));
}
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
{
lock (_instances)
Expand Down Expand Up @@ -121,7 +134,7 @@ public async Task TerminateSubscription(string eventSubscriptionId)
}

public void EnsureStoreExists()
{
{
}

public async Task<string> CreateEvent(Event newEvent)
Expand All @@ -133,7 +146,7 @@ public async Task<string> CreateEvent(Event newEvent)
return newEvent.Id;
}
}

public async Task MarkEventProcessed(string id)
{
lock (_events)
Expand Down Expand Up @@ -197,5 +210,5 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors)
}
}

#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
#pragma warning restore CS1998 // Async method lacks 'await' operators and will run synchronously
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ public TransientMemoryPersistenceProvider(ISingletonMemoryProvider innerService)

public Task<WorkflowInstance> GetWorkflowInstance(string Id) => _innerService.GetWorkflowInstance(Id);

public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids) => _innerService.GetWorkflowInstances(ids);

public Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take) => _innerService.GetWorkflowInstances(status, type, createdFrom, createdTo, skip, take);

public Task MarkEventProcessed(string id) => _innerService.MarkEventProcessed(id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public EntityFrameworkPersistenceProvider(IWorkflowDbContextFactory contextFacto
_canCreateDB = canCreateDB;
_canMigrateDB = canMigrateDB;
}

public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
using (var db = ConstructDbContext())
Expand Down Expand Up @@ -96,7 +96,7 @@ public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowSt
return result;
}
}

public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
{
using (var db = ConstructDbContext())
Expand All @@ -115,6 +115,26 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
}
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}

using (var db = ConstructDbContext())
{
var uids = ids.Select(i => new Guid(i));
var raw = db.Set<PersistedWorkflow>()
.Include(wf => wf.ExecutionPointers)
.ThenInclude(ep => ep.ExtensionAttributes)
.Include(wf => wf.ExecutionPointers)
.Where(x => uids.Contains(x.InstanceId));

return (await raw.ToListAsync()).Select(i => i.ToWorkflowInstance());
}
}

public async Task PersistWorkflow(WorkflowInstance workflow)
{
using (var db = ConstructDbContext())
Expand Down Expand Up @@ -143,7 +163,7 @@ public async Task TerminateSubscription(string eventSubscriptionId)
await db.SaveChangesAsync();
}
}

public virtual void EnsureStoreExists()
{
using (var context = ConstructDbContext())
Expand Down Expand Up @@ -283,7 +303,7 @@ public async Task PersistErrors(IEnumerable<ExecutionError> errors)
}
}
}

private WorkflowDbContext ConstructDbContext()
{
return _contextFactory.Build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public MongoPersistenceProvider(IMongoDatabase database)
static MongoPersistenceProvider()
{
BsonClassMap.RegisterClassMap<WorkflowInstance>(x =>
{
x.MapIdProperty(y => y.Id)
{
x.MapIdProperty(y => y.Id)
.SetIdGenerator(new StringObjectIdGenerator());
x.MapProperty(y => y.Data)
.SetSerializer(new DataObjectSerializer());
Expand All @@ -48,9 +48,9 @@ static MongoPersistenceProvider()
.SetIdGenerator(new StringObjectIdGenerator());
x.MapProperty(y => y.EventName);
x.MapProperty(y => y.EventKey);
x.MapProperty(y => y.StepId);
x.MapProperty(y => y.StepId);
x.MapProperty(y => y.WorkflowId);
x.MapProperty(y => y.SubscribeAsOf);
x.MapProperty(y => y.SubscribeAsOf);
});

BsonClassMap.RegisterClassMap<Event>(x =>
Expand Down Expand Up @@ -113,7 +113,18 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
var result = await WorkflowInstances.FindAsync(x => x.Id == Id);
return await result.FirstAsync();
}


public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}

var result = await WorkflowInstances.FindAsync(x => ids.Contains(x.Id));
return await result.ToListAsync();
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(WorkflowStatus? status, string type, DateTime? createdFrom, DateTime? createdTo, int skip, int take)
{
IQueryable<WorkflowInstance> result = WorkflowInstances.AsQueryable();
Expand All @@ -138,17 +149,17 @@ public async Task<string> CreateEventSubscription(EventSubscription subscription
await EventSubscriptions.InsertOneAsync(subscription);
return subscription.Id;
}

public async Task TerminateSubscription(string eventSubscriptionId)
{
await EventSubscriptions.DeleteOneAsync(x => x.Id == eventSubscriptionId);
}

public void EnsureStoreExists()
{
}

}

public async Task<IEnumerable<EventSubscription>> GetSubcriptions(string eventName, string eventKey, DateTime asOf)
{
var query = EventSubscriptions
Expand Down Expand Up @@ -192,7 +203,7 @@ public async Task<IEnumerable<string>> GetEvents(string eventName, string eventK
var query = Events
.Find(x => x.EventName == eventName && x.EventKey == eventKey && x.EventTime >= asOf)
.Project(x => x.Id);

return await query.ToListAsync();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using WorkflowCore.Providers.AWS.Interface;
Expand Down Expand Up @@ -86,7 +87,7 @@ public async Task<IEnumerable<string>> GetRunnableInstances(DateTime asAt)
},
ScanIndexForward = true
};

var response = await _client.QueryAsync(request);

foreach (var item in response.Items)
Expand Down Expand Up @@ -117,6 +118,57 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
return response.Item.ToWorkflowInstance();
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}

var keys = new KeysAndAttributes() { Keys = new List<Dictionary<string, AttributeValue>>() };
foreach (var id in ids)
{
var key = new Dictionary<string, AttributeValue>()
{
{
"id", new AttributeValue { S = id }
}
};
keys.Keys.Add(key);
}

var request = new BatchGetItemRequest
{
RequestItems = new Dictionary<string, KeysAndAttributes>()
{
{
$"{_tablePrefix}-{WORKFLOW_TABLE}", keys
}
}
};

var result = new List<Dictionary<string, AttributeValue>>();
BatchGetItemResponse response;
do
{
// Making request
response = await _client.BatchGetItemAsync(request);

// Check the response
var responses = response.Responses; // Attribute list in the response.
foreach (var tableResponse in responses)
{
result.AddRange(tableResponse.Value);
}

// Any unprocessed keys? could happen if you exceed ProvisionedThroughput or some other error.
Dictionary<string, KeysAndAttributes> unprocessedKeys = response.UnprocessedKeys;
request.RequestItems = unprocessedKeys;
} while (response.UnprocessedKeys.Count > 0);

return result.Select(i => i.ToWorkflowInstance());
}

public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
Expand Down Expand Up @@ -326,4 +378,4 @@ public void EnsureStoreExists()
_provisioner.ProvisionTables().Wait();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -75,6 +76,17 @@ public async Task<WorkflowInstance> GetWorkflowInstance(string Id)
return JsonConvert.DeserializeObject<WorkflowInstance>(raw, _serializerSettings);
}

public async Task<IEnumerable<WorkflowInstance>> GetWorkflowInstances(IEnumerable<string> ids)
{
if (ids == null)
{
return new List<WorkflowInstance>();
}

var raw = await _redis.HashGetAsync($"{_prefix}.{WORKFLOW_SET}", Array.ConvertAll(ids.ToArray(), x => (RedisValue)x));
return raw.Select(r => JsonConvert.DeserializeObject<WorkflowInstance>(r, _serializerSettings));
}

public async Task<string> CreateEventSubscription(EventSubscription subscription)
{
subscription.Id = Guid.NewGuid().ToString();
Expand Down
Loading