Skip to content

Commit b4cba8d

Browse files
authored
Misc improvements (danielgerlag#269)
1 parent 791287d commit b4cba8d

File tree

29 files changed

+303
-162
lines changed

29 files changed

+303
-162
lines changed

src/WorkflowCore/Interface/IQueueProvider.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,5 @@ public interface IQueueProvider : IDisposable
3232
Task Stop();
3333
}
3434

35-
public enum QueueType { Workflow = 0, Event = 1 }
35+
public enum QueueType { Workflow = 0, Event = 1, Index = 2 }
3636
}

src/WorkflowCore/Interface/IStepBuilder.cs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ public interface IStepBuilder<TData, TStepBody>
2121
/// <returns></returns>
2222
IStepBuilder<TData, TStepBody> Name(string name);
2323

24+
/// <summary>
25+
/// Specifies a custom Id to reference this step
26+
/// </summary>
27+
/// <param name="id">A custom Id to reference this step</param>
28+
/// <returns></returns>
29+
IStepBuilder<TData, TStepBody> Id(string id);
30+
2431
/// <summary>
2532
/// Specify the next step in the workflow
2633
/// </summary>
@@ -51,6 +58,13 @@ public interface IStepBuilder<TData, TStepBody>
5158
/// <returns></returns>
5259
IStepBuilder<TData, ActionStepBody> Then(Action<IStepExecutionContext> body);
5360

61+
/// <summary>
62+
/// Specify the next step in the workflow by Id
63+
/// </summary>
64+
/// <param name="id"></param>
65+
/// <returns></returns>
66+
IStepBuilder<TData, TStepBody> Attach(string id);
67+
5468
/// <summary>
5569
/// Configure an outcome for this step, then wire it to another step
5670
/// </summary>

src/WorkflowCore/Models/StepOutcome.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public Expression<Func<object, object>> Value
1616

1717
public string Label { get; set; }
1818

19-
public string Tag { get; set; }
19+
public string ExternalNextStepId { get; set; }
2020

2121
public object GetValue(object data)
2222
{

src/WorkflowCore/Models/WorkflowDefinition.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public class WorkflowDefinition
1212

1313
public string Description { get; set; }
1414

15-
public List<WorkflowStep> Steps { get; set; }
15+
public WorkflowStepCollection Steps { get; set; } = new WorkflowStepCollection();
1616

1717
public Type DataType { get; set; }
1818

src/WorkflowCore/Models/WorkflowStep.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public abstract class WorkflowStep
1414

1515
public virtual string Name { get; set; }
1616

17-
public virtual string Tag { get; set; }
17+
public virtual string ExternalId { get; set; }
1818

1919
public virtual List<int> Children { get; set; } = new List<int>();
2020

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
using System;
2+
using System.Collections;
3+
using System.Collections.Generic;
4+
using System.Linq;
5+
using System.Text;
6+
7+
namespace WorkflowCore.Models
8+
{
9+
public class WorkflowStepCollection : ICollection<WorkflowStep>
10+
{
11+
private readonly Dictionary<int, WorkflowStep> _dictionary = new Dictionary<int, WorkflowStep>();
12+
13+
public WorkflowStepCollection()
14+
{
15+
}
16+
17+
public WorkflowStepCollection(int capacity)
18+
{
19+
_dictionary = new Dictionary<int, WorkflowStep>(capacity);
20+
}
21+
22+
public WorkflowStepCollection(ICollection<WorkflowStep> steps)
23+
{
24+
foreach (var step in steps)
25+
{
26+
Add(step);
27+
}
28+
}
29+
30+
public IEnumerator<WorkflowStep> GetEnumerator()
31+
{
32+
return _dictionary.Values.GetEnumerator();
33+
}
34+
35+
IEnumerator IEnumerable.GetEnumerator()
36+
{
37+
return GetEnumerator();
38+
}
39+
40+
public WorkflowStep FindById(int id)
41+
{
42+
if (!_dictionary.ContainsKey(id))
43+
return null;
44+
45+
return _dictionary[id];
46+
}
47+
48+
public void Add(WorkflowStep item)
49+
{
50+
_dictionary.Add(item.Id, item);
51+
}
52+
53+
public void Clear()
54+
{
55+
_dictionary.Clear();
56+
}
57+
58+
public bool Contains(WorkflowStep item)
59+
{
60+
return _dictionary.ContainsValue(item);
61+
}
62+
63+
public void CopyTo(WorkflowStep[] array, int arrayIndex)
64+
{
65+
_dictionary.Values.CopyTo(array, arrayIndex);
66+
}
67+
68+
public bool Remove(WorkflowStep item)
69+
{
70+
return _dictionary.Remove(item.Id);
71+
}
72+
73+
public WorkflowStep Find(Predicate<WorkflowStep> match)
74+
{
75+
return _dictionary.Values.FirstOrDefault(x => match(x));
76+
}
77+
78+
public int Count => _dictionary.Count;
79+
public bool IsReadOnly => false;
80+
}
81+
}

src/WorkflowCore/Services/DefaultProviders/SingleNodeQueueProvider.cs

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Concurrent;
2+
using System.Collections.Generic;
23
using System.Threading;
34
using System.Threading.Tasks;
45
using WorkflowCore.Interface;
@@ -12,48 +13,42 @@ namespace WorkflowCore.Services
1213
/// </summary>
1314
public class SingleNodeQueueProvider : IQueueProvider
1415
{
15-
16-
private readonly BlockingCollection<string> _runQueue = new BlockingCollection<string>();
17-
private readonly BlockingCollection<string> _eventQueue = new BlockingCollection<string>();
16+
17+
private readonly Dictionary<QueueType, BlockingCollection<string>> _queues = new Dictionary<QueueType, BlockingCollection<string>>()
18+
{
19+
[QueueType.Workflow] = new BlockingCollection<string>(),
20+
[QueueType.Event] = new BlockingCollection<string>(),
21+
[QueueType.Index] = new BlockingCollection<string>()
22+
};
1823

1924
public bool IsDequeueBlocking => true;
2025

2126
public async Task QueueWork(string id, QueueType queue)
2227
{
23-
SelectQueue(queue).Add(id);
28+
_queues[queue].Add(id);
2429
}
2530

2631
public async Task<string> DequeueWork(QueueType queue, CancellationToken cancellationToken)
2732
{
28-
if (SelectQueue(queue).TryTake(out string id, 100, cancellationToken))
33+
if (_queues[queue].TryTake(out string id, 100, cancellationToken))
2934
return id;
3035

3136
return null;
3237
}
3338

34-
public async Task Start()
39+
public Task Start()
3540
{
41+
return Task.CompletedTask;
3642
}
3743

38-
public async Task Stop()
44+
public Task Stop()
3945
{
46+
return Task.CompletedTask;
4047
}
4148

4249
public void Dispose()
4350
{
4451
}
45-
46-
private BlockingCollection<string> SelectQueue(QueueType queue)
47-
{
48-
switch (queue)
49-
{
50-
case QueueType.Workflow:
51-
return _runQueue;
52-
case QueueType.Event:
53-
return _eventQueue;
54-
}
55-
return null;
56-
}
5752

5853
}
5954

src/WorkflowCore/Services/DefinitionStorage/DefinitionLoader.cs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ private WorkflowDefinition Convert(DefinitionSourceV1 source)
5353
}
5454

5555

56-
private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
56+
private WorkflowStepCollection ConvertSteps(ICollection<StepSourceV1> source, Type dataType)
5757
{
58-
var result = new List<WorkflowStep>();
58+
var result = new WorkflowStepCollection();
5959
int i = 0;
6060
var stack = new Stack<StepSourceV1>(source.Reverse<StepSourceV1>());
6161
var parents = new List<StepSourceV1>();
@@ -87,7 +87,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
8787
targetStep.Name = nextStep.Name;
8888
targetStep.ErrorBehavior = nextStep.ErrorBehavior;
8989
targetStep.RetryInterval = nextStep.RetryInterval;
90-
targetStep.Tag = $"{nextStep.Id}";
90+
targetStep.ExternalId = $"{nextStep.Id}";
9191

9292
AttachInputs(nextStep, dataType, stepType, targetStep);
9393
AttachOutputs(nextStep, dataType, stepType, targetStep);
@@ -114,7 +114,7 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
114114
}
115115

116116
if (!string.IsNullOrEmpty(nextStep.NextStepId))
117-
targetStep.Outcomes.Add(new StepOutcome() { Tag = $"{nextStep.NextStepId}" });
117+
targetStep.Outcomes.Add(new StepOutcome() { ExternalNextStepId = $"{nextStep.NextStepId}" });
118118

119119
result.Add(targetStep);
120120

@@ -123,26 +123,26 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
123123

124124
foreach (var step in result)
125125
{
126-
if (result.Any(x => x.Tag == step.Tag && x.Id != step.Id))
127-
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.Tag}");
126+
if (result.Any(x => x.ExternalId == step.ExternalId && x.Id != step.Id))
127+
throw new WorkflowDefinitionLoadException($"Duplicate step Id {step.ExternalId}");
128128

129129
foreach (var outcome in step.Outcomes)
130130
{
131-
if (result.All(x => x.Tag != outcome.Tag))
132-
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.Tag}");
131+
if (result.All(x => x.ExternalId != outcome.ExternalNextStepId))
132+
throw new WorkflowDefinitionLoadException($"Cannot find step id {outcome.ExternalNextStepId}");
133133

134-
outcome.NextStep = result.Single(x => x.Tag == outcome.Tag).Id;
134+
outcome.NextStep = result.Single(x => x.ExternalId == outcome.ExternalNextStepId).Id;
135135
}
136136
}
137137

138138
foreach (var parent in parents)
139139
{
140-
var target = result.Single(x => x.Tag == parent.Id);
140+
var target = result.Single(x => x.ExternalId == parent.Id);
141141
foreach (var branch in parent.Do)
142142
{
143143
var childTags = branch.Select(x => x.Id).ToList();
144144
target.Children.AddRange(result
145-
.Where(x => childTags.Contains(x.Tag))
145+
.Where(x => childTags.Contains(x.ExternalId))
146146
.OrderBy(x => x.Id)
147147
.Select(x => x.Id)
148148
.Take(1)
@@ -152,11 +152,11 @@ private List<WorkflowStep> ConvertSteps(ICollection<StepSourceV1> source, Type d
152152

153153
foreach (var item in compensatables)
154154
{
155-
var target = result.Single(x => x.Tag == item.Id);
155+
var target = result.Single(x => x.ExternalId == item.Id);
156156
var tag = item.CompensateWith.Select(x => x.Id).FirstOrDefault();
157157
if (tag != null)
158158
{
159-
var compStep = result.FirstOrDefault(x => x.Tag == tag);
159+
var compStep = result.FirstOrDefault(x => x.ExternalId == tag);
160160
if (compStep != null)
161161
target.CompensationStepId = compStep.Id;
162162
}

src/WorkflowCore/Services/ErrorHandlers/CompensateHandler.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
3434
{
3535
var pointerId = scope.Pop();
3636
var scopePointer = workflow.ExecutionPointers.FindById(pointerId);
37-
var scopeStep = def.Steps.First(x => x.Id == scopePointer.StepId);
37+
var scopeStep = def.Steps.FindById(scopePointer.StepId);
3838

3939
var resume = true;
4040
var revert = false;
@@ -44,7 +44,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
4444
{
4545
var parentId = txnStack.Pop();
4646
var parentPointer = workflow.ExecutionPointers.FindById(parentId);
47-
var parentStep = def.Steps.First(x => x.Id == parentPointer.StepId);
47+
var parentStep = def.Steps.FindById(parentPointer.StepId);
4848
if ((!parentStep.ResumeChildrenAfterCompensation) || (parentStep.RevertChildrenAfterCompensation))
4949
{
5050
resume = parentStep.ResumeChildrenAfterCompensation;
@@ -86,7 +86,7 @@ public void Handle(WorkflowInstance workflow, WorkflowDefinition def, ExecutionP
8686

8787
foreach (var siblingPointer in prevSiblings)
8888
{
89-
var siblingStep = def.Steps.First(x => x.Id == siblingPointer.StepId);
89+
var siblingStep = def.Steps.FindById(siblingPointer.StepId);
9090
if (siblingStep.CompensationStepId.HasValue)
9191
{
9292
var compensationPointer = _pointerFactory.BuildCompensationPointer(def, siblingPointer, exceptionPointer, siblingStep.CompensationStepId.Value);

src/WorkflowCore/Services/ExecutionPointerFactory.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ public ExecutionPointer BuildGenesisPointer(WorkflowDefinition def)
1717
StepId = 0,
1818
Active = true,
1919
Status = PointerStatus.Pending,
20-
StepName = Enumerable.First<WorkflowStep>(def.Steps, x => x.Id == 0).Name
20+
StepName = def.Steps.FindById(0).Name
2121
};
2222
}
2323

@@ -32,7 +32,7 @@ public ExecutionPointer BuildNextPointer(WorkflowDefinition def, ExecutionPointe
3232
Active = true,
3333
ContextItem = pointer.ContextItem,
3434
Status = PointerStatus.Pending,
35-
StepName = def.Steps.First(x => x.Id == outcomeTarget.NextStep).Name,
35+
StepName = def.Steps.FindById(outcomeTarget.NextStep).Name,
3636
Scope = new List<string>(pointer.Scope)
3737
};
3838
}
@@ -52,7 +52,7 @@ public ExecutionPointer BuildChildPointer(WorkflowDefinition def, ExecutionPoint
5252
Active = true,
5353
ContextItem = branch,
5454
Status = PointerStatus.Pending,
55-
StepName = def.Steps.First(x => x.Id == childDefinitionId).Name,
55+
StepName = def.Steps.FindById(childDefinitionId).Name,
5656
Scope = new List<string>(childScope)
5757
};
5858
}
@@ -68,7 +68,7 @@ public ExecutionPointer BuildCompensationPointer(WorkflowDefinition def, Executi
6868
Active = true,
6969
ContextItem = pointer.ContextItem,
7070
Status = PointerStatus.Pending,
71-
StepName = def.Steps.First(x => x.Id == compensationStepId).Name,
71+
StepName = def.Steps.FindById(compensationStepId).Name,
7272
Scope = new List<string>(pointer.Scope)
7373
};
7474
}

src/WorkflowCore/Services/ExecutionResultProcessor.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
109109
while (queue.Count > 0)
110110
{
111111
var exceptionPointer = queue.Dequeue();
112-
var exceptionStep = def.Steps.Find(x => x.Id == exceptionPointer.StepId);
112+
var exceptionStep = def.Steps.FindById(exceptionPointer.StepId);
113113
var compensatingStepId = FindScopeCompensationStepId(workflow, def, exceptionPointer);
114114
var errorOption = (exceptionStep.ErrorBehavior ?? (compensatingStepId.HasValue ? WorkflowErrorHandling.Compensate : def.DefaultErrorBehavior));
115115

@@ -129,7 +129,7 @@ public void HandleStepException(WorkflowInstance workflow, WorkflowDefinition de
129129
{
130130
var pointerId = scope.Pop();
131131
var pointer = workflow.ExecutionPointers.FindById(pointerId);
132-
var step = def.Steps.First(x => x.Id == pointer.StepId);
132+
var step = def.Steps.FindById(pointer.StepId);
133133
if (step.CompensationStepId.HasValue)
134134
return step.CompensationStepId.Value;
135135
}

0 commit comments

Comments
 (0)