Skip to content

Commit f0eb716

Browse files
authored
Issue 273 (danielgerlag#281)
1 parent 82c0f1c commit f0eb716

File tree

2 files changed

+106
-95
lines changed

2 files changed

+106
-95
lines changed

src/WorkflowCore/Services/CancellationProcessor.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ public void ProcessCancellations(WorkflowInstance workflow, WorkflowDefinition w
3939

4040
foreach (var ptr in toCancel)
4141
{
42-
ptr.EndTime = DateTime.Now.ToUniversalTime();
43-
ptr.Active = false;
44-
ptr.Status = PointerStatus.Cancelled;
45-
4642
if (step.ProceedOnCancel)
4743
{
4844
_executionResultProcessor.ProcessExecutionResult(workflow, workflowDef, ptr, step, ExecutionResult.Next(), executionResult);
4945
}
50-
46+
47+
ptr.EndTime = DateTime.Now.ToUniversalTime();
48+
ptr.Active = false;
49+
ptr.Status = PointerStatus.Cancelled;
50+
5151
foreach (var descendent in workflow.ExecutionPointers.FindByScope(ptr.Id).Where(x => x.Status != PointerStatus.Complete && x.Status != PointerStatus.Cancelled))
5252
{
5353
descendent.EndTime = DateTime.Now.ToUniversalTime();

src/WorkflowCore/Services/WorkflowExecutor.cs

Lines changed: 101 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,12 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
4848
_logger.LogError("Workflow {0} version {1} is not registered", workflow.WorkflowDefinitionId, workflow.Version);
4949
return wfResult;
5050
}
51+
52+
_cancellationProcessor.ProcessCancellations(workflow, def, wfResult);
5153

5254
foreach (var pointer in exePointers)
5355
{
54-
if (pointer.Status == PointerStatus.Cancelled)
56+
if (!pointer.Active)
5557
continue;
5658

5759
var step = def.Steps.FindById(pointer.StepId);
@@ -64,97 +66,17 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
6466
WorkflowId = workflow.Id,
6567
ExecutionPointerId = pointer.Id,
6668
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
67-
Message = String.Format("Unable to find step {0} in workflow definition", pointer.StepId)
69+
Message = $"Unable to find step {pointer.StepId} in workflow definition"
6870
});
6971
continue;
7072
}
7173

7274
try
73-
{
74-
switch (step.InitForExecution(wfResult, def, workflow, pointer))
75-
{
76-
case ExecutionPipelineDirective.Defer:
77-
continue;
78-
case ExecutionPipelineDirective.EndWorkflow:
79-
workflow.Status = WorkflowStatus.Complete;
80-
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
81-
continue;
82-
}
83-
84-
if (pointer.Status != PointerStatus.Running)
85-
{
86-
pointer.Status = PointerStatus.Running;
87-
_publisher.PublishNotification(new StepStarted()
88-
{
89-
EventTimeUtc = _datetimeProvider.Now,
90-
Reference = workflow.Reference,
91-
ExecutionPointerId = pointer.Id,
92-
StepId = step.Id,
93-
WorkflowInstanceId = workflow.Id,
94-
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
95-
Version = workflow.Version
96-
});
97-
}
98-
99-
if (!pointer.StartTime.HasValue)
100-
{
101-
pointer.StartTime = _datetimeProvider.Now.ToUniversalTime();
102-
}
75+
{
76+
if (!InitializeStep(workflow, step, wfResult, def, pointer))
77+
continue;
10378

104-
using (var scope = _scopeProvider.CreateScope())
105-
{
106-
_logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id);
107-
108-
IStepBody body = step.ConstructBody(scope.ServiceProvider);
109-
110-
if (body == null)
111-
{
112-
_logger.LogError("Unable to construct step body {0}", step.BodyType.ToString());
113-
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
114-
wfResult.Errors.Add(new ExecutionError()
115-
{
116-
WorkflowId = workflow.Id,
117-
ExecutionPointerId = pointer.Id,
118-
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
119-
Message = String.Format("Unable to construct step body {0}", step.BodyType.ToString())
120-
});
121-
continue;
122-
}
123-
124-
IStepExecutionContext context = new StepExecutionContext()
125-
{
126-
Workflow = workflow,
127-
Step = step,
128-
PersistenceData = pointer.PersistenceData,
129-
ExecutionPointer = pointer,
130-
Item = pointer.ContextItem
131-
};
132-
133-
foreach (var input in step.Inputs)
134-
input.AssignInput(workflow.Data, body, context);
135-
136-
137-
switch (step.BeforeExecute(wfResult, context, pointer, body))
138-
{
139-
case ExecutionPipelineDirective.Defer:
140-
continue;
141-
case ExecutionPipelineDirective.EndWorkflow:
142-
workflow.Status = WorkflowStatus.Complete;
143-
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
144-
continue;
145-
}
146-
147-
var result = await body.RunAsync(context);
148-
149-
if (result.Proceed)
150-
{
151-
foreach (var output in step.Outputs)
152-
output.AssignOutput(workflow.Data, body, context);
153-
}
154-
155-
_executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult);
156-
step.AfterExecute(wfResult, context, result, pointer);
157-
}
79+
await ExecuteStep(workflow, step, pointer, wfResult, def);
15880
}
15981
catch (Exception ex)
16082
{
@@ -171,16 +93,105 @@ public async Task<WorkflowExecutorResult> Execute(WorkflowInstance workflow)
17193
Host.ReportStepError(workflow, step, ex);
17294
}
17395
_cancellationProcessor.ProcessCancellations(workflow, def, wfResult);
174-
175-
176-
17796
}
17897
ProcessAfterExecutionIteration(workflow, def, wfResult);
17998
DetermineNextExecutionTime(workflow);
18099

181100
return wfResult;
182101
}
183-
102+
103+
private bool InitializeStep(WorkflowInstance workflow, WorkflowStep step, WorkflowExecutorResult wfResult, WorkflowDefinition def, ExecutionPointer pointer)
104+
{
105+
switch (step.InitForExecution(wfResult, def, workflow, pointer))
106+
{
107+
case ExecutionPipelineDirective.Defer:
108+
return false;
109+
case ExecutionPipelineDirective.EndWorkflow:
110+
workflow.Status = WorkflowStatus.Complete;
111+
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
112+
return false;
113+
}
114+
115+
if (pointer.Status != PointerStatus.Running)
116+
{
117+
pointer.Status = PointerStatus.Running;
118+
_publisher.PublishNotification(new StepStarted()
119+
{
120+
EventTimeUtc = _datetimeProvider.Now,
121+
Reference = workflow.Reference,
122+
ExecutionPointerId = pointer.Id,
123+
StepId = step.Id,
124+
WorkflowInstanceId = workflow.Id,
125+
WorkflowDefinitionId = workflow.WorkflowDefinitionId,
126+
Version = workflow.Version
127+
});
128+
}
129+
130+
if (!pointer.StartTime.HasValue)
131+
{
132+
pointer.StartTime = _datetimeProvider.Now.ToUniversalTime();
133+
}
134+
135+
return true;
136+
}
137+
138+
private async Task ExecuteStep(WorkflowInstance workflow, WorkflowStep step, ExecutionPointer pointer, WorkflowExecutorResult wfResult, WorkflowDefinition def)
139+
{
140+
using (var scope = _scopeProvider.CreateScope())
141+
{
142+
_logger.LogDebug("Starting step {0} on workflow {1}", step.Name, workflow.Id);
143+
144+
IStepBody body = step.ConstructBody(scope.ServiceProvider);
145+
146+
if (body == null)
147+
{
148+
_logger.LogError("Unable to construct step body {0}", step.BodyType.ToString());
149+
pointer.SleepUntil = _datetimeProvider.Now.ToUniversalTime().Add(_options.ErrorRetryInterval);
150+
wfResult.Errors.Add(new ExecutionError()
151+
{
152+
WorkflowId = workflow.Id,
153+
ExecutionPointerId = pointer.Id,
154+
ErrorTime = _datetimeProvider.Now.ToUniversalTime(),
155+
Message = $"Unable to construct step body {step.BodyType.ToString()}"
156+
});
157+
return;
158+
}
159+
160+
IStepExecutionContext context = new StepExecutionContext()
161+
{
162+
Workflow = workflow,
163+
Step = step,
164+
PersistenceData = pointer.PersistenceData,
165+
ExecutionPointer = pointer,
166+
Item = pointer.ContextItem
167+
};
168+
169+
foreach (var input in step.Inputs)
170+
input.AssignInput(workflow.Data, body, context);
171+
172+
switch (step.BeforeExecute(wfResult, context, pointer, body))
173+
{
174+
case ExecutionPipelineDirective.Defer:
175+
return;
176+
case ExecutionPipelineDirective.EndWorkflow:
177+
workflow.Status = WorkflowStatus.Complete;
178+
workflow.CompleteTime = _datetimeProvider.Now.ToUniversalTime();
179+
return;
180+
}
181+
182+
var result = await body.RunAsync(context);
183+
184+
if (result.Proceed)
185+
{
186+
foreach (var output in step.Outputs)
187+
output.AssignOutput(workflow.Data, body, context);
188+
}
189+
190+
_executionResultProcessor.ProcessExecutionResult(workflow, def, pointer, step, result, wfResult);
191+
step.AfterExecute(wfResult, context, result, pointer);
192+
}
193+
}
194+
184195
private void ProcessAfterExecutionIteration(WorkflowInstance workflow, WorkflowDefinition workflowDef, WorkflowExecutorResult workflowResult)
185196
{
186197
var pointers = workflow.ExecutionPointers.Where(x => x.EndTime == null);

0 commit comments

Comments
 (0)