Skip to content

[API Proposal]: Streaming APIs for the System.Net.Http.Json extensions #87577

Closed
@IEvangelist

Description

@IEvangelist

Background and motivation

As of .NET 8 preview 5 (and starting with .NET 5), there are various extension methods within the System.Net.Http.Json namespace (specifically the System.Net.Http.Json NuGet package 📦) that expose JSON-centric functionality to conveniently encapsulate common workflows. Such as, but not limited to extending the HttpClient:

  • GetFromJsonAsync
  • PostAsJsonAsync
  • PutAsJsonAsync
  • PatchAsJsonAsync
  • DeleteFromJsonAsync

All of these APIs are Task-like returning, i.e., either Task<TValue> or ValueTask<TValue> However, there are no extension methods for some of the streaming APIs, that return IAsyncEnumerable<TValue>. While this is obviously possible without the use of in-built extension methods, it would be really convenient for the runtime to include said extensions.

Ideally, we could encapsulate all of the logic required to efficiently delegate JSON-streaming calls to the JsonSerializer.DeserializeAsyncEnumerable functionality.

API Proposal

This would be scoped to GET on the HttpClient.

namespace System.Net.Http.Json;

public static partial class HttpClientJsonExtensions
{
    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri,
        JsonSerializerOptions? options,
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        Uri? requestUri,
        JsonSerializerOptions? options,
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri,
        JsonTypeInfo<TValue> jsonTypeInfo,
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        Uri? requestUri,
        JsonTypeInfo<TValue> jsonTypeInfo,
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri,
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client,
        Uri? requestUri,
        CancellationToken cancellationToken = default) { }
}

And the HttpContent:

public static partial class HttpContentJsonExtensions
{
    public static IAsyncEnumerable<T?> ReadFromJsonAsAsyncEnumerable<T>(
        this HttpContent content, 
        JsonSerializerOptions? options, 
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<T?> ReadFromJsonAsAsyncEnumerable<T>(
        this HttpContent content, 
        CancellationToken cancellationToken = default) { }

    public static IAsyncEnumerable<T?> ReadFromJsonAsAsyncEnumerable<T>(
        this HttpContent content,
        JsonTypeInfo<T> jsonTypeInfo,
        CancellationToken cancellationToken = default) { }
}
Potential Implementation This would essentially encapsulate the following logic:
public static class HttpRequestJsonExtensions
{
    public static async IAsyncEnumerable<TValue?> GetFromJsonAsAsyncEnumerable<TValue>(
        this HttpClient client, 
        [StringSyntax(StringSyntaxAttribute.Uri)] string? requestUri, 
        JsonSerializerOptions? options, 
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(client);

        TimeSpan timeout = client.Timeout;

        // Create the CTS before the initial SendAsync so that the SendAsync counts against the timeout.
        CancellationTokenSource? linkedCTS = null;
        if (timeout != Timeout.InfiniteTimeSpan)
        {
            linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            linkedCTS.CancelAfter(timeout);
        }

        Task<HttpResponseMessage> responseTask;
        try
        {
            // Intentionally using cancellationToken instead of the linked one 
            // here as HttpClient will enforce the Timeout on its own for this part
            responseTask = client.GetAsync(
                CreateUri(requestUri), HttpCompletionOption.ResponseHeadersRead, cancellationToken);
        }
        catch
        {
            linkedCTS?.Dispose();
            throw;
        }

        using HttpResponseMessage response = await responseTask.ConfigureAwait(false);
        response.EnsureSuccessStatusCode();

        using Stream contentStream =
            await response.Content
                .ReadAsStreamAsync(cancellationToken)
                .ConfigureAwait(false);

        await foreach (TValue? value in JsonSerializer.DeserializeAsyncEnumerable<TValue>(
            contentStream, options, cancellationToken).ConfigureAwait(false))
        {
            yield return value;
        }
    }

    private static Uri? CreateUri(string? uri) =>
       string.IsNullOrEmpty(uri) ? null : new Uri(uri, UriKind.RelativeOrAbsolute);
}

I'd love to be the one to implement this, if it's approved.

API Usage

// Imagine an ASP.NET Core Minimal API that returns an IAsyncEnumerable<T> where T is a `TimeSeries`.
public record class TimeSeries(string Name, DateTime dateTime, decimal Value);

// Imagine that this is the consuming code.
HttpClient client = new();
await foreach (var timeSeries in client.GetFromJsonAsAsyncEnumerable<TimeSeries>("api/streaming/endpoint"))
{
    // Use timeSeries values
}

Alternative Designs

No response

Risks

No response

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions