Skip to content

Commit 7fc1474

Browse files
committed
Prevent information loss on host shutdown
Do not pass global CancellationToken to persistence operations on host shutdown. This way it is ensured that persistence operations are not cancelled.
1 parent 7de5032 commit 7fc1474

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

src/WorkflowCore/Services/BackgroundTasks/WorkflowConsumer.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
5555
finally
5656
{
5757
WorkflowActivity.Enrich(result);
58-
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions, cancellationToken);
58+
await _persistenceStore.PersistWorkflow(workflow, result.Subscriptions);
5959
await QueueProvider.QueueWork(itemId, QueueType.Index);
6060
_greylist.Remove($"wf:{itemId}");
6161
}
@@ -68,10 +68,10 @@ protected override async Task ProcessItem(string itemId, CancellationToken cance
6868
{
6969
foreach (var sub in result.Subscriptions)
7070
{
71-
await TryProcessSubscription(sub, _persistenceStore, cancellationToken);
71+
await TryProcessSubscription(sub, _persistenceStore);
7272
}
7373

74-
await _persistenceStore.PersistErrors(result.Errors, cancellationToken);
74+
await _persistenceStore.PersistErrors(result.Errors);
7575

7676
if ((workflow.Status == WorkflowStatus.Runnable) && workflow.NextExecution.HasValue)
7777
{
@@ -98,24 +98,24 @@ await _persistenceStore.ScheduleCommand(new ScheduledCommand()
9898

9999
}
100100

101-
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore, CancellationToken cancellationToken)
101+
private async Task TryProcessSubscription(EventSubscription subscription, IPersistenceProvider persistenceStore)
102102
{
103103
if (subscription.EventName != Event.EventTypeActivity)
104104
{
105-
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf, cancellationToken);
105+
var events = await persistenceStore.GetEvents(subscription.EventName, subscription.EventKey, subscription.SubscribeAsOf);
106106

107107
foreach (var evt in events)
108108
{
109109
var eventKey = $"evt:{evt}";
110110
bool acquiredLock = false;
111111
try
112112
{
113-
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
113+
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);
114114
int attempt = 0;
115115
while (!acquiredLock && attempt < 10)
116116
{
117-
await Task.Delay(Options.IdleTime, cancellationToken);
118-
acquiredLock = await _lockProvider.AcquireLock(eventKey, cancellationToken);
117+
await Task.Delay(Options.IdleTime);
118+
acquiredLock = await _lockProvider.AcquireLock(eventKey, CancellationToken.None);
119119

120120
attempt++;
121121
}
@@ -127,7 +127,7 @@ private async Task TryProcessSubscription(EventSubscription subscription, IPersi
127127
else
128128
{
129129
_greylist.Remove(eventKey);
130-
await persistenceStore.MarkEventUnprocessed(evt, cancellationToken);
130+
await persistenceStore.MarkEventUnprocessed(evt);
131131
await QueueProvider.QueueWork(evt, QueueType.Event);
132132
}
133133
}

0 commit comments

Comments
 (0)