Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Runtime.CompilerServices;
using System.Text.Json.Serialization;
using System.Text.Json.Serialization.Converters;
using System.Text.Json.Serialization.Metadata;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -375,7 +373,7 @@ public static partial class JsonSerializer
}

JsonTypeInfo jsonTypeInfo = options.GetOrAddJsonTypeInfoForRootType(typeof(TValue));
return CreateAsyncEnumerableDeserializer<TValue>(utf8Json, jsonTypeInfo, cancellationToken);
return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo<TValue>(jsonTypeInfo), cancellationToken);
}

/// <summary>
Expand Down Expand Up @@ -406,36 +404,38 @@ public static partial class JsonSerializer
ThrowHelper.ThrowArgumentNullException(nameof(jsonTypeInfo));
}

return CreateAsyncEnumerableDeserializer<TValue>(utf8Json, jsonTypeInfo, cancellationToken);
return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo<TValue>(jsonTypeInfo), cancellationToken);
}

private static JsonTypeInfo<Queue<TValue>> CreateQueueTypeInfo<TValue>(JsonTypeInfo jsonTypeInfo)
{
return JsonMetadataServices.CreateQueueInfo<Queue<TValue>, TValue>(
options: jsonTypeInfo.Options,
collectionInfo: new()
{
ObjectCreator = static () => new Queue<TValue>(),
ElementInfo = jsonTypeInfo,
NumberHandling = jsonTypeInfo.Options.NumberHandling
});
}

private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<TValue>(
Stream utf8Json,
JsonTypeInfo jsonTypeInfo,
JsonTypeInfo<Queue<TValue>> queueTypeInfo,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
JsonSerializerOptions options = jsonTypeInfo.Options;
JsonTypeInfo<Queue<TValue>> queueTypeInfo =
JsonMetadataServices.CreateQueueInfo<Queue<TValue>, TValue>(
options: options,
collectionInfo: new()
{
ObjectCreator = () => new Queue<TValue>(),
ElementInfo = jsonTypeInfo,
NumberHandling = options.NumberHandling
});

queueTypeInfo.EnsureConfigured();
JsonSerializerOptions options = queueTypeInfo.Options;
var bufferState = new ReadBufferState(options.DefaultBufferSize);
ReadStack readStack = default;
queueTypeInfo.EnsureConfigured();
readStack.Initialize(queueTypeInfo, supportContinuation: true);
var jsonReaderState = new JsonReaderState(options.GetReaderOptions());

try
{
do
{
bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false);
bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken, fillBuffer: false).ConfigureAwait(false);
ContinueDeserialize<Queue<TValue>>(
ref bufferState,
ref jsonReaderState,
Expand Down Expand Up @@ -476,7 +476,7 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
{
while (true)
{
bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false);
bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
TValue value = ContinueDeserialize<TValue>(ref bufferState, ref jsonReaderState, ref readStack, converter, options);

if (bufferState.IsFinalBlock)
Expand Down Expand Up @@ -507,7 +507,7 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
{
while (true)
{
bufferState = ReadFromStream(utf8Json, bufferState);
bufferState.ReadFromStream(utf8Json);
TValue value = ContinueDeserialize<TValue>(ref bufferState, ref jsonReaderState, ref readStack, converter, options);

if (bufferState.IsFinalBlock)
Expand All @@ -522,146 +522,24 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
}
}

/// <summary>
/// Read from the stream until either our buffer is filled or we hit EOF.
/// Calling ReadCore is relatively expensive, so we minimize the number of times
/// we need to call it.
/// </summary>
internal static async ValueTask<ReadBufferState> ReadFromStreamAsync(
Stream utf8Json,
ReadBufferState bufferState,
CancellationToken cancellationToken)
{
while (true)
{
int bytesRead = await utf8Json.ReadAsync(
#if BUILDING_INBOX_LIBRARY
bufferState.Buffer.AsMemory(bufferState.BytesInBuffer),
#else
bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer,
#endif
cancellationToken).ConfigureAwait(false);

if (bytesRead == 0)
{
bufferState.IsFinalBlock = true;
break;
}

bufferState.BytesInBuffer += bytesRead;

if (bufferState.BytesInBuffer == bufferState.Buffer.Length)
{
break;
}
}

return bufferState;
}

/// <summary>
/// Read from the stream until either our buffer is filled or we hit EOF.
/// Calling ReadCore is relatively expensive, so we minimize the number of times
/// we need to call it.
/// </summary>
internal static ReadBufferState ReadFromStream(
Stream utf8Json,
ReadBufferState bufferState)
{
while (true)
{
int bytesRead = utf8Json.Read(
#if BUILDING_INBOX_LIBRARY
bufferState.Buffer.AsSpan(bufferState.BytesInBuffer));
#else
bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer);
#endif

if (bytesRead == 0)
{
bufferState.IsFinalBlock = true;
break;
}

bufferState.BytesInBuffer += bytesRead;

if (bufferState.BytesInBuffer == bufferState.Buffer.Length)
{
break;
}
}

return bufferState;
}

internal static TValue ContinueDeserialize<TValue>(
ref ReadBufferState bufferState,
ref JsonReaderState jsonReaderState,
ref ReadStack readStack,
JsonConverter converter,
JsonSerializerOptions options)
{
if (bufferState.BytesInBuffer > bufferState.ClearMax)
{
bufferState.ClearMax = bufferState.BytesInBuffer;
}

int start = 0;
if (bufferState.IsFirstIteration)
{
bufferState.IsFirstIteration = false;

// Handle the UTF-8 BOM if present
Debug.Assert(bufferState.Buffer.Length >= JsonConstants.Utf8Bom.Length);
if (bufferState.Buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom))
{
start += JsonConstants.Utf8Bom.Length;
bufferState.BytesInBuffer -= JsonConstants.Utf8Bom.Length;
}
}

// Process the data available
TValue value = ReadCore<TValue>(
ref jsonReaderState,
bufferState.IsFinalBlock,
new ReadOnlySpan<byte>(bufferState.Buffer, start, bufferState.BytesInBuffer),
bufferState.Bytes,
options,
ref readStack,
converter);

Debug.Assert(readStack.BytesConsumed <= bufferState.BytesInBuffer);
int bytesConsumed = checked((int)readStack.BytesConsumed);

bufferState.BytesInBuffer -= bytesConsumed;

// The reader should have thrown if we have remaining bytes.
Debug.Assert(!bufferState.IsFinalBlock || bufferState.BytesInBuffer == 0);

if (!bufferState.IsFinalBlock)
{
// Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization.
if ((uint)bufferState.BytesInBuffer > ((uint)bufferState.Buffer.Length / 2))
{
// We have less than half the buffer available, double the buffer size.
byte[] oldBuffer = bufferState.Buffer;
int oldClearMax = bufferState.ClearMax;
byte[] newBuffer = ArrayPool<byte>.Shared.Rent((bufferState.Buffer.Length < (int.MaxValue / 2)) ? bufferState.Buffer.Length * 2 : int.MaxValue);

// Copy the unprocessed data to the new buffer while shifting the processed bytes.
Buffer.BlockCopy(oldBuffer, bytesConsumed + start, newBuffer, 0, bufferState.BytesInBuffer);
bufferState.Buffer = newBuffer;
bufferState.ClearMax = bufferState.BytesInBuffer;

// Clear and return the old buffer
new Span<byte>(oldBuffer, 0, oldClearMax).Clear();
ArrayPool<byte>.Shared.Return(oldBuffer);
}
else if (bufferState.BytesInBuffer != 0)
{
// Shift the processed bytes to the beginning of buffer to make more room.
Buffer.BlockCopy(bufferState.Buffer, bytesConsumed + start, bufferState.Buffer, 0, bufferState.BytesInBuffer);
}
}
Debug.Assert(readStack.BytesConsumed <= bufferState.Bytes.Length);
bufferState.AdvanceBuffer((int)readStack.BytesConsumed);

return value;
}
Expand Down
Loading