Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/Orleans.Journaling/StateMachineManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ internal sealed partial class StateMachineManager : IStateMachineManager, ILifec
private readonly Queue<WorkItem> _workQueue = new();
private readonly CancellationTokenSource _shutdownCancellation = new();
private readonly StateMachineManagerState _stateMachineIds;
private readonly Task _workLoop;
private Task? _workLoop;
private ManagerState _state;
private Task? _pendingWrite;
private ulong _nextStateMachineId = MinApplicationStateMachineId;
Expand All @@ -40,8 +40,6 @@ public StateMachineManager(
// This allows us to recover the list of state machines ids without having to store it separately.
_stateMachineIds = new StateMachineManagerState(this, StringCodec, UInt64Codec, serializerSessionPool);
_stateMachinesMap[0] = _stateMachineIds;

_workLoop = Start();
}

public void RegisterStateMachine(string name, IDurableStateMachine stateMachine)
Expand All @@ -65,6 +63,8 @@ public async ValueTask InitializeAsync(CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
_shutdownCancellation.Token.ThrowIfCancellationRequested();
Debug.Assert(_workLoop is null, "InitializeAsync should only be called once.");
_workLoop = Start();

Task task;
lock (_lock)
Expand Down Expand Up @@ -379,7 +379,10 @@ async Task ILifecycleObserver.OnStop(CancellationToken cancellationToken)
{
_shutdownCancellation.Cancel();
_workSignal.Signal();
await _workLoop.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing);
if (_workLoop is { } task)
{
await task.WaitAsync(cancellationToken).ConfigureAwait(ConfigureAwaitOptions.ContinueOnCapturedContext | ConfigureAwaitOptions.SuppressThrowing);
}
}

void IDisposable.Dispose()
Expand Down
2 changes: 1 addition & 1 deletion test/Orleans.Journaling.Tests/DurableQueueTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ public async Task DurableQueue_EmptyQueueOperations_Test()
var manager = sut.Manager;
var codec = CodecProvider.GetCodec<string>();
var queue = new DurableQueue<string>("emptyQueue", manager, codec, SessionPool);
await manager.WriteStateAsync(CancellationToken.None);
await sut.Lifecycle.OnStart();
await manager.WriteStateAsync(CancellationToken.None);

// Assert
Assert.Empty(queue);
Expand Down