-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Fix #7578, Cancelled sinks are blocking other BroadcastHub consumers #7579
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix #7578, Cancelled sinks are blocking other BroadcastHub consumers #7579
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self-review
} | ||
|
||
[Fact] | ||
public async Task BroadcastHub_must_handle_cancelled_Sink() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The actual reproduction
} | ||
|
||
public long Id { get; } | ||
private sealed record UnRegister(long Id, int PreviousOffset, int FinalOffset) : IHubEvent; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cleanup, convert classes to records if they're not a singleton
private void OnEvent(IHubEvent hubEvent) | ||
{ | ||
if (hubEvent == RegistrationPending.Instance) | ||
switch (hubEvent) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Refactor cascading if...else if statements to a switch statement
consumer.Callback(new Initialize(startFrom)) | ||
.ContinueWith(t => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New code, the consumer.Callback()
uses the new API that returns a Func<T, Task>
instead of the old Action<T>
that allows us to respond to the result of the callback. In this case, if the stream is dead, we unregister the consumer.
{ | ||
private List<Func<object, Task>>? CallbacksWaitingForInterpreter = null; | ||
private AtomicReference<List<TaskCompletionSource<Done>>?> AsyncCallbacksInProgress = new(new List<TaskCompletionSource<Done>>()); | ||
public static readonly TaskCompletionSource<Done> NoPromise; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Marker for callbacks that doesn't need to observe any task completion (fire and forget)
=> () => Interpreter.OnAsyncInput(this, NotUsed.Instance, _ => handler()); | ||
=> () => Interpreter.OnAsyncInput(this, NotUsed.Instance, NoPromise, _ => handler()); | ||
|
||
protected Func<T, Task<Done>> GetAsyncCallbackAsync<T>(Action<T> handler) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New async callback API, instead of returning an Action<T>
, this new API returns a Func<T, Task<Done>>
where external callers can observe the result of any async callback invocations.
var promise = new TaskCompletionSource<Done>(); | ||
if (AddToWaiting(promise)) | ||
{ | ||
Interpreter.OnAsyncInput(this, @event, promise, x => handler((T)x)); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We create a new TaskCompletionSource
"promise" for each async callback invocation. AddToWaiting()
checks that the promise can be atomically added to AsyncCallbacksInProgress
. If the add succeed, we enqueue its invocation in the interpreter, if not, we signal a StreamDetachedException
on the promise.
var inProgress = AsyncCallbacksInProgress.GetAndSet(null); | ||
if (inProgress is not null && inProgress.Count > 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On stream stop, we set AsyncCallbacksInProgress
to null to signal that we're not accepting any async callback anymore, then for each active callbacks, we signal a StreamDetachedException
failure.
internal void OnFeedbackDispatched(TaskCompletionSource<Done> p) | ||
{ | ||
switch (AsyncCallbacksInProgress.Value) | ||
{ | ||
case null: | ||
// already finished, nothing to do here | ||
break; | ||
case var x: | ||
x.Remove(p); | ||
break; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When interpreter finished processing the callback, it signals the logic (this instance) through this callback. The logic then removes the TaskCompletionSource
from the pending list.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self-review
/// </summary> | ||
public abstract class GraphStageLogic : IStageLogging | ||
{ | ||
private readonly Queue<ActorGraphInterpreter.AsyncInput> _callbacksWaitingForInterpreter = new(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Queue to hold early callbacks that were invoked before the interpreter is initialized
public abstract class GraphStageLogic : IStageLogging | ||
{ | ||
private readonly Queue<ActorGraphInterpreter.AsyncInput> _callbacksWaitingForInterpreter = new(); | ||
private readonly AtomicReference<List<TaskCompletionSource<Done>>?> _asyncCallbacksInProgress = new(new List<TaskCompletionSource<Done>>()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This holds the current async callback invocations that are currently in progress. Note that it is a nullable atomic reference to a list of TaskCompletionSources. We use null to signal that the stream is shutting down or in the process of doing so, all callback invocations will automatically trigger a StreamDetachedException if it sees that this list is null. We need this because we need to guarantee cross-thread access safety (async callbacks are invoked from outside the stream)
=> @event => | ||
{ | ||
if(_interpreter == null) | ||
_callbacksWaitingForInterpreter.Enqueue( | ||
new ActorGraphInterpreter.AsyncInput(null, null, @event, NoPromise, x => handler((T)x))); | ||
else | ||
Interpreter.OnAsyncInput(this, @event, NoPromise, x => handler((T)x)); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Example of how the invocation is stored using the existing AsyncInput
data structure to be defered for later. The actual execution will be performed when Interpreter
property is assigned.
while (_callbacksWaitingForInterpreter.Count > 0) | ||
{ | ||
var input = _callbacksWaitingForInterpreter.Dequeue(); | ||
_interpreter.OnAsyncInput(this, input.Event, input.Promise, input.Handler); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All defered callback invocation are executed here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed this together on the call this morning, LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are still test failures
Resolved via #7615 |
close #7578
Changes
QueueSource