Connection Abstractions #1793

Closed
@scalablecory

System.Net.Connections

Connections is an abstraction for composable connection establishment. It aims to improve layering separation and provide a standard extensibility model for making network connections.

Connections targets client/server implementations, as well as those of their users who have advanced needs and want to plug in custom functionality. The latter is a heavily requested feature for HttpClient, and the Kestrel team has a handful of examples of users taking advantage of this pattern.

Connections brings .NET into parity with “modern” transport models such as Go’s dialers and Netty's channels. ASP.NET has a similar set of interfaces (“Bedrock Transports”) that this would supersede.

Basic API usage

At its most basic, System.Net.Connections is an abstraction over Socket.Accept and Socket.Connect. Consider this Socket usage today:

// server
using var listener = new Socket(SocketType.Stream, ProtocolType.Tcp);
listener.Bind(new IPEndPoint(IPAddress.IPv6Loopback, 0));
listener.Listen();
using Socket connection = await listener.AcceptAsync();
using Stream connectionStream = new NetworkStream(connection);

// client
using var socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
await socket.ConnectAsync(new DnsEndPoint("contoso.com", 80));
using var stream = new NetworkStream(socket);

The equivalent with System.Net.Connections is:

// server
using IConnectionFactory factory = new SocketsConnectionFactory(SocketType.Stream, ProtocolType.Tcp);
using IConnectionListener listener = await factory.BindAsync(new IPEndPoint(IPAddress.IPv6Loopback, 0));
using IConnection connection = await listener.AcceptAsync();
using Stream stream = connection.Stream;

// client
using IConnectionFactory factory = new SocketsConnectionFactory(SocketType.Stream, ProtocolType.Tcp);
using IConnection connection = await factory.ConnectAsync(new DnsEndPoint("contoso.com", 80));
using Stream stream = connection.Stream;

Composability

Composability has been modeled after Stream. For instance:

Stream stream;
stream = new NetworkStream(new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp));
stream = new BufferedStream(stream);
stream = new GZipStream(stream, CompressionLevel.Optimal);

While Streams compose the raw byte stream, Connections compose the establishment of that stream:

// Create a connection factory.
IConnectionFactory factory;
factory = new SocketsConnectionFactory(SocketType.Stream, ProtocolType.Tcp);
factory = new SslConnectionFactory(factory);

// Establish a connection.
IConnection connection = await factory.ConnectAsync(endPoint, options);
Stream stream = connection.Stream;
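
The wrapping pattern above can be sketched without the proposed types. This is a minimal illustration, not the proposed implementation: ISimpleConnectionFactory, InMemoryConnectionFactory, and BufferingConnectionFactory are hypothetical names, and the factory returns a Stream directly rather than an IConnection to keep the example short.

```csharp
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for IConnectionFactory: it hands back the Stream directly.
public interface ISimpleConnectionFactory
{
    ValueTask<Stream> ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default);
}

// Innermost layer. A real one would open a socket; this one returns an in-memory stream.
public sealed class InMemoryConnectionFactory : ISimpleConnectionFactory
{
    public ValueTask<Stream> ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default)
        => new ValueTask<Stream>(new MemoryStream());
}

// Wrapping layer: establishes the connection via the base factory, then decorates the stream.
public sealed class BufferingConnectionFactory : ISimpleConnectionFactory
{
    private readonly ISimpleConnectionFactory _baseFactory;

    public BufferingConnectionFactory(ISimpleConnectionFactory baseFactory) => _baseFactory = baseFactory;

    public async ValueTask<Stream> ConnectAsync(EndPoint endPoint, CancellationToken cancellationToken = default)
    {
        Stream inner = await _baseFactory.ConnectAsync(endPoint, cancellationToken).ConfigureAwait(false);
        return new BufferedStream(inner);
    }
}
```

Each layer only knows about the factory beneath it, so layers can be reordered or swapped without touching the code that calls ConnectAsync.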

Beyond things like TLS and sockets, a library implementation can separate some connection establishment logic into clean layers, as seen in HttpClient here:

// Setup connection factory base. Either use the user's custom connection factory, or sockets.
_tcpConnectionFactory =
    settings._connectionFactory != null
    ? (IConnectionFactory)new EatDisposeConnectionFactory(settings._connectionFactory)
    : new SocketsConnectionFactory(SocketType.Stream, ProtocolType.Tcp);

// Middleware that selects which endpoint to connect to, routing through proxies.
_tcpConnectionFactory = new TransportSelectionMiddleware(_tcpConnectionFactory);

// Middleware to setup TLS. If the user's custom connection factory already sets up TLS, it's a no-op. Otherwise, it does it for them.
_tcpConnectionFactory = new HttpsConnectionMiddleware(_tcpConnectionFactory);

Extensibility

Components can expose a connection factory to the user as an extension model. As an example, a user might implement a bandwidth monitoring extension for HttpClient:

IConnectionFactory factory;
factory = SocketsHttpHandler.CreateConnectionFactory();
factory = new BandwidthMonitoringMiddleware(factory);

SocketsHttpHandler handler = new SocketsHttpHandler();
handler.SetConnectionFactory(factory);
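
The stream half of such a bandwidth monitoring layer might look like the sketch below. ByteCountingStream is a hypothetical name, and the sketch covers only the synchronous array-based Read and Write; the base Stream class routes its default async implementations through these, but a production version would likely override the async and span-based overloads directly.

```csharp
using System;
using System.IO;
using System.Threading;

// Hypothetical helper: a pass-through Stream that counts bytes in each direction.
public sealed class ByteCountingStream : Stream
{
    private readonly Stream _inner;
    private long _bytesRead;
    private long _bytesWritten;

    public ByteCountingStream(Stream inner) => _inner = inner ?? throw new ArgumentNullException(nameof(inner));

    public long BytesRead => Interlocked.Read(ref _bytesRead);
    public long BytesWritten => Interlocked.Read(ref _bytesWritten);

    public override bool CanRead => _inner.CanRead;
    public override bool CanSeek => false;
    public override bool CanWrite => _inner.CanWrite;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int n = _inner.Read(buffer, offset, count);
        Interlocked.Add(ref _bytesRead, n); // count only what was actually read
        return n;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        _inner.Write(buffer, offset, count);
        Interlocked.Add(ref _bytesWritten, count);
    }

    public override void Flush() => _inner.Flush();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing) _inner.Dispose();
        base.Dispose(disposing);
    }
}
```

A middleware factory would wrap the Stream of each connection returned by the base factory in one of these and report the counters wherever the application needs them.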

Requests for HttpClient extensibility

As an example of how users would make use of this, we’ve seen many requests for extensibility in HttpClient:

Go example

Here we use a SOCKS proxy with Go's HTTP client, via a dialer:

auth := proxy.Auth{
    User:     "YOUR_PROXY_LOGIN",
    Password: "YOUR_PROXY_PASSWORD",
}

dialer, err := proxy.SOCKS5("tcp", "contoso.com:12345", &auth, proxy.Direct)
if err != nil {
    log.Fatal(err)
}
tr := &http.Transport{Dial: dialer.Dial}
myClient := &http.Client{
    Transport: tr,
}

Netty example

Netty's channel pipelines can be used to do the same:

ChannelPipeline p = ch.pipeline();
p.addFirst(new Socks5ProxyHandler(new InetSocketAddress("contoso.com", 12345), "YOUR_PROXY_LOGIN", "YOUR_PROXY_PASSWORD"));
p.addLast(new HttpClientCodec());

Connection Properties

Streams have implementation-specific functionality and properties, such as Socket.Shutdown() and the various properties on SslStream. This means that, even when composing them, the user must still keep track of multiple layers:

using Socket socket = new Socket(SocketType.Stream, ProtocolType.Tcp);
using Stream networkStream = new NetworkStream(socket);
using SslStream sslStream = new SslStream(networkStream);
using StreamWriter writer = new StreamWriter(sslStream) { AutoFlush = true };

Console.WriteLine($"Connected via TLS: {sslStream.CipherAlgorithm} {sslStream.CipherStrength}");
writer.Write("Hello World.");
socket.Shutdown(SocketShutdown.Send);

This makes it challenging to build extensibility into a library's design:

  • One can no longer just ask a user for a Stream, but must instead ask for every layer that they need to override.
  • Multiple callbacks may be needed to hide certain layers from the user, to allow them to implement only exactly what they need.

With connections, this is cleaned up by allowing each layer in a composed connection to expose, override, or hide features from the previous layer. Abstractions can be used to avoid exposing a specific implementation.

Here, our TLS implementation exposes a property while passing through unknown types to previous layers.

bool IConnectionProperties.TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
{
    if (propertyKey == typeof(ISslConnectionProperties))
    {
        property = ...;
        return true;
    }

    return _baseConnection.ConnectionProperties.TryGet(propertyKey, out property);
}

The type keys available in properties are not discoverable; users must read the documentation to learn them.

When using an established connection, we then extract that property, as well as a Socket property exposed by a previous sockets layer:

using IConnection connection = ...;
using StreamWriter writer = new StreamWriter(connection.Stream) { AutoFlush = true };

if (connection.ConnectionProperties.TryGet(out ISslConnectionProperties? sslProperties))
{
    Console.WriteLine($"Connected via TLS: {sslProperties.CipherAlgorithm} {sslProperties.CipherStrength}");
}

writer.Write("Hello World.");

if (connection.ConnectionProperties.TryGet(out Socket? socket))
{
    socket.Shutdown(SocketShutdown.Send);
}
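
The pass-through lookup can be shown in a self-contained form. This is a sketch under stated assumptions: the interface and the generic TryGet<T> wrapper mirror the shapes in this proposal but are declared locally, and TransportInfo, TlsInfo, TransportLayer, and TlsLayer are hypothetical stand-ins for real layers.

```csharp
#nullable enable
using System;
using System.Diagnostics.CodeAnalysis;

// Local stand-in mirroring the proposed interface shape.
public interface IConnectionProperties
{
    bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
}

public static class ConnectionPropertiesExtensions
{
    // Generic convenience wrapper over the Type-keyed lookup.
    public static bool TryGet<T>(this IConnectionProperties properties, [MaybeNullWhen(false)] out T property)
    {
        if (properties.TryGet(typeof(T), out object? obj) && obj is T typed)
        {
            property = typed;
            return true;
        }

        property = default;
        return false;
    }
}

// Hypothetical property types exposed by each layer.
public sealed record TransportInfo(string Name);
public sealed record TlsInfo(string Protocol);

// Innermost layer: answers for its own key, and nothing else.
public sealed class TransportLayer : IConnectionProperties
{
    private readonly TransportInfo _info = new("tcp");

    public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
    {
        if (propertyKey == typeof(TransportInfo)) { property = _info; return true; }
        property = null;
        return false;
    }
}

// Outer layer: answers for its own key, and passes unknown keys to the layer below.
public sealed class TlsLayer : IConnectionProperties
{
    private readonly IConnectionProperties _inner;
    private readonly TlsInfo _info = new("Tls13");

    public TlsLayer(IConnectionProperties inner) => _inner = inner;

    public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
    {
        if (propertyKey == typeof(TlsInfo)) { property = _info; return true; }
        return _inner.TryGet(propertyKey, out property);
    }
}
```

With `new TlsLayer(new TransportLayer())`, a lookup for TlsInfo is answered by the outer layer, a lookup for TransportInfo falls through to the inner one, and any other key returns false. An outer layer could also hide an inner property by returning false for its key instead of forwarding.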

Connection Establishment properties

Property bags are also used when establishing new connections, to allow each layer to decide how to establish the connection. For instance, HttpClient can implement a factory that tests whether HTTPS is being used and injects TLS into the connection:

async ValueTask<IConnection> ConnectAsync(EndPoint endPoint, IConnectionProperties options, CancellationToken cancellationToken = default)
{
    if (!options.TryGet(out IHttpInternalConnectInfo httpProperties))
    {
        throw new HttpRequestException($"{nameof(HttpsConnectionMiddleware)} requires the {nameof(InternalConnectionInfoProperty)} connection property.");
    }

    HttpConnectionKind kind = httpProperties.Pool.Kind;

    if (kind == HttpConnectionKind.Https || kind == HttpConnectionKind.SslProxyTunnel)
    {
        return await _tlsConnectionFactory.ConnectAsync(endPoint, options, cancellationToken);
    }
    else
    {
        return await _plaintextConnectionFactory.ConnectAsync(endPoint, options, cancellationToken);
    }
}

Usage Examples

Establish a new connection and send data

async Task Send(IConnectionFactory factory)
{
    await using IConnection connection = await factory.ConnectAsync(new DnsEndPoint("contoso.com", 80));
    await using Stream s = connection.Stream;
    await using var sw = new StreamWriter(s);

    await sw.WriteAsync("GET / HTTP/1.1\r\n\r\n");
}

Listen for a new connection and receive data

async Task Receive(IConnectionFactory factory)
{
    await using IConnectionListener listener = await factory.BindAsync(new IPEndPoint(IPAddress.Loopback, 0));
    await using IConnection connection = await listener.AcceptAsync();
    await using Stream s = connection.Stream;
    using var sr = new StreamReader(s);

    string? requestLine = await sr.ReadLineAsync();
}

Proposed API

interface IConnectionProperties
{
    bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
}

// "easy mode" implementation for users; library implementors can write a custom one that merges allocation of IConnection, IConnectionProperties, and their properties.
public sealed partial class ConnectionPropertyCollection : IConnectionProperties
{
    public void Add<T>(T property);
    public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
}

// this is separate from IConnection due to anticipation of QUIC. See QUIC straw man below.
interface IConnectionStream : IAsyncDisposable, IDisposable
{
    IConnectionProperties ConnectionProperties { get; }

    // If only one is implemented, the other should wrap it. To prevent usage errors, once one of them has been retrieved, retrieving the other should throw.
    Stream Stream { get; }
    IDuplexPipe Pipe { get; }
}

interface IConnection : IConnectionStream
{
    EndPoint? LocalEndPoint { get; }
    EndPoint? RemoteEndPoint { get; }
}

// This could be split into two interfaces, one which has Connect and the other which has Bind. This would harm composability, but would avoid users needing to throw NotImplementedException if they only care about server-side.
interface IConnectionFactory : IAsyncDisposable, IDisposable
{
    ValueTask<IConnection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
    ValueTask<IConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

interface IConnectionListener : IAsyncDisposable, IDisposable
{
    IConnectionProperties ListenerProperties { get; }
    EndPoint? LocalEndPoint { get; }
    ValueTask<IConnection> AcceptAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

static class ConnectionExtensions
{
    // Injects a simple stream filter without all the other ceremony.
    public static IConnectionFactory Filter(this IConnectionFactory factory, Func<IConnection, IConnectionProperties?, CancellationToken, ValueTask<Stream>> filter);
    public static IConnectionFactory Filter(this IConnectionFactory factory, Func<IConnection, IConnectionProperties?, CancellationToken, ValueTask<IConnection>> filter);

    // Generic wrapper for non-generic IConnectionProperties method.
    public static bool TryGet<T>(this IConnectionProperties properties, [MaybeNullWhen(false)] out T property);
}
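
As a rough idea of what the "easy mode" ConnectionPropertyCollection could look like, here is a minimal dictionary-backed sketch. It is an assumption about one possible implementation, not the proposed one: the interface is redeclared locally so the block stands alone, and the notnull constraint on Add<T> is an addition that the proposed signature does not have.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;

// Local stand-in mirroring the proposed interface shape.
public interface IConnectionProperties
{
    bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property);
}

public sealed class ConnectionPropertyCollection : IConnectionProperties
{
    private readonly Dictionary<Type, object> _properties = new();

    // Keys the property by its static type T, so callers must look it up with the same type.
    // Dictionary.Add throws ArgumentException if a property of that type was already added.
    public void Add<T>(T property) where T : notnull
    {
        _properties.Add(typeof(T), property);
    }

    public bool TryGet(Type propertyKey, [NotNullWhen(true)] out object? property)
        => _properties.TryGetValue(propertyKey, out property);
}
```

Keying on typeof(T) rather than on the runtime type means a value added as an interface type is found under that interface, which lets a layer expose an abstraction without revealing the concrete class behind it.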

Some thoughts:

  • A purer version of the API could move EndPoint parameters/properties into the IConnectionProperties.

Additional APIs

These integrate the above interfaces with HTTP, Sockets, and TLS:

namespace System.Net.Connections
{
    // needs documentation: exposes typeof(Socket) property in its connections.
    class SocketsConnectionFactory : IConnectionFactory
    {
        // dual-mode IPv6 socket. See Socket(SocketType socketType, ProtocolType protocolType)
        public SocketsConnectionFactory(SocketType socketType, ProtocolType protocolType);

        // See Socket(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType)
        public SocketsConnectionFactory(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType);

        public ValueTask<IConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
        public ValueTask<IConnection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);

        public void Dispose();
        protected virtual void Dispose(bool disposing);
        public virtual ValueTask DisposeAsync();

        // These exist to provide an easy way for users to override default behavior.
        // A more idiomatic (but more API-heavy) way to do this would be to pass some sort of ISocketConfiguration that has all the pre-connect socket options one could want.
        protected virtual Socket CreateSocket(SocketType socketType, ProtocolType protocolType, EndPoint? endPoint, IConnectionProperties? options);
        protected virtual Stream CreateStream(Socket socket, IConnectionProperties? options);
        protected virtual IDuplexPipe CreatePipe(Socket socket, IConnectionProperties? options);
    }

    [System.CLSCompliantAttribute(false)] // due to TlsCipherSuite
    interface ISslConnectionProperties
    {
        CipherAlgorithmType CipherAlgorithm { get; }
        int CipherStrength { get; }
        HashAlgorithmType HashAlgorithm { get; }
        int HashStrength { get; }
        ExchangeAlgorithmType KeyExchangeAlgorithm { get; }
        int KeyExchangeStrength { get; }
        X509Certificate LocalCertificate { get; }
        SslApplicationProtocol NegotiatedApplicationProtocol { get; }
        TlsCipherSuite NegotiatedCipherSuite { get; }
        X509Certificate RemoteCertificate { get; }
        SslProtocols SslProtocol { get; }
        TransportContext TransportContext { get; }
    }

    // needs documentation: exposes typeof(ISslConnectionProperties) property in its connections.
    // needs documentation: requires SslClientAuthenticationOptions and SslServerAuthenticationOptions options in connect/bind.
    class SslConnectionFactory : IConnectionFactory
    {
        public SslConnectionFactory(IConnectionFactory baseFactory);

        public void Dispose();
        protected virtual void Dispose(bool disposing);
        public virtual ValueTask DisposeAsync();

        public ValueTask<IConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
        public ValueTask<IConnection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
    }
}

namespace System.Net.Http
{
    sealed class SocketsHttpHandler
    {
        public static IConnectionFactory CreateConnectionFactory();
        public static IConnectionFactory CreateConnectionFactory(Func<HttpRequestMessage, DnsEndPoint, IConnectionProperties, Socket> createSocket);

        // Return a stream that isn't Socket-based.
        public static IConnectionFactory CreateConnectionFactory(Func<HttpRequestMessage, DnsEndPoint, IConnectionProperties, CancellationToken, ValueTask<Stream>> establishConnection);
        public static IConnectionFactory CreateConnectionFactory(Func<HttpRequestMessage, DnsEndPoint, IConnectionProperties, CancellationToken, ValueTask<IConnection>> establishConnection);

        // Users of the above APIs can call these if they just want to wrap the defaults.
        public static Socket DefaultCreateSocket(HttpRequestMessage message, DnsEndPoint endPoint, IConnectionProperties options);
        public static ValueTask<IConnection> DefaultEstablishConnection(HttpRequestMessage message, DnsEndPoint endPoint, IConnectionProperties options, CancellationToken cancellationToken);

        public void SetConnectionFactory(IConnectionFactory factory);

        // Sets a pre-encryption filter.
        public void SetConnectionFilter(Func<HttpRequestMessage, DnsEndPoint, IConnection, CancellationToken, ValueTask<Stream>> filter);
        public void SetConnectionFilter(Func<HttpRequestMessage, DnsEndPoint, IConnection, CancellationToken, ValueTask<IConnection>> filter);
    }
}

Thoughts and Questions

  • Heavy use of property bags significantly harms discoverability: users need to read documentation to understand what features to expect from each layer.
    • This is not a common design pattern in corefx, but our ASP.NET users will be very familiar with DI and should have an easy enough time grasping it.
    • This puts additional burden on maintainers to ensure thorough documentation of required/exposed properties.
  • Shuffling features between layers with property bags may harm maintainability if authors need to pay attention to which layer introduces which property and how they all fit together.
    • To prove out this API, I implemented it feature-complete for HttpClient and did not observe this complexity. I believe there are not a large number of use cases one would want to introduce layers for, and so layering is unlikely to become complex enough for this to be a problem in most apps.
  • This API has a very large surface area. Are there enough benefits versus a simpler one (e.g. a Func<(string host, int port), Stream>)?
    • HttpClient has a strong need for a connection establishment abstraction. All but one need could be solved by the simpler one.
    • ... however, HTTP/3 and QUIC are significantly more involved and will need a more complex dialer abstraction anyway. Having some symmetry between the TCP and QUIC dialers may be beneficial.
    • This being the standard/recommended method to abstract connections across BCL, ASP.NET Core, and 3rd party libraries provides value over some simple library-specific callbacks.
  • How much impact will this have? How many 3rd party libraries are in need of this and would use it if available?
    • Not many people need this, but when they do it is really important to them. See HttpClient extensibility asks.
    • There are not many 3rd party libraries opening network connections.
  • Should we apply the extensibility model to other things? Does it make sense for e.g. SmtpClient, SqlConnection, and so on to make use of this?
    • For now, we have decided to be conservative and wait to see how this gets used before introducing those extensibility points.

Future QUIC amendments

QUIC is not yet out of draft status, so QUIC-specific APIs were not a focus for this proposal. However, current knowledge of QUIC did help shape the APIs to make adapting it easier. Here are some thoughts based on current experience:

  • QUIC has features that an app is required to be aware of. It is not possible for an app to write a generic protocol that is seamlessly usable between both TCP and QUIC without at least some small shim that knows how to do the QUIC-specific stuff:
    • There is no "keep sending in background after socket/process closed" concept, so some form of flushing of send buffers is required.
    • Streams are aborted with status codes, and the read and write sides of a stream can be aborted separately.
    • Connections are always closed with a status code.
    • There are no predefined codes to indicate success/failure: they are all application-specific. It's not clear what an abortive QuicStream.Dispose should do, for instance.
    • Protocols make use of unidirectional/bidirectional stream differentiation, and how those map to stream IDs.
  • IConnectionStream and IConnection are split in the proposed API specifically to later support an IMultiplexedConnection that creates multiple IConnectionStream.
    • To avoid an IMultiplexedConnection, we might choose to merge the two APIs and have IConnection force users to explicitly open the one bidirectional IConnectionStream for the connection.

A QUIC extension to this might look like:

interface IMultiplexedConnection : IAsyncDisposable, IDisposable
{
    EndPoint? RemoteEndPoint { get; }
    EndPoint? LocalEndPoint { get; }

    ValueTask<IConnectionStream> OpenStream(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
    ValueTask<IConnectionStream> AcceptStream(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

interface IMultiplexedConnectionFactory : IAsyncDisposable, IDisposable
{
    ValueTask<IMultiplexedConnection> ConnectAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
    ValueTask<IMultiplexedConnectionListener> BindAsync(EndPoint? endPoint, IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

interface IMultiplexedConnectionListener : IAsyncDisposable, IDisposable
{
    IConnectionProperties ListenerProperties { get; }
    EndPoint? LocalEndPoint { get; }
    ValueTask<IMultiplexedConnection> AcceptAsync(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

Alternately, the IConnection and IMultiplexedConnection APIs might be merged, reducing surface area significantly. The TCP version would simply throw if opening/accepting more than once. This API might look like:

interface IConnection
{
    EndPoint? LocalEndPoint { get; }
    EndPoint? RemoteEndPoint { get; }

    ValueTask<IConnectionStream> OpenStream(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
    ValueTask<IConnectionStream> AcceptStream(IConnectionProperties? options = null, CancellationToken cancellationToken = default);
}

However, beyond API surface reduction there isn't a clear practical reason to merge them. Given how significantly QUIC differs from TCP, it isn't clear that libraries would be able to correctly reuse filtering IConnectionFactory implementations across both.
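
For illustration, the "throw if opening more than once" behavior of a merged TCP connection might be sketched as follows. SingleStreamConnection is a hypothetical name, and the method returns a plain Stream rather than IConnectionStream so the example stays self-contained.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical TCP-style connection under the merged API: it has exactly one stream to give out.
public sealed class SingleStreamConnection
{
    private readonly Stream _stream;
    private int _opened; // 0 = not yet opened, 1 = already handed out

    public SingleStreamConnection(Stream stream) => _stream = stream ?? throw new ArgumentNullException(nameof(stream));

    public ValueTask<Stream> OpenStream(CancellationToken cancellationToken = default)
    {
        // Interlocked.Exchange makes the check-and-set atomic if callers race.
        if (Interlocked.Exchange(ref _opened, 1) != 0)
        {
            throw new InvalidOperationException("This connection supports only a single stream.");
        }

        return new ValueTask<Stream>(_stream);
    }
}
```

A protocol written against the merged interface would hit this exception as soon as it tried to open a second stream over TCP, which is one concrete way the two transports fail to be interchangeable.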
