[API Proposal]: Add Unix-specific APIs to register for IO ready notifications on the thread pool #61273

Closed
@kouvel

Background and motivation

Currently we don't have a common way of handling IO readiness and completions across operating systems. On Windows the IO completion thread pool is not very efficient. The mechanism used on Linux in System.Net.Sockets is more efficient, but comes with other issues. As part of adding IO readiness/completion handling to the portable thread pool (transitioning away from the native thread pool), we would also like to consolidate the mechanisms used for IO readiness/completion handling across operating systems. Hopefully this would ease maintenance, make it easier for other components to also use the functionality, and enable further experimentation for improving things across the various async IO mechanisms, such as potentially with better thread management and better ordering of work.

The APIs being proposed here are specific to epoll/kqueue, analogous to the existing APIs for overlapped IO on Windows. Ideally, we would move to io_uring on Linux, but it's not available in all distros yet and we would continue to need a kqueue-based solution. When we decide to add support for io_uring, it would likely require a separate set of APIs.

More discussion here: #47631

API Proposal

Update: It's not necessary to expose these APIs in contracts, so the current proposal is to make them public for use in .NET libraries but not expose them in contracts for public usage.

namespace System.Threading
{
    public static class ThreadPool
    {
        [UnsupportedOSPlatform("windows")]
        [UnsupportedOSPlatform("browser")]
        public static void RegisterForIOReadyNotifications(
            SafeHandle handle,
            IOReadyEvents events,
            IIOReadyEventHandler eventHandler) { }

        [UnsupportedOSPlatform("windows")]
        [UnsupportedOSPlatform("browser")]
        public static void UnregisterForIOReadyNotifications(SafeHandle handle) { }
    }


    [Flags]
    public enum IOReadyEvents
    {
        None = 0x0,
        Read = 0x1,
        Write = 0x2,
        CloseOrError = 0x4
    }

    public interface IIOReadyEventHandler
    {
        IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events);
        void HandleEvents(IOReadyEvents events);
    }
}

RegisterForIOReadyNotifications

  • Registers an IO handle to receive notifications of IO-ready events. The event handler is notified of IO-ready events for the IO handle for inline processing, or asynchronous processing on thread pool worker threads.
  • This API is specific to Unix-like platforms and uses epoll or kqueue. The underlying implementation may not support some types of IO handles.
  • Arguments
    • handle - The IO handle to register for IO-ready event notifications. The safe handle wraps a file descriptor that typically represents a non-blocking pipe or socket. An IO handle is keyed by the handle value and may be registered only once.
    • events - A mask of events for which to register for notifications
    • eventHandler - An object that would handle IO-ready events for the IO handle
  • Exceptions
    • ArgumentNullException - One of the arguments is null
    • ArgumentException - handle is closed or invalid
    • ArgumentOutOfRangeException - The value of events is invalid
    • InvalidOperationException - handle was already registered
    • IOException - An error occurred when attempting to register the IO handle for notifications. The HResult property would contain an error code for further investigation

UnregisterForIOReadyNotifications

  • Unregisters an IO handle so that it no longer receives notifications of new IO-ready events. Any IO-ready events that were already received prior to unregistration are still delivered to the previously registered event handler.
  • Arguments
    • handle - The IO handle to unregister for new IO-ready event notifications
  • Exceptions
    • ArgumentNullException - handle is null
    • InvalidOperationException - handle was not registered
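
The registration and unregistration contracts listed above can be illustrated with a small bookkeeping sketch. This is not the thread pool's implementation: IOReadyRegistrySketch and NoOpHandler are hypothetical names, the actual epoll/kqueue call is omitted, and treating "invalid events" as "any bit outside the defined flags" is an assumption, since the proposal does not spell that out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;

[Flags]
public enum IOReadyEvents { None = 0x0, Read = 0x1, Write = 0x2, CloseOrError = 0x4 }

public interface IIOReadyEventHandler
{
    IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events);
    void HandleEvents(IOReadyEvents events);
}

// Hypothetical handler used only to exercise the sketch.
public sealed class NoOpHandler : IIOReadyEventHandler
{
    public IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events) => events;
    public void HandleEvents(IOReadyEvents events) { }
}

// Hypothetical stand-in for the registration bookkeeping described in the proposal.
public static class IOReadyRegistrySketch
{
    private const IOReadyEvents AllEvents =
        IOReadyEvents.Read | IOReadyEvents.Write | IOReadyEvents.CloseOrError;

    // Registrations are keyed by the raw handle value, so a handle can be registered only once.
    private static readonly ConcurrentDictionary<IntPtr, IIOReadyEventHandler> s_handlers =
        new ConcurrentDictionary<IntPtr, IIOReadyEventHandler>();

    public static void Register(SafeHandle handle, IOReadyEvents events, IIOReadyEventHandler eventHandler)
    {
        if (handle is null) throw new ArgumentNullException(nameof(handle));
        if (eventHandler is null) throw new ArgumentNullException(nameof(eventHandler));
        if (handle.IsClosed || handle.IsInvalid)
            throw new ArgumentException("Handle is closed or invalid.", nameof(handle));
        // Assumption: only bits outside the defined flags are rejected.
        if ((events & ~AllEvents) != 0) throw new ArgumentOutOfRangeException(nameof(events));
        if (!s_handlers.TryAdd(handle.DangerousGetHandle(), eventHandler))
            throw new InvalidOperationException("Handle is already registered.");
        // A real implementation would now add the descriptor to epoll/kqueue
        // and throw IOException (with HResult set) if that fails.
    }

    public static void Unregister(SafeHandle handle)
    {
        if (handle is null) throw new ArgumentNullException(nameof(handle));
        if (!s_handlers.TryRemove(handle.DangerousGetHandle(), out _))
            throw new InvalidOperationException("Handle is not registered.");
    }
}
```

Keying the dictionary by handle value is what makes a second registration of the same descriptor fail with InvalidOperationException, matching the "may be registered only once" rule.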

IOReadyEvents

  • Read / Write - The stream is ready for a read or write respectively
  • CloseOrError
    • There appear to be some inconsistencies in how epoll reports the RDHUP, HUP, and ERR events on Linux; see the comment on tokio-rs/mio#1091 ("Add test to list OS event details"). To simplify things, for epoll, when any of those are set we could just include this bit and have the user determine what happened based on the error code from the following IO operation.
    • For kqueue, similarly, this bit would be set when EV_EOF or EV_ERROR is reported
    • System.Net.Sockets could continue to do what it's doing now and treat CloseOrError as Read | Write
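
As a rough illustration of the epoll side of this mapping, the sketch below folds EPOLLRDHUP, EPOLLHUP, and EPOLLERR into the single CloseOrError bit. The constant values are the Linux ones from <sys/epoll.h>. The names EpollEventMapping, FromEpoll, and ExpandCloseOrError are hypothetical and not part of the proposal.

```csharp
using System;

[Flags]
public enum IOReadyEvents { None = 0x0, Read = 0x1, Write = 0x2, CloseOrError = 0x4 }

// Hypothetical helper; the proposal does not define this type.
public static class EpollEventMapping
{
    // Linux epoll event bits, as defined in <sys/epoll.h>.
    public const uint EPOLLIN = 0x001;
    public const uint EPOLLOUT = 0x004;
    public const uint EPOLLERR = 0x008;
    public const uint EPOLLHUP = 0x010;
    public const uint EPOLLRDHUP = 0x2000;

    // Collapses the raw epoll bits into the proposed IOReadyEvents flags.
    public static IOReadyEvents FromEpoll(uint epollEvents)
    {
        IOReadyEvents result = IOReadyEvents.None;
        if ((epollEvents & EPOLLIN) != 0) result |= IOReadyEvents.Read;
        if ((epollEvents & EPOLLOUT) != 0) result |= IOReadyEvents.Write;
        if ((epollEvents & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) != 0)
            result |= IOReadyEvents.CloseOrError;
        return result;
    }

    // What a consumer such as System.Net.Sockets could do: treat CloseOrError
    // as both Read and Write so that the next IO operation surfaces the error.
    public static IOReadyEvents ExpandCloseOrError(IOReadyEvents events) =>
        (events & IOReadyEvents.CloseOrError) != 0
            ? events | IOReadyEvents.Read | IOReadyEvents.Write
            : events;
}
```

With this shape, the consumer never inspects which of the three epoll bits was set; it simply retries the read or write and looks at the resulting error code.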

IIOReadyEventHandler

  • IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events);
    • Currently on Unix, when a synchronous operation on a non-blocking socket has to wait, the blocked thread is released immediately when the event is received, instead of queuing event processing to the thread pool. Otherwise, if the blocking occurred on a thread pool thread, waiting for another thread pool thread to process the event could lead to thread starvation.
    • There is currently a config option to force processing events on the polling threads inline (along with configuring the number of polling threads), which we are currently planning to keep intact, at least for now
    • This method is called immediately upon receiving an event for inline processing
    • events specifies which IO-ready events were triggered
    • The returned events, if not None, will be queued for processing and will be sent later through HandleEvents. So, a user may choose to process all or some of the events inline depending on whether there is a pending synchronous operation waiting to unblock from the event.
  • void HandleEvents(IOReadyEvents events);
    • This method is called for handling events that were returned by the method above
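
The split between the two methods can be sketched with a handler that releases a blocked synchronous reader inline and returns everything else for queued processing. This is only an illustration under assumptions: SyncAwareHandler, WaitForRead, IsSynchronousReadPending, and LastQueuedEvents are hypothetical names, and at most one synchronous read is assumed to be pending at a time.

```csharp
using System;
using System.Threading;

[Flags]
public enum IOReadyEvents { None = 0x0, Read = 0x1, Write = 0x2, CloseOrError = 0x4 }

public interface IIOReadyEventHandler
{
    IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events);
    void HandleEvents(IOReadyEvents events);
}

// Hypothetical handler; assumes at most one synchronous read is pending at a time.
public sealed class SyncAwareHandler : IIOReadyEventHandler
{
    private readonly ManualResetEventSlim _readReady = new ManualResetEventSlim(false);
    private int _syncReadPending; // 1 while a thread is blocked in WaitForRead

    public IOReadyEvents LastQueuedEvents { get; private set; }
    public bool IsSynchronousReadPending => Volatile.Read(ref _syncReadPending) != 0;

    // Called by a thread that performs a synchronous read and must block until data is ready.
    public bool WaitForRead(TimeSpan timeout)
    {
        _readReady.Reset();
        Volatile.Write(ref _syncReadPending, 1);
        if (_readReady.Wait(timeout)) return true;
        // Timed out: withdraw the request unless the polling thread claimed it concurrently.
        return Interlocked.Exchange(ref _syncReadPending, 0) == 0;
    }

    // Runs on the polling thread as soon as the event is received.
    public IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events)
    {
        if ((events & IOReadyEvents.Read) != 0 &&
            Interlocked.Exchange(ref _syncReadPending, 0) == 1)
        {
            _readReady.Set();               // release the blocked thread right away
            events &= ~IOReadyEvents.Read;  // Read was handled inline, so do not queue it
        }
        return events; // whatever remains is delivered later through HandleEvents
    }

    // Runs later on a thread pool worker thread with the events returned above.
    public void HandleEvents(IOReadyEvents events) => LastQueuedEvents = events;
}
```

When no synchronous read is pending, the handler returns the events unchanged, so all processing happens in HandleEvents on a worker thread. When one is pending, only the Read bit is consumed inline and the remaining bits are still queued.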

API Usage

These APIs are basically a minimal interface between the current SocketAsyncContext and SocketAsyncEngine. SocketAsyncContext would be the initial consumer, covering registration, unregistration, handling sync events inline, and handling events normally.

The APIs are not necessarily easy to use; they're meant to provide minimal support for epoll/kqueue-based polling with events handled on thread pool threads. Here's some oversimplified pseudocode showing how the APIs may be used to read some data from a socket, process it, and write a response:

internal sealed class ClientSocketIOHandler : IIOReadyEventHandler
{
    SafeHandle _socketHandle;
    bool _isReading;
    byte[] _readBuffer;
    int _nextReadIndex;
    byte[] _writeBuffer;
    int _byteCountToWrite;
    int _nextWriteStartIndex;

    public ClientSocketIOHandler(SafeHandle socketHandle)
    {
        _socketHandle = socketHandle;
        _isReading = true;
        // Change the socket to non-blocking

        ThreadPool.RegisterForIOReadyNotifications(
            _socketHandle,
            IOReadyEvents.Read | IOReadyEvents.Write | IOReadyEvents.CloseOrError,
            this);
    }

    public IOReadyEvents HandleEventsInlineIfNecessary(IOReadyEvents events) => events;

    public unsafe void HandleEvents(IOReadyEvents events)
    {
        if ((events & IOReadyEvents.CloseOrError) != 0)
        {
            events |= IOReadyEvents.Read | IOReadyEvents.Write;
        }

        lock (this)
        {
            if ((events & IOReadyEvents.Read) != 0 && _isReading)
            {
                if (_readBuffer == null)
                {
                    _readBuffer = new byte[256];
                }

                var span = new Span<byte>(_readBuffer, _nextReadIndex, _readBuffer.Length - _nextReadIndex);
                int bytesReceived, errno;
                fixed (byte* b = &MemoryMarshal.GetReference(span))
                {
                    errno = Interop.Recv(_socketHandle, b, span.Length, 0, out bytesReceived);
                }

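                // Assuming Interop.Recv mirrors recv(2): 0 means the peer closed the connection, negative means an error.
                // Simplification: a real handler would check errno for EAGAIN/EWOULDBLOCK and keep waiting.
                if (bytesReceived <= 0)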
                {
                    _isReading = false;
                    ThreadPool.UnregisterForIOReadyNotifications(_socketHandle);
                    return;
                }

                _nextReadIndex += bytesReceived;

                if (/* enough data read */) // done reading
                {
                    _isReading = false;
                    // Process the data, fill _writeBuffer
                    _readBuffer = null;
                    events |= IOReadyEvents.Write;
                }
            }

            if ((events & IOReadyEvents.Write) != 0 && _writeBuffer != null)
            {
                var span = new Span<byte>(_writeBuffer, _nextWriteStartIndex, _byteCountToWrite - _nextWriteStartIndex);
                int bytesSent, errno;
                fixed (byte* b = &MemoryMarshal.GetReference(span))
                {
                    errno = Interop.Send(_socketHandle, b, span.Length, 0, out bytesSent);
                }

                if (bytesSent <= 0) // error
                {
                    _writeBuffer = null;
                    ThreadPool.UnregisterForIOReadyNotifications(_socketHandle);
                    return;
                }

                _nextWriteStartIndex += bytesSent;
                if (_nextWriteStartIndex >= _byteCountToWrite) // done writing
                {
                    _writeBuffer = null;
                    ThreadPool.UnregisterForIOReadyNotifications(_socketHandle);
                    // Close the socket
                }
            }
        }
    }
}

Alternative Designs

  • UnregisterForIOReadyNotifications - A separate method is proposed for unregistration rather than returning a disposable from the Register method. The expectation is that the event handler object would manage the lifetime of registration along with its own lifetime. A separate method for unregistration would avoid some unnecessary allocation for each IO handle that is registered.
  • IOReadyEvents.CloseOrError - Alternatively, we could include all of the various bits (currently named ReadClose, Close, and Error in the implementation), translate to those exactly as how the system provides, and let the user disambiguate
  • InternalsVisibleTo or reflection instead of making the APIs public - As far as I could tell, InternalsVisibleTo doesn't seem to be used anymore except for tests. Reflection is possible, though perhaps inconvenient.
  • When thinking about adding APIs for io_uring it may make sense to put those APIs under System.IO (though with implementation in CoreLib), since it may need an API for each op code. Not sure if there would be a better place for these epoll/kqueue-based APIs than directly on the thread pool.

Risks

  • Registering without unregistering would add to bookkeeping memory. Typically when all file descriptors representing a particular stream are closed, it would not receive events anymore, but there would be some bookkeeping cost from not unregistering. I feel it's a low risk, as it's a fairly low-level API.
