Skip to content

[QUIC] Cosmetic changes to Read pipeline #55591

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 14, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ private sealed class State
{
public SafeMsQuicStreamHandle Handle = null!; // set in ctor.
public GCHandle StateGCHandle;

public MsQuicStream? Stream; // roots the stream in the pinned state to prevent GC during an async read I/O.
public MsQuicConnection.State ConnectionState = null!; // set in ctor.
public string TraceId = null!; // set in ctor.

Expand All @@ -48,7 +50,7 @@ private sealed class State
// set when ReadState.PendingRead:
public Memory<byte> ReceiveUserBuffer;
public CancellationTokenRegistration ReceiveCancellationRegistration;
public MsQuicStream? RootedReceiveStream; // roots the stream in the pinned state to prevent GC during an async read I/O.
// Resettable completions to be used for multiple calls to receive.
public readonly ResettableCompletionSource<int> ReceiveResettableCompletionSource = new ResettableCompletionSource<int>();

public SendState SendState;
Expand Down Expand Up @@ -363,27 +365,38 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
NetEventSource.Info(_state, $"{TraceId()} Stream reading into Memory of '{destination.Length}' bytes.");
}

ReadState readState;
long abortError = -1;
bool canceledSynchronously = false;
ReadState initialReadState; // value before transitions
long abortError;
bool preCanceled = false;

lock (_state)
{
readState = _state.ReadState;
initialReadState = _state.ReadState;
abortError = _state.ReadErrorCode;

if (readState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
// Failure scenario: pre-canceled token. Transition: any -> Aborted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love the comments ❤️

// PendingRead state indicates there is another concurrent read operation in flight
// which is forbidden, so it is handled separately
if (initialReadState != ReadState.PendingRead && cancellationToken.IsCancellationRequested)
{
readState = ReadState.Aborted;
initialReadState = ReadState.Aborted;
_state.ReadState = ReadState.Aborted;
canceledSynchronously = true;
preCanceled = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't directly return new OperationCanceledException(cancellationToken) here and get rid of preCanceled local var. We wouldn't even need to change the initialReadState if I'm reading the code correctly.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do all the work on exceptions in one place and outside of the lock

}

// Success scenario: EOS already reached, completing synchronously. No transition (final state)
if (initialReadState == ReadState.ReadsCompleted)
{
return new ValueTask<int>(0);
}
else if (readState == ReadState.None)

// Success scenario: no data available yet, will return a task to wait on. Transition None->PendingRead
if (initialReadState == ReadState.None)
{
Debug.Assert(_state.RootedReceiveStream is null);
Debug.Assert(_state.Stream is null);

_state.ReceiveUserBuffer = destination;
_state.RootedReceiveStream = this;
_state.Stream = this;
_state.ReadState = ReadState.PendingRead;

if (cancellationToken.CanBeCanceled)
Expand All @@ -396,7 +409,7 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
lock (state)
{
completePendingRead = state.ReadState == ReadState.PendingRead;
state.RootedReceiveStream = null;
state.Stream = null;
state.ReceiveUserBuffer = null;
state.ReadState = ReadState.Aborted;
}
Expand All @@ -414,7 +427,9 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio

return _state.ReceiveResettableCompletionSource.GetValueTask();
}
else if (readState == ReadState.IndividualReadComplete)

// Success scenario: data already available, completing synchronously. Transition IndividualReadComplete->None
if (initialReadState == ReadState.IndividualReadComplete)
{
_state.ReadState = ReadState.None;

Expand All @@ -431,25 +446,22 @@ internal override ValueTask<int> ReadAsync(Memory<byte> destination, Cancellatio
}
}

// All success scenarios returned at this point. Failure scenarios below:

Exception? ex = null;

switch (readState)
switch (initialReadState)
{
case ReadState.ReadsCompleted:
return new ValueTask<int>(0);
case ReadState.PendingRead:
ex = new InvalidOperationException("Only one read is supported at a time.");
break;
case ReadState.Aborted:
ex =
canceledSynchronously ? new OperationCanceledException(cancellationToken) : // aborted by token being canceled before the async op started.
abortError == -1 ? new QuicOperationAbortedException() : // aborted by user via some other operation.
new QuicStreamAbortedException(abortError); // aborted by peer.

ex = preCanceled ? new OperationCanceledException(cancellationToken) :
ThrowHelper.GetStreamAbortedException(abortError);
break;
case ReadState.ConnectionClosed:
default:
Debug.Assert(readState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{readState}' is unaccounted for in {nameof(ReadAsync)}.");
Debug.Assert(initialReadState == ReadState.ConnectionClosed, $"{nameof(ReadState)} of '{initialReadState}' is unaccounted for in {nameof(ReadAsync)}.");
ex = GetConnectionAbortedException(_state);
break;
}
Expand Down Expand Up @@ -490,7 +502,7 @@ internal override void AbortRead(long errorCode)
if (_state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
_state.RootedReceiveStream = null;
_state.Stream = null;
_state.ReceiveUserBuffer = null;
}
if (_state.ReadState < ReadState.ReadsCompleted)
Expand Down Expand Up @@ -833,7 +845,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
if (receiveEvent.BufferCount == 0)
{
// This is a 0-length receive that happens once reads are finished (via abort or otherwise).
// State changes for this are handled elsewhere.
// State changes for this are handled in PEER_SEND_SHUTDOWN / PEER_SEND_ABORT / SHUTDOWN_COMPLETE event handlers.
return MsQuicStatusCodes.Success;
}

Expand All @@ -847,6 +859,12 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)
case ReadState.None:
// ReadAsync() hasn't been called yet. Stash the buffer so the next ReadAsync call completes synchronously.

// We are overwriting state.ReceiveQuicBuffers here even if we only partially consumed them
// and it is intended, because unconsumed data will arrive again from the point we've stopped.
// New RECEIVE event wouldn't come until we call EnableReceive(), and we call it only after we've consumed
// as much as we could and said so to msquic in ReceiveComplete(taken), so new event will have all the
// remaining data.

if ((uint)state.ReceiveQuicBuffers.Length < receiveEvent.BufferCount)
{
QuicBuffer[] oldReceiveBuffers = state.ReceiveQuicBuffers;
Expand All @@ -872,7 +890,7 @@ private static unsafe uint HandleEventRecv(State state, ref StreamEvent evt)

state.ReceiveCancellationRegistration.Unregister();
shouldComplete = true;
state.RootedReceiveStream = null;
state.Stream = null;
state.ReadState = ReadState.None;

readLength = CopyMsQuicBuffersToUserBuffer(new ReadOnlySpan<QuicBuffer>(receiveEvent.Buffers, (int)receiveEvent.BufferCount), state.ReceiveUserBuffer.Span);
Expand Down Expand Up @@ -975,7 +993,7 @@ private static uint HandleEventShutdownComplete(State state, ref StreamEvent evt
if (state.ReadState == ReadState.PendingRead)
{
shouldReadComplete = true;
state.RootedReceiveStream = null;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
if (state.ReadState < ReadState.ReadsCompleted)
Expand Down Expand Up @@ -1029,7 +1047,7 @@ private static uint HandleEventPeerSendAborted(State state, ref StreamEvent evt)
if (state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
state.RootedReceiveStream = null;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
state.ReadState = ReadState.Aborted;
Expand Down Expand Up @@ -1057,7 +1075,7 @@ private static uint HandleEventPeerSendShutdown(State state)
if (state.ReadState == ReadState.PendingRead)
{
shouldComplete = true;
state.RootedReceiveStream = null;
state.Stream = null;
state.ReceiveUserBuffer = null;
}
if (state.ReadState < ReadState.ReadsCompleted)
Expand Down