Skip to content

Commit 82c0f1c

Browse files
committed
readability
1 parent cd77cd8 commit 82c0f1c

File tree

3 files changed

+173
-176
lines changed

3 files changed

+173
-176
lines changed

src/WorkflowCore/Services/BackgroundTasks/EventConsumer.cs

Lines changed: 45 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -26,74 +26,71 @@ public EventConsumer(IPersistenceProvider persistenceStore, IQueueProvider queue
2626

2727
protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
2828
{
29-
if (await _lockProvider.AcquireLock($"evt:{itemId}", cancellationToken))
29+
if (!await _lockProvider.AcquireLock($"evt:{itemId}", cancellationToken))
3030
{
31-
try
31+
Logger.LogInformation($"Event locked {itemId}");
32+
return;
33+
}
34+
35+
try
36+
{
37+
cancellationToken.ThrowIfCancellationRequested();
38+
var evt = await _persistenceStore.GetEvent(itemId);
39+
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
3240
{
33-
cancellationToken.ThrowIfCancellationRequested();
34-
var evt = await _persistenceStore.GetEvent(itemId);
35-
if (evt.EventTime <= _datetimeProvider.Now.ToUniversalTime())
36-
{
37-
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
38-
var success = true;
41+
var subs = await _persistenceStore.GetSubcriptions(evt.EventName, evt.EventKey, evt.EventTime);
42+
var success = true;
3943

40-
foreach (var sub in subs.ToList())
41-
{
42-
success = success && await SeedSubscription(evt, sub, cancellationToken);
43-
}
44+
foreach (var sub in subs.ToList())
45+
{
46+
success = success && await SeedSubscription(evt, sub, cancellationToken);
47+
}
4448

45-
if (success)
46-
{
47-
await _persistenceStore.MarkEventProcessed(itemId);
48-
}
49+
if (success)
50+
{
51+
await _persistenceStore.MarkEventProcessed(itemId);
4952
}
5053
}
51-
finally
52-
{
53-
await _lockProvider.ReleaseLock($"evt:{itemId}");
54-
}
5554
}
56-
else
55+
finally
5756
{
58-
Logger.LogInformation($"Event locked {itemId}");
57+
await _lockProvider.ReleaseLock($"evt:{itemId}");
5958
}
6059
}
6160

6261
private async Task<bool> SeedSubscription(Event evt, EventSubscription sub, CancellationToken cancellationToken)
6362
{
64-
if (await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
63+
if (!await _lockProvider.AcquireLock(sub.WorkflowId, cancellationToken))
6564
{
66-
try
67-
{
68-
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
69-
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished && p.EndTime == null);
70-
foreach (var p in pointers)
71-
{
72-
p.EventData = evt.EventData;
73-
p.EventPublished = true;
74-
p.Active = true;
75-
}
76-
workflow.NextExecution = 0;
77-
await _persistenceStore.PersistWorkflow(workflow);
78-
await _persistenceStore.TerminateSubscription(sub.Id);
79-
return true;
80-
}
81-
catch (Exception ex)
82-
{
83-
Logger.LogError(ex.Message);
84-
return false;
85-
}
86-
finally
65+
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
66+
return false;
67+
}
68+
69+
try
70+
{
71+
var workflow = await _persistenceStore.GetWorkflowInstance(sub.WorkflowId);
72+
var pointers = workflow.ExecutionPointers.Where(p => p.EventName == sub.EventName && p.EventKey == sub.EventKey && !p.EventPublished && p.EndTime == null);
73+
foreach (var p in pointers)
8774
{
88-
await _lockProvider.ReleaseLock(sub.WorkflowId);
89-
await QueueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
75+
p.EventData = evt.EventData;
76+
p.EventPublished = true;
77+
p.Active = true;
9078
}
79+
workflow.NextExecution = 0;
80+
await _persistenceStore.PersistWorkflow(workflow);
81+
await _persistenceStore.TerminateSubscription(sub.Id);
82+
return true;
9183
}
92-
else
84+
catch (Exception ex)
9385
{
94-
Logger.LogInformation("Workflow locked {0}", sub.WorkflowId);
86+
Logger.LogError(ex.Message);
9587
return false;
9688
}
89+
finally
90+
{
91+
await _lockProvider.ReleaseLock(sub.WorkflowId);
92+
await QueueProvider.QueueWork(sub.WorkflowId, QueueType.Workflow);
93+
}
9794
}
9895
}
9996
}

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,61 +28,60 @@ public WorkflowConsumer(IPooledObjectPolicy<IPersistenceProvider> persistencePoo
2828

2929
protected override async Task ProcessItem(string itemId, CancellationToken cancellationToken)
3030
{
31-
if (await _lockProvider.AcquireLock(itemId, cancellationToken))
31+
if (!await _lockProvider.AcquireLock(itemId, cancellationToken))
32+
{
33+
Logger.LogInformation("Workflow locked {0}", itemId);
34+
return;
35+
}
36+
37+
WorkflowInstance workflow = null;
38+
WorkflowExecutorResult result = null;
39+
var persistenceStore = _persistenceStorePool.Get();
40+
try
3241
{
33-
WorkflowInstance workflow = null;
34-
WorkflowExecutorResult result = null;
35-
var persistenceStore = _persistenceStorePool.Get();
3642
try
3743
{
38-
try
44+
cancellationToken.ThrowIfCancellationRequested();
45+
workflow = await persistenceStore.GetWorkflowInstance(itemId);
46+
if (workflow.Status == WorkflowStatus.Runnable)
3947
{
40-
cancellationToken.ThrowIfCancellationRequested();
41-
workflow = await persistenceStore.GetWorkflowInstance(itemId);
42-
if (workflow.Status == WorkflowStatus.Runnable)
48+
var executor = _executorPool.Get();
49+
try
50+
{
51+
result = await executor.Execute(workflow);
52+
}
53+
finally
4354
{
44-
var executor = _executorPool.Get();
45-
try
46-
{
47-
result = await executor.Execute(workflow);
48-
}
49-
finally
50-
{
51-
_executorPool.Return(executor);
52-
await persistenceStore.PersistWorkflow(workflow);
53-
await QueueProvider.QueueWork(itemId, QueueType.Index);
54-
}
55+
_executorPool.Return(executor);
56+
await persistenceStore.PersistWorkflow(workflow);
57+
await QueueProvider.QueueWork(itemId, QueueType.Index);
5558
}
5659
}
57-
finally
60+
}
61+
finally
62+
{
63+
await _lockProvider.ReleaseLock(itemId);
64+
if ((workflow != null) && (result != null))
5865
{
59-
await _lockProvider.ReleaseLock(itemId);
60-
if ((workflow != null) && (result != null))
66+
foreach (var sub in result.Subscriptions)
6167
{
62-
foreach (var sub in result.Subscriptions)
63-
{
64-
await SubscribeEvent(sub, persistenceStore);
65-
}
68+
await SubscribeEvent(sub, persistenceStore);
69+
}
6670

67-
await persistenceStore.PersistErrors(result.Errors);
71+
await persistenceStore.PersistErrors(result.Errors);
6872

69-
var readAheadTicks = _datetimeProvider.Now.Add(Options.PollInterval).ToUniversalTime().Ticks;
73+
var readAheadTicks = _datetimeProvider.Now.Add(Options.PollInterval).ToUniversalTime().Ticks;
7074

71-
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
72-
{
73-
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
74-
}
75+
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue && workflow.NextExecution.Value < readAheadTicks)
76+
{
77+
new Task(() => FutureQueue(workflow, cancellationToken)).Start();
7578
}
7679
}
7780
}
78-
finally
79-
{
80-
_persistenceStorePool.Return(persistenceStore);
81-
}
8281
}
83-
else
82+
finally
8483
{
85-
Logger.LogInformation("Workflow locked {0}", itemId);
84+
_persistenceStorePool.Return(persistenceStore);
8685
}
8786
}
8887

0 commit comments

Comments
 (0)