Skip to content

[Experiment] Reduce the number of syscalls #38747

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ private enum State
}

private int _state; // Actually AsyncOperation.State.
protected int _observedSequenceNumber;

#if DEBUG
private int _callbackQueued; // When non-zero, the callback has been queued.
Expand Down Expand Up @@ -311,12 +312,17 @@ public void DoAbort()
ErrorCode = SocketError.OperationAborted;
}

// returns true when read()|write() returned lower number of bytes than the size of buffer
internal virtual bool HasExhaustedIoSpace(int currentSequenceNumber) => false;

protected abstract void Abort();

protected abstract bool DoTryComplete(SocketAsyncContext context);

public abstract void InvokeCallback(bool allowPooling);

public void SetSequenceNumber(int sequenceNumber) => _observedSequenceNumber = sequenceNumber;

[Conditional("SOCKETASYNCCONTEXT_TRACE")]
public void Trace(string message, [CallerMemberName] string? memberName = null)
{
Expand Down Expand Up @@ -369,6 +375,9 @@ private sealed class BufferMemorySendOperation : SendOperation

public BufferMemorySendOperation(SocketAsyncContext context) : base(context) { }

internal override bool HasExhaustedIoSpace(int currentSequenceNumber) => _observedSequenceNumber == currentSequenceNumber
&& ErrorCode == SocketError.Success && BytesTransferred > 0 && BytesTransferred < Buffer.Length;

protected override bool DoTryComplete(SocketAsyncContext context)
{
int bufferIndex = 0;
Expand Down Expand Up @@ -427,6 +436,9 @@ private sealed unsafe class BufferPtrSendOperation : SendOperation

public BufferPtrSendOperation(SocketAsyncContext context) : base(context) { }

internal override bool HasExhaustedIoSpace(int currentSequenceNumber) => _observedSequenceNumber == currentSequenceNumber
&& ErrorCode == SocketError.Success && BytesTransferred > 0 && Count > 0; // TryCompleteSendTo modifies Count

protected override bool DoTryComplete(SocketAsyncContext context)
{
int bufferIndex = 0;
Expand Down Expand Up @@ -457,6 +469,9 @@ private sealed class BufferMemoryReceiveOperation : ReceiveOperation

public BufferMemoryReceiveOperation(SocketAsyncContext context) : base(context) { }

internal override bool HasExhaustedIoSpace(int currentSequenceNumber) => _observedSequenceNumber == currentSequenceNumber
&& ErrorCode == SocketError.Success && BytesTransferred >= 0 && BytesTransferred < Buffer.Length;

protected override bool DoTryComplete(SocketAsyncContext context)
{
// Zero byte read is performed to know when data is available.
Expand Down Expand Up @@ -536,6 +551,9 @@ private sealed unsafe class BufferPtrReceiveOperation : ReceiveOperation

public BufferPtrReceiveOperation(SocketAsyncContext context) : base(context) { }

internal override bool HasExhaustedIoSpace(int currentSequenceNumber) => _observedSequenceNumber == currentSequenceNumber
&& ErrorCode == SocketError.Success && BytesTransferred >= 0 && BytesTransferred < Length;

protected override bool DoTryComplete(SocketAsyncContext context) =>
SocketPal.TryCompleteReceiveFrom(context._socket, new Span<byte>(BufferPtr, Length), null, Flags, SocketAddress, ref SocketAddressLen, out BytesTransferred, out ReceivedFlags, out ErrorCode);
}
Expand Down Expand Up @@ -734,27 +752,11 @@ public void Init()
// observedSequenceNumber must be passed to StartAsyncOperation.
public bool IsReady(SocketAsyncContext context, out int observedSequenceNumber)
{
// It is safe to read _state and _sequence without using Lock.
// - The return value is soley based on Volatile.Read of _state.
// - The Volatile.Read of _sequenceNumber ensures we read a value before executing the operation.
// This is needed to retry the operation in StartAsyncOperation in case the _sequenceNumber incremented.
// - Because no Lock is taken, it is possible we observe a sequence number increment before the state
// becomes Ready. When that happens, observedSequenceNumber is decremented, and StartAsyncOperation will
// execute the operation because the sequence number won't match.

Debug.Assert(sizeof(QueueState) == sizeof(int));
QueueState state = (QueueState)Volatile.Read(ref Unsafe.As<QueueState, int>(ref _state));
observedSequenceNumber = Volatile.Read(ref _sequenceNumber);

bool isReady = state == QueueState.Ready || state == QueueState.Stopped;
if (!isReady)
using (Lock())
{
observedSequenceNumber--;
observedSequenceNumber = _sequenceNumber;
return _state == QueueState.Ready || _state == QueueState.Stopped;
}

Trace(context, $"{isReady}");

return isReady;
}

// Return true for pending, false for completed synchronously (including failure and abort)
Expand Down Expand Up @@ -860,8 +862,15 @@ public bool StartAsyncOperation(SocketAsyncContext context, TOperation operation
return null;

case QueueState.Waiting:
Debug.Assert(_tail != null, "State == Waiting but queue is empty!");
if (_tail == null)
{
_state = QueueState.Ready;
_sequenceNumber++;
return null;
}

op = _tail.Next;
op.SetSequenceNumber(_sequenceNumber);
Debug.Assert(_isNextOperationSynchronous == (op.Event != null));
if (skipAsyncEvents && !_isNextOperationSynchronous)
{
Expand Down Expand Up @@ -1035,15 +1044,37 @@ public OperationResult ProcessQueuedOperation(TOperation op)
// No more operations to process
_tail = null;
_isNextOperationSynchronous = false;
_state = QueueState.Ready;
_sequenceNumber++;

if (op.HasExhaustedIoSpace(currentSequenceNumber: _sequenceNumber))
{
// the buffer that given operation was using was bigger than
// the number of bytes returned from read()|write()
// so instead of setting the state to Ready and having the next read()|write()
// return EWOULDBLOCK, we set the state to Waiting and just wait
// for new epoll notification which is going to set it to Ready
// see the Q&A section of https://www.man7.org/linux/man-pages/man7/epoll.7.html for more
_state = QueueState.Waiting;
}
else
{
_state = QueueState.Ready;
}
Trace(context, $"Exit (finished queue)");
}
else
{
// Pop current operation and advance to next
nextOp = _tail.Next = op.Next;
_isNextOperationSynchronous = nextOp.Event != null;
// Pop current operation
_tail.Next = op.Next;
_isNextOperationSynchronous = op.Next.Event != null;

if (op.HasExhaustedIoSpace(currentSequenceNumber: _sequenceNumber))
{
_state = QueueState.Waiting;
}
else
{
nextOp = _tail.Next; // will be dispatched outside of lock
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

namespace System.Net.Sockets
{
internal static partial class SocketPal
internal static class SocketPal
{
public const bool SupportsMultipleConnectAttempts = false;
public static readonly int MaximumAddressSize = Interop.Sys.GetMaximumAddressSize();
Expand Down