-
Notifications
You must be signed in to change notification settings - Fork 5.1k
[QUIC] Cosmetic changes to Read pipeline #55591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -32,6 +32,8 @@ private sealed class State | |
{ | ||
public SafeMsQuicStreamHandle Handle = null!; // set in ctor. | ||
public GCHandle StateGCHandle; | ||
|
||
public MsQuicStream? Stream; // roots the stream in the pinned state to prevent GC during an async read I/O. | ||
public MsQuicConnection.State ConnectionState = null!; // set in ctor. | ||
public string TraceId = null!; // set in ctor. | ||
|
||
|
@@ -48,7 +50,7 @@ private sealed class State | |
// set when ReadState.PendingRead: | ||
public Memory<byte> ReceiveUserBuffer; | ||
public CancellationTokenRegistration ReceiveCancellationRegistration; | ||
public MsQuicStream? RootedReceiveStream; // roots the stream in the pinned state to prevent GC during an async read I/O. | ||
// Resettable completions to be used for multiple calls to receive. | ||
public readonly ResettableCompletionSource<int> ReceiveResettableCompletionSource = new ResettableCompletionSource<int>(); | ||
|
||
public SendState SendState; | ||
|
@@ -363,27 +365,38 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio | |
NetEventSource.Info(_state, $"{TraceId()} Stream reading into Memory of '{destination.Length}' bytes."); | ||
} | ||
|
||
ReadState readState; | ||
long abortError = -1; | ||
bool canceledSynchronously = false; | ||
ReadState initialReadState; // value before transitions | ||
long abortError; | ||
bool preCanceled = false; | ||
|
||
lock (_state) | ||
{ | ||
readState = _state.ReadState; | ||
initialReadState = _state.ReadState; | ||
abortError = _state.ReadErrorCode; | ||
|
||
if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested) | ||
// Failure scenario: pre-canceled token. Transition: any -> Aborted | ||
// PendingRead state indicates there is another concurrent read operation in flight | ||
// which is forbidden, so it is handled separately | ||
if (initialReadState != ReadState.PendingRead && cancellationToken.IsCancellationRequested) | ||
{ | ||
readState = ReadState.Aborted; | ||
initialReadState = ReadState.Aborted; | ||
_state.ReadState = ReadState.Aborted; | ||
canceledSynchronously = true; | ||
preCanceled = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why don't directly return There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To do all the work on exceptions in one place and outside of the lock |
||
} | ||
|
||
// Success scenario: EOS already reached, completing synchronously. No transition (final state) | ||
if (initialReadState == ReadState.ReadsCompleted) | ||
{ | ||
return new ValueTask<int>(0); | ||
} | ||
else if (readState == ReadState.None) | ||
|
||
// Success scenario: no data available yet, will return a task to wait on. Transition None->PendingRead | ||
if (initialReadState == ReadState.None) | ||
{ | ||
Debug.Assert(_state.RootedReceiveStream is null); | ||
Debug.Assert(_state.Stream is null); | ||
|
||
_state.ReceiveUserBuffer = destination; | ||
_state.RootedReceiveStream = this; | ||
_state.Stream = this; | ||
_state.ReadState = ReadState.PendingRead; | ||
|
||
if (cancellationToken.CanBeCanceled) | ||
|
@@ -396,7 +409,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio | |
lock (state) | ||
{ | ||
completePendingRead = state.ReadState == ReadState.PendingRead; | ||
state.RootedReceiveStream = null; | ||
state.Stream = null; | ||
state.ReceiveUserBuffer = null; | ||
state.ReadState = ReadState.Aborted; | ||
} | ||
|
@@ -414,7 +427,9 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio | |
|
||
return _state.ReceiveResettableCompletionSource.GetValueTask(); | ||
} | ||
else if (readState == ReadState.IndividualReadComplete) | ||
|
||
// Success scenario: data already available, completing synchronously. Transition IndividualReadComplete->None | ||
if (initialReadState == ReadState.IndividualReadComplete) | ||
{ | ||
_state.ReadState = ReadState.None; | ||
|
||
|
@@ -431,25 +446,22 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio | |
} | ||
} | ||
|
||
// All success scenarios returned at this point. Failure scenarios below: | ||
|
||
Exception? ex = null; | ||
|
||
switch (readState) | ||
switch (initialReadState) | ||
{ | ||
case ReadState.ReadsCompleted: | ||
return new ValueTask<int>(0); | ||
case ReadState.PendingRead: | ||
ex = new InvalidOperationException("Only one read is supported at a time."); | ||
break; | ||
case ReadState.Aborted: | ||
ex = | ||
canceledSynchronously ? new OperationCanceledException(cancellationToken) : // aborted by token being canceled before the async op started. | ||
abortError == -1 ? new QuicOperationAbortedException() : // aborted by user via some other operation. | ||
new QuicStreamAbortedException(abortError); // aborted by peer. | ||
|
||
ex = preCanceled ? new OperationCanceledException(cancellationToken) : | ||
ThrowHelper.GetStreamAbortedException(abortError); | ||
break; | ||
case ReadState.ConnectionClosed: | ||
default: | ||
Debug.Assert(readState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}."); | ||
Debug.Assert(initialReadState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{initialReadState}' is unaccounted for in {nameof(ReadAsync)}."); | ||
ex = GetConnectionAbortedException(_state); | ||
break; | ||
} | ||
|
@@ -490,7 +502,7 @@ internal override void AbortRead(long errorCode) | |
if (_state.ReadState == ReadState.PendingRead) | ||
{ | ||
shouldComplete = true; | ||
_state.RootedReceiveStream = null; | ||
_state.Stream = null; | ||
_state.ReceiveUserBuffer = null; | ||
} | ||
if (_state.ReadState < ReadState.ReadsCompleted) | ||
|
@@ -833,7 +845,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) | |
if (receiveEvent.BufferCount == 0) | ||
{ | ||
// This is a 0-length receive that happens once reads are finished (via abort or otherwise). | ||
// State changes for this are handled elsewhere. | ||
// State changes for this are handled in PEER_SEND_SHUTDOWN / PEER_SEND_ABORT / SHUTDOWN_COMPLETE event handlers. | ||
return MsQuicStatusCodes.Success; | ||
} | ||
|
||
|
@@ -847,6 +859,12 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) | |
case ReadState.None: | ||
// ReadAsync() hasn't been called yet. Stash the buffer so the next ReadAsync call completes synchronously. | ||
|
||
// We are overwriting state.ReceiveQuicBuffers here even if we only partially consumed them | ||
// and it is intended, because unconsumed data will arrive again from the point we've stopped. | ||
// New RECEIVE event wouldn't come until we call EnableReceive(), and we call it only after we've consumed | ||
// as much as we could and said so to msquic in ReceiveComplete(taken), so new event will have all the | ||
// remaining data. | ||
|
||
if ((uint)state.ReceiveQuicBuffers.Length < receiveEvent.BufferCount) | ||
{ | ||
QuicBuffer[] oldReceiveBuffers = state.ReceiveQuicBuffers; | ||
|
@@ -872,7 +890,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt) | |
|
||
state.ReceiveCancellationRegistration.Unregister(); | ||
shouldComplete = true; | ||
state.RootedReceiveStream = null; | ||
state.Stream = null; | ||
state.ReadState = ReadState.None; | ||
|
||
readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<QuicBuffer>(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span); | ||
|
@@ -975,7 +993,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt | |
if (state.ReadState == ReadState.PendingRead) | ||
{ | ||
shouldReadComplete = true; | ||
state.RootedReceiveStream = null; | ||
state.Stream = null; | ||
state.ReceiveUserBuffer = null; | ||
} | ||
if (state.ReadState < ReadState.ReadsCompleted) | ||
|
@@ -1029,7 +1047,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt) | |
if (state.ReadState == ReadState.PendingRead) | ||
{ | ||
shouldComplete = true; | ||
state.RootedReceiveStream = null; | ||
state.Stream = null; | ||
state.ReceiveUserBuffer = null; | ||
} | ||
state.ReadState = ReadState.Aborted; | ||
|
@@ -1057,7 +1075,7 @@ private static uint HandleEventPeerSendShutdown(State state) | |
if (state.ReadState == ReadState.PendingRead) | ||
{ | ||
shouldComplete = true; | ||
state.RootedReceiveStream = null; | ||
state.Stream = null; | ||
state.ReceiveUserBuffer = null; | ||
} | ||
if (state.ReadState < ReadState.ReadsCompleted) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Love the comments ❤️