Skip to content

Add a fallback to filling buffers in DeserializeAsyncEnumerable. #104635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,27 @@ internal struct ReadBufferState : IDisposable
private bool _isFirstBlock;
private bool _isFinalBlock;

// An "unsuccessful read" in this context refers to a buffer read operation that
// wasn't sufficient to advance the reader to the next token. This occurs primarily
// when consuming large JSON strings (which don't support streaming today) but is
// also possible with other token types such as numbers, booleans, or nulls.
//
// The JsonSerializer.DeserializeAsyncEnumerable methods employ a special buffering
// strategy where rather than attempting to fill the entire buffer, the deserializer
// will be invoked as soon as the first chunk of data is read from the stream.
// This is to ensure liveness: data should be surfaced on the IAsyncEnumerable as soon as they
// are streamed from the server. On the other hand, this can create performance
// problems in cases where the underlying stream uses extremely fine-grained buffering.
// For this reason, we employ a threshold that will revert to buffer filling once crossed.
// The counter is reset to zero whenever the JSON reader has been advanced successfully.
//
// The threshold is set to 5 unsuccessful reads. This is a relatively conservative threshold
// but should still make fallback unlikely in most scenarios. It should ensure that fallback
// isn't triggered in null or boolean tokens even in the worst-case scenario where they are
// streamed one byte at a time.
private const int UnsuccessfulReadCountThreshold = 5;
private int _unsuccessfulReadCount;

public ReadBufferState(int initialBufferSize)
{
_buffer = ArrayPool<byte>.Shared.Rent(Math.Max(initialBufferSize, JsonConstants.Utf8Bom.Length));
Expand All @@ -46,6 +67,7 @@ public readonly async ValueTask<ReadBufferState> ReadFromStreamAsync(
// make all updates on a copy which is returned once complete.
ReadBufferState bufferState = this;

int minBufferCount = fillBuffer || _unsuccessfulReadCount > UnsuccessfulReadCountThreshold ? bufferState._buffer.Length : 0;
do
{
int bytesRead = await utf8Json.ReadAsync(
Expand All @@ -64,7 +86,7 @@ public readonly async ValueTask<ReadBufferState> ReadFromStreamAsync(

bufferState._count += bytesRead;
}
while (fillBuffer && bufferState._count < bufferState._buffer.Length);
while (bufferState._count < minBufferCount);

bufferState.ProcessReadBytes();
return bufferState;
Expand Down Expand Up @@ -106,6 +128,7 @@ public void AdvanceBuffer(int bytesConsumed)
{
Debug.Assert(bytesConsumed <= _count);

_unsuccessfulReadCount = bytesConsumed == 0 ? _unsuccessfulReadCount + 1 : 0;
_count -= bytesConsumed;

if (!_isFinalBlock)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// The .NET Foundation licenses this file to you under the MIT license.

using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text.Json.Serialization.Metadata;
Expand Down Expand Up @@ -333,6 +335,47 @@ await Assert.ThrowsAsync<TaskCanceledException>(async () =>
});
}

/// <summary>
/// Streams a JSON array of <paramref name="totalStrings"/> string elements, each
/// <paramref name="stringLength"/> characters long, through a stream that returns one byte
/// per read, and verifies that every element is surfaced by DeserializeAsyncEnumerable.
/// </summary>
/// <param name="totalStrings">Number of string elements in the generated JSON array.</param>
/// <param name="stringLength">Number of 'x' characters inside each string element.</param>
[Theory]
[InlineData(5, 1024)]
[InlineData(5, 1024 * 1024)]
public static async Task DeserializeAsyncEnumerable_SlowStreamWithLargeStrings(int totalStrings, int stringLength)
{
    var options = new JsonSerializerOptions
    {
        Converters = { new StringLengthConverter() }
    };

    using var stream = new SlowStream(GenerateJsonCharacters());

    // StringLengthConverter returns the length of the raw token value formatted as text,
    // so every element is expected to equal the string length rendered with the invariant culture.
    string expectedElement = stringLength.ToString(CultureInfo.InvariantCulture);
    IAsyncEnumerable<string?> asyncEnumerable = JsonSerializer.DeserializeAsyncEnumerable<string>(stream, options);

    int actualCount = 0;
    await foreach (string? value in asyncEnumerable)
    {
        Assert.Equal(expectedElement, value);
        actualCount++;
    }

    // Without this check the test would pass vacuously if the enumerable
    // completed early or produced no elements at all.
    Assert.Equal(totalStrings, actualCount);

    IEnumerable<byte> GenerateJsonCharacters()
    {
        // ["xxx...x","xxx...x",...,"xxx...x"]
        yield return (byte)'[';
        for (int i = 0; i < totalStrings; i++)
        {
            yield return (byte)'"';
            for (int j = 0; j < stringLength; j++)
            {
                yield return (byte)'x';
            }
            yield return (byte)'"';

            // Separate elements with commas, but emit no trailing comma.
            if (i < totalStrings - 1)
            {
                yield return (byte)',';
            }
        }
        yield return (byte)']';
    }
}

public static IEnumerable<object[]> GetAsyncEnumerableSources()
{
yield return WrapArgs(Enumerable.Empty<int>(), 1, DeserializeAsyncEnumerableOverload.JsonSerializerOptions);
Expand Down Expand Up @@ -381,5 +424,48 @@ private static async Task<List<T>> ToListAsync<T>(this IAsyncEnumerable<T> sourc
}
return list;
}

/// <summary>
/// A read-only, non-seekable stream used for testing that returns at most one byte per
/// <see cref="Read(byte[], int, int)"/> call, pulling bytes lazily from the supplied sequence.
/// </summary>
private sealed class SlowStream(IEnumerable<byte> byteSource) : Stream
{
    private readonly IEnumerator<byte> _enumerator = byteSource.GetEnumerator();
    private long _position;

    public override bool CanRead => true;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    /// <summary>
    /// Copies a single byte from the source sequence into <paramref name="buffer"/> at
    /// <paramref name="offset"/>.
    /// </summary>
    /// <returns>
    /// 1 when a byte was written; 0 when <paramref name="count"/> is zero or the source
    /// sequence is exhausted.
    /// </returns>
    public override int Read(byte[] buffer, int offset, int count)
    {
        Debug.Assert(buffer != null);
        Debug.Assert(offset >= 0 && count >= 0 && count <= buffer.Length - offset);

        // The count check comes first so that a zero-length read never consumes a source byte.
        if (count == 0 || !_enumerator.MoveNext())
        {
            return 0;
        }

        _position++;
        buffer[offset] = _enumerator.Current;
        return 1;
    }

    /// <summary>Gets the number of bytes handed out so far; setting the position is not supported.</summary>
    public override long Position { get => _position; set => throw new NotSupportedException(); }
    public override long Length => throw new NotSupportedException();
    public override void Flush() => throw new NotSupportedException();
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    /// <summary>
    /// Disposes the underlying enumerator. Overriding <see cref="Stream.Dispose(bool)"/> rather than
    /// re-implementing <see cref="IDisposable.Dispose"/> ensures the enumerator is released no matter
    /// which disposal entry point the caller uses (Dispose, Close, DisposeAsync or a using statement).
    /// </summary>
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _enumerator.Dispose();
        }

        base.Dispose(disposing);
    }
}

/// <summary>
/// Read-only string converter that, instead of decoding the JSON string, returns the length
/// of the reader's current raw value span formatted as invariant-culture text.
/// </summary>
private sealed class StringLengthConverter : JsonConverter<string>
{
    /// <summary>Returns <c>reader.ValueSpan.Length</c> rendered as a string.</summary>
    public override string Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options)
    {
        // The span length is only read when the value is unescaped and held in a single segment.
        Debug.Assert(!(reader.ValueIsEscaped || reader.HasValueSequence));

        int rawValueLength = reader.ValueSpan.Length;
        return rawValueLength.ToString(CultureInfo.InvariantCulture);
    }

    /// <summary>Serialization is not implemented by this converter.</summary>
    public override void Write(Utf8JsonWriter writer, string value, JsonSerializerOptions options)
    {
        throw new NotImplementedException();
    }
}
}
}
Loading