Skip to content

Misc improvements #269

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/WorkflowCore/Interface/IQueueProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ public interface IQueueProvider : IDisposable
Task Stop();
}

public enum QueueType { Workflow = 0, Event = 1 }
public enum QueueType { Workflow = 0, Event = 1, Index = 2 }
}
14 changes: 14 additions & 0 deletions src/WorkflowCore/Interface/IStepBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Name(string name);

/// <summary>
/// Specifies a custom Id to reference this step
/// </summary>
/// <param name="id">A custom Id to reference this step</param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Id(string id);

/// <summary>
/// Specify the next step in the workflow
/// </summary>
Expand Down Expand Up @@ -51,6 +58,13 @@ public interface IStepBuilder<TData, TStepBody>
/// <returns></returns>
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);

/// <summary>
/// Specify the next step in the workflow by Id
/// </summary>
/// <param name="id"></param>
/// <returns></returns>
IStepBuilder<TData, TStepBody> Attach(string id);

/// <summary>
/// Configure an outcome for this step, then wire it to another step
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/StepOutcome.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public Expression<Func<object, object>> Value

public string Label { get; set; }

public string Tag { get; set; }
public string ExternalNextStepId { get; set; }

public object GetValue(object data)
{
Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/WorkflowDefinition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public class WorkflowDefinition

public string Description { get; set; }

public List<WorkflowStep> Steps { get; set; }
public WorkflowStepCollection Steps { get; set; } = new WorkflowStepCollection();

public Type DataType { get; set; }

Expand Down
2 changes: 1 addition & 1 deletion src/WorkflowCore/Models/WorkflowStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class WorkflowStep

public virtual string Name { get; set; }

public virtual string Tag { get; set; }
public virtual string ExternalId { get; set; }

public virtual List<int> Children { get; set; } = new List<int>();

Expand Down
81 changes: 81 additions & 0 deletions src/WorkflowCore/Models/WorkflowStepCollection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace WorkflowCore.Models
{
public class WorkflowStepCollection : ICollection<WorkflowStep>
{
private readonly Dictionary<int, WorkflowStep> _dictionary = new Dictionary<int, WorkflowStep>();

public WorkflowStepCollection()
{
}

public WorkflowStepCollection(int capacity)
{
_dictionary = new Dictionary<int, WorkflowStep>(capacity);
}

public WorkflowStepCollection(ICollection<WorkflowStep> steps)
{
foreach (var step in steps)
{
Add(step);
}
}

public IEnumerator<WorkflowStep> GetEnumerator()
{
return _dictionary.Values.GetEnumerator();
}

IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}

public WorkflowStep FindById(int id)
{
if (!_dictionary.ContainsKey(id))
return null;

return _dictionary[id];
}

public void Add(WorkflowStep item)
{
_dictionary.Add(item.Id, item);
}

public void Clear()
{
_dictionary.Clear();
}

public bool Contains(WorkflowStep item)
{
return _dictionary.ContainsValue(item);
}

public void CopyTo(WorkflowStep[] array, int arrayIndex)
{
_dictionary.Values.CopyTo(array, arrayIndex);
}

public bool Remove(WorkflowStep item)
{
return _dictionary.Remove(item.Id);
}

public WorkflowStep Find(Predicate<WorkflowStep> match)
{
return _dictionary.Values.FirstOrDefault(x => match(x));
}

public int Count => _dictionary.Count;
public bool IsReadOnly => false;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using WorkflowCore.Interface;
Expand All @@ -12,48 +13,42 @@ namespace WorkflowCore.Services
/// </summary>
public class SingleNodeQueueProvider : IQueueProvider
{

private readonly BlockingCollection<string> _runQueue = new BlockingCollection<string>();
private readonly BlockingCollection<string> _eventQueue = new BlockingCollection<string>();

private readonly Dictionary<QueueType, BlockingCollection<string>> _queues = new Dictionary<QueueType, BlockingCollection<string>>()
{
[QueueType.Workflow] = new BlockingCollection<string>(),
[QueueType.Event] = new BlockingCollection<string>(),
[QueueType.Index] = new BlockingCollection<string>()
};

public bool IsDequeueBlocking => true;

public async Task QueueWork(string id, QueueType queue)
{
SelectQueue(queue).Add(id);
_queues[queue].Add(id);
}

public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
{
if (SelectQueue(queue).TryTake(out string id, 100, cancellationToken))
if (_queues[queue].TryTake(out string id, 100, cancellationToken))
return id;

return null;
}

public async Task Start()
public Task Start()
{
return Task.CompletedTask;
}

public async Task Stop()
public Task Stop()
{
return Task.CompletedTask;
}

public void Dispose()
{
}

private BlockingCollection<string> SelectQueue(QueueType queue)
{
switch (queue)
{
case QueueType.Workflow:
return _runQueue;
case QueueType.Event:
return _eventQueue;
}
return null;
}

}

Expand Down
26 changes: 13 additions & 13 deletions src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ private WorkflowDefinition Convert(DefinitionSourceV1 source)
}


private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
{
var result = new List<WorkflowStep>();
var result = new WorkflowStepCollection();
int i = 0;
var stack = new Stack<StepSourceV1>(source.Reverse<StepSourceV1>());
var parents = new List<StepSourceV1>();
Expand Down Expand Up @@ -87,7 +87,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
targetStep.Name = nextStep.Name;
targetStep.ErrorBehavior = nextStep.ErrorBehavior;
targetStep.RetryInterval = nextStep.RetryInterval;
targetStep.Tag = $"{nextStep.Id}";
targetStep.ExternalId = $"{nextStep.Id}";

AttachInputs(nextStep, dataType, stepType, targetStep);
AttachOutputs(nextStep, dataType, stepType, targetStep);
Expand All @@ -114,7 +114,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
}

if (!string.IsNullOrEmpty(nextStep.NextStepId))
targetStep.Outcomes.Add(new StepOutcome() { Tag = $"{nextStep.NextStepId}" });
targetStep.Outcomes.Add(new StepOutcome() { ExternalNextStepId = $"{nextStep.NextStepId}" });

result.Add(targetStep);

Expand All @@ -123,26 +123,26 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d

foreach (var step in result)
{
if (result.Any(x => x.Tag == step.Tag && x.Id != step.Id))
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.Tag}");
if (result.Any(x => x.ExternalId == step.ExternalId && x.Id != step.Id))
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.ExternalId}");

foreach (var outcome in step.Outcomes)
{
if (result.All(x => x.Tag != outcome.Tag))
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.Tag}");
if (result.All(x => x.ExternalId != outcome.ExternalNextStepId))
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.ExternalNextStepId}");

outcome.NextStep = result.Single(x => x.Tag == outcome.Tag).Id;
outcome.NextStep = result.Single(x => x.ExternalId == outcome.ExternalNextStepId).Id;
}
}

foreach (var parent in parents)
{
var target = result.Single(x => x.Tag == parent.Id);
var target = result.Single(x => x.ExternalId == parent.Id);
foreach (var branch in parent.Do)
{
var childTags = branch.Select(x => x.Id).ToList();
target.Children.AddRange(result
.Where(x => childTags.Contains(x.Tag))
.Where(x => childTags.Contains(x.ExternalId))
.OrderBy(x => x.Id)
.Select(x => x.Id)
.Take(1)
Expand All @@ -152,11 +152,11 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d

foreach (var item in compensatables)
{
var target = result.Single(x => x.Tag == item.Id);
var target = result.Single(x => x.ExternalId == item.Id);
var tag = item.CompensateWith.Select(x => x.Id).FirstOrDefault();
if (tag != null)
{
var compStep = result.FirstOrDefault(x => x.Tag == tag);
var compStep = result.FirstOrDefault(x => x.ExternalId == tag);
if (compStep != null)
target.CompensationStepId = compStep.Id;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
{
var pointerId = scope.Pop();
var scopePointer = workflow.ExecutionPointers.FindById(pointerId);
var scopeStep = def.Steps.First(x => x.Id == scopePointer.StepId);
var scopeStep = def.Steps.FindById(scopePointer.StepId);

var resume = true;
var revert = false;
Expand All @@ -44,7 +44,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
{
var parentId = txnStack.Pop();
var parentPointer = workflow.ExecutionPointers.FindById(parentId);
var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId);
var parentStep = def.Steps.FindById(parentPointer.StepId);
if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation))
{
resume = parentStep.ResumeChildrenAfterCompensation;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP

foreach (var siblingPointer in prevSiblings)
{
var siblingStep = def.Steps.First(x => x.Id == siblingPointer.StepId);
var siblingStep = def.Steps.FindById(siblingPointer.StepId);
if (siblingStep.CompensationStepId.HasValue)
{
var compensationPointer = _pointerFactory.BuildCompensationPointer(def, siblingPointer, exceptionPointer, siblingStep.CompensationStepId.Value);
Expand Down
8 changes: 4 additions & 4 deletions src/WorkflowCore/Services/ExecutionPointerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def)
StepId = 0,
Active = true,
Status = PointerStatus.Pending,
StepName = Enumerable.First<WorkflowStep>(def.Steps, x => x.Id == 0).Name
StepName = def.Steps.FindById(0).Name
};
}

Expand All @@ -32,7 +32,7 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe
Active = true,
ContextItem = pointer.ContextItem,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name,
StepName = def.Steps.FindById(outcomeTarget.NextStep).Name,
Scope = new List<string>(pointer.Scope)
};
}
Expand All @@ -52,7 +52,7 @@ public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPoint
Active = true,
ContextItem = branch,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == childDefinitionId).Name,
StepName = def.Steps.FindById(childDefinitionId).Name,
Scope = new List<string>(childScope)
};
}
Expand All @@ -68,7 +68,7 @@ public ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, Executi
Active = true,
ContextItem = pointer.ContextItem,
Status = PointerStatus.Pending,
StepName = def.Steps.First(x => x.Id == compensationStepId).Name,
StepName = def.Steps.FindById(compensationStepId).Name,
Scope = new List<string>(pointer.Scope)
};
}
Expand Down
4 changes: 2 additions & 2 deletions src/WorkflowCore/Services/ExecutionResultProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
while (queue.Count > 0)
{
var exceptionPointer = queue.Dequeue();
var exceptionStep = def.Steps.Find(x => x.Id == exceptionPointer.StepId);
var exceptionStep = def.Steps.FindById(exceptionPointer.StepId);
var compensatingStepId = FindScopeCompensationStepId(workflow, def, exceptionPointer);
var errorOption = (exceptionStep.ErrorBehavior ?? (compensatingStepId.HasValue ? WorkflowErrorHandling.Compensate : def.DefaultErrorBehavior));

Expand All @@ -129,7 +129,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
{
var pointerId = scope.Pop();
var pointer = workflow.ExecutionPointers.FindById(pointerId);
var step = def.Steps.First(x => x.Id == pointer.StepId);
var step = def.Steps.FindById(pointer.StepId);
if (step.CompensationStepId.HasValue)
return step.CompensationStepId.Value;
}
Expand Down
Loading