Skip to content

Conversation

Arkatufus
Copy link
Contributor

@Arkatufus Arkatufus commented Mar 24, 2025

@Arkatufus Arkatufus marked this pull request as draft March 25, 2025 21:05
Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self-review

}

[Fact]
public async Task BroadcastHub_must_handle_cancelled_Sink()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual reproduction

}

public long Id { get; }
private sealed record UnRegister(long Id, int PreviousOffset, int FinalOffset) : IHubEvent;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cleanup, convert classes to records if they're not a singleton

private void OnEvent(IHubEvent hubEvent)
{
if (hubEvent == RegistrationPending.Instance)
switch (hubEvent)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor cascading if...else if statements to a switch statement

Comment on lines +674 to +675
consumer.Callback(new Initialize(startFrom))
.ContinueWith(t =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New code, the consumer.Callback() uses the new API that returns a Func<T, Task> instead of the old Action<T> that allows us to respond to the result of the callback. In this case, if the stream is dead, we unregister the consumer.

{
private List<Func<object, Task>>? CallbacksWaitingForInterpreter = null;
private AtomicReference<List<TaskCompletionSource<Done>>?> AsyncCallbacksInProgress = new(new List<TaskCompletionSource<Done>>());
public static readonly TaskCompletionSource<Done> NoPromise;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marker for callbacks that doesn't need to observe any task completion (fire and forget)

=> () => Interpreter.OnAsyncInput(this, NotUsed.Instance, _ => handler());
=> () => Interpreter.OnAsyncInput(this, NotUsed.Instance, NoPromise, _ => handler());

protected Func<T, Task<Done>> GetAsyncCallbackAsync<T>(Action<T> handler)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New async callback API, instead of returning an Action<T>, this new API returns a Func<T, Task<Done>> where external callers can observe the result of any async callback invocations.

Comment on lines 1704 to 1708
var promise = new TaskCompletionSource<Done>();
if (AddToWaiting(promise))
{
Interpreter.OnAsyncInput(this, @event, promise, x => handler((T)x));
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We create a new TaskCompletionSource "promise" for each async callback invocation. AddToWaiting() checks that the promise can be atomically added to AsyncCallbacksInProgress. If the add succeed, we enqueue its invocation in the interpreter, if not, we signal a StreamDetachedException on the promise.

Comment on lines 1831 to 1832
var inProgress = AsyncCallbacksInProgress.GetAndSet(null);
if (inProgress is not null && inProgress.Count > 0)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On stream stop, we set AsyncCallbacksInProgress to null to signal that we're not accepting any async callback anymore, then for each active callbacks, we signal a StreamDetachedException failure.

Comment on lines 1848 to 1858
internal void OnFeedbackDispatched(TaskCompletionSource<Done> p)
{
switch (AsyncCallbacksInProgress.Value)
{
case null:
// already finished, nothing to do here
break;
case var x:
x.Remove(p);
break;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When interpreter finished processing the callback, it signals the logic (this instance) through this callback. The logic then removes the TaskCompletionSource from the pending list.

@Arkatufus Arkatufus changed the title Reproduction for #7578, Cancelled sinks are blocking other BroadcastHub consumers Fix #7578, Cancelled sinks are blocking other BroadcastHub consumers Mar 27, 2025
@Arkatufus Arkatufus marked this pull request as ready for review March 27, 2025 21:26
Copy link
Contributor Author

@Arkatufus Arkatufus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self-review

/// </summary>
public abstract class GraphStageLogic : IStageLogging
{
private readonly Queue<ActorGraphInterpreter.AsyncInput> _callbacksWaitingForInterpreter = new();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue to hold early callbacks that were invoked before the interpreter is initialized

public abstract class GraphStageLogic : IStageLogging
{
private readonly Queue<ActorGraphInterpreter.AsyncInput> _callbacksWaitingForInterpreter = new();
private readonly AtomicReference<List<TaskCompletionSource<Done>>?> _asyncCallbacksInProgress = new(new List<TaskCompletionSource<Done>>());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This holds the current async callback invocations that are currently in progress. Note that it is a nullable atomic reference to a list of TaskCompletionSources. We use null to signal that the stream is shutting down or in the process of doing so, all callback invocations will automatically trigger a StreamDetachedException if it sees that this list is null. We need this because we need to guarantee cross-thread access safety (async callbacks are invoked from outside the stream)

Comment on lines 1692 to 1699
=> @event =>
{
if(_interpreter == null)
_callbacksWaitingForInterpreter.Enqueue(
new ActorGraphInterpreter.AsyncInput(null, null, @event, NoPromise, x => handler((T)x)));
else
Interpreter.OnAsyncInput(this, @event, NoPromise, x => handler((T)x));
};
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Example of how the invocation is stored using the existing AsyncInput data structure to be defered for later. The actual execution will be performed when Interpreter property is assigned.

Comment on lines 836 to 840
while (_callbacksWaitingForInterpreter.Count > 0)
{
var input = _callbacksWaitingForInterpreter.Dequeue();
_interpreter.OnAsyncInput(this, input.Event, input.Promise, input.Handler);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All defered callback invocation are executed here.

Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed this together on the call this morning, LGTM

@Aaronontheweb Aaronontheweb enabled auto-merge (squash) March 28, 2025 15:37
@Aaronontheweb Aaronontheweb added this to the 1.5.41 milestone Mar 28, 2025
@Aaronontheweb Aaronontheweb disabled auto-merge March 31, 2025 14:05
@Aaronontheweb Aaronontheweb self-requested a review March 31, 2025 14:05
Copy link
Member

@Aaronontheweb Aaronontheweb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are still test failures

@Aaronontheweb
Copy link
Member

Resolved via #7615

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Cancelled sinks are blocking other BroadcastHub consumers
2 participants