Skip to content

Commit fc538c2

Browse files
Makes the following changes: (#70430)
* Change DeserializeAsyncEnumerable so that reading from the underlying stream does not wait until the underlying buffer is full. * Encapsulate the buffer management logic behind ReadBufferState. * Avoid allocating a new JsonTypeInfo instance on every IAsyncEnumerator initialization.
1 parent 5786435 commit fc538c2

File tree

2 files changed

+172
-158
lines changed

2 files changed

+172
-158
lines changed

src/libraries/System.Text.Json/src/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs

Lines changed: 23 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,12 @@
11
// Licensed to the .NET Foundation under one or more agreements.
22
// The .NET Foundation licenses this file to you under the MIT license.
33

4-
using System.Buffers;
54
using System.Collections.Generic;
65
using System.Diagnostics;
76
using System.Diagnostics.CodeAnalysis;
87
using System.IO;
98
using System.Runtime.CompilerServices;
109
using System.Text.Json.Serialization;
11-
using System.Text.Json.Serialization.Converters;
1210
using System.Text.Json.Serialization.Metadata;
1311
using System.Threading;
1412
using System.Threading.Tasks;
@@ -375,7 +373,7 @@ public static partial class JsonSerializer
375373
}
376374

377375
JsonTypeInfo jsonTypeInfo = options.GetOrAddJsonTypeInfoForRootType(typeof(TValue));
378-
return CreateAsyncEnumerableDeserializer<TValue>(utf8Json, jsonTypeInfo, cancellationToken);
376+
return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo<TValue>(jsonTypeInfo), cancellationToken);
379377
}
380378

381379
/// <summary>
@@ -406,36 +404,38 @@ public static partial class JsonSerializer
406404
ThrowHelper.ThrowArgumentNullException(nameof(jsonTypeInfo));
407405
}
408406

409-
return CreateAsyncEnumerableDeserializer<TValue>(utf8Json, jsonTypeInfo, cancellationToken);
407+
return CreateAsyncEnumerableDeserializer(utf8Json, CreateQueueTypeInfo<TValue>(jsonTypeInfo), cancellationToken);
408+
}
409+
410+
private static JsonTypeInfo<Queue<TValue>> CreateQueueTypeInfo<TValue>(JsonTypeInfo jsonTypeInfo)
411+
{
412+
return JsonMetadataServices.CreateQueueInfo<Queue<TValue>, TValue>(
413+
options: jsonTypeInfo.Options,
414+
collectionInfo: new()
415+
{
416+
ObjectCreator = static () => new Queue<TValue>(),
417+
ElementInfo = jsonTypeInfo,
418+
NumberHandling = jsonTypeInfo.Options.NumberHandling
419+
});
410420
}
411421

412422
private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<TValue>(
413423
Stream utf8Json,
414-
JsonTypeInfo jsonTypeInfo,
424+
JsonTypeInfo<Queue<TValue>> queueTypeInfo,
415425
[EnumeratorCancellation] CancellationToken cancellationToken)
416426
{
417-
JsonSerializerOptions options = jsonTypeInfo.Options;
418-
JsonTypeInfo<Queue<TValue>> queueTypeInfo =
419-
JsonMetadataServices.CreateQueueInfo<Queue<TValue>, TValue>(
420-
options: options,
421-
collectionInfo: new()
422-
{
423-
ObjectCreator = () => new Queue<TValue>(),
424-
ElementInfo = jsonTypeInfo,
425-
NumberHandling = options.NumberHandling
426-
});
427-
427+
queueTypeInfo.EnsureConfigured();
428+
JsonSerializerOptions options = queueTypeInfo.Options;
428429
var bufferState = new ReadBufferState(options.DefaultBufferSize);
429430
ReadStack readStack = default;
430-
queueTypeInfo.EnsureConfigured();
431431
readStack.Initialize(queueTypeInfo, supportContinuation: true);
432432
var jsonReaderState = new JsonReaderState(options.GetReaderOptions());
433433

434434
try
435435
{
436436
do
437437
{
438-
bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false);
438+
bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken, fillBuffer: false).ConfigureAwait(false);
439439
ContinueDeserialize<Queue<TValue>>(
440440
ref bufferState,
441441
ref jsonReaderState,
@@ -476,7 +476,7 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
476476
{
477477
while (true)
478478
{
479-
bufferState = await ReadFromStreamAsync(utf8Json, bufferState, cancellationToken).ConfigureAwait(false);
479+
bufferState = await bufferState.ReadFromStreamAsync(utf8Json, cancellationToken).ConfigureAwait(false);
480480
TValue value = ContinueDeserialize<TValue>(ref bufferState, ref jsonReaderState, ref readStack, converter, options);
481481

482482
if (bufferState.IsFinalBlock)
@@ -507,7 +507,7 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
507507
{
508508
while (true)
509509
{
510-
bufferState = ReadFromStream(utf8Json, bufferState);
510+
bufferState.ReadFromStream(utf8Json);
511511
TValue value = ContinueDeserialize<TValue>(ref bufferState, ref jsonReaderState, ref readStack, converter, options);
512512

513513
if (bufferState.IsFinalBlock)
@@ -522,146 +522,24 @@ private static async IAsyncEnumerable<TValue> CreateAsyncEnumerableDeserializer<
522522
}
523523
}
524524

525-
/// <summary>
526-
/// Read from the stream until either our buffer is filled or we hit EOF.
527-
/// Calling ReadCore is relatively expensive, so we minimize the number of times
528-
/// we need to call it.
529-
/// </summary>
530-
internal static async ValueTask<ReadBufferState> ReadFromStreamAsync(
531-
Stream utf8Json,
532-
ReadBufferState bufferState,
533-
CancellationToken cancellationToken)
534-
{
535-
while (true)
536-
{
537-
int bytesRead = await utf8Json.ReadAsync(
538-
#if BUILDING_INBOX_LIBRARY
539-
bufferState.Buffer.AsMemory(bufferState.BytesInBuffer),
540-
#else
541-
bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer,
542-
#endif
543-
cancellationToken).ConfigureAwait(false);
544-
545-
if (bytesRead == 0)
546-
{
547-
bufferState.IsFinalBlock = true;
548-
break;
549-
}
550-
551-
bufferState.BytesInBuffer += bytesRead;
552-
553-
if (bufferState.BytesInBuffer == bufferState.Buffer.Length)
554-
{
555-
break;
556-
}
557-
}
558-
559-
return bufferState;
560-
}
561-
562-
/// <summary>
563-
/// Read from the stream until either our buffer is filled or we hit EOF.
564-
/// Calling ReadCore is relatively expensive, so we minimize the number of times
565-
/// we need to call it.
566-
/// </summary>
567-
internal static ReadBufferState ReadFromStream(
568-
Stream utf8Json,
569-
ReadBufferState bufferState)
570-
{
571-
while (true)
572-
{
573-
int bytesRead = utf8Json.Read(
574-
#if BUILDING_INBOX_LIBRARY
575-
bufferState.Buffer.AsSpan(bufferState.BytesInBuffer));
576-
#else
577-
bufferState.Buffer, bufferState.BytesInBuffer, bufferState.Buffer.Length - bufferState.BytesInBuffer);
578-
#endif
579-
580-
if (bytesRead == 0)
581-
{
582-
bufferState.IsFinalBlock = true;
583-
break;
584-
}
585-
586-
bufferState.BytesInBuffer += bytesRead;
587-
588-
if (bufferState.BytesInBuffer == bufferState.Buffer.Length)
589-
{
590-
break;
591-
}
592-
}
593-
594-
return bufferState;
595-
}
596-
597525
internal static TValue ContinueDeserialize<TValue>(
598526
ref ReadBufferState bufferState,
599527
ref JsonReaderState jsonReaderState,
600528
ref ReadStack readStack,
601529
JsonConverter converter,
602530
JsonSerializerOptions options)
603531
{
604-
if (bufferState.BytesInBuffer > bufferState.ClearMax)
605-
{
606-
bufferState.ClearMax = bufferState.BytesInBuffer;
607-
}
608-
609-
int start = 0;
610-
if (bufferState.IsFirstIteration)
611-
{
612-
bufferState.IsFirstIteration = false;
613-
614-
// Handle the UTF-8 BOM if present
615-
Debug.Assert(bufferState.Buffer.Length >= JsonConstants.Utf8Bom.Length);
616-
if (bufferState.Buffer.AsSpan().StartsWith(JsonConstants.Utf8Bom))
617-
{
618-
start += JsonConstants.Utf8Bom.Length;
619-
bufferState.BytesInBuffer -= JsonConstants.Utf8Bom.Length;
620-
}
621-
}
622-
623532
// Process the data available
624533
TValue value = ReadCore<TValue>(
625534
ref jsonReaderState,
626535
bufferState.IsFinalBlock,
627-
new ReadOnlySpan<byte>(bufferState.Buffer, start, bufferState.BytesInBuffer),
536+
bufferState.Bytes,
628537
options,
629538
ref readStack,
630539
converter);
631540

632-
Debug.Assert(readStack.BytesConsumed <= bufferState.BytesInBuffer);
633-
int bytesConsumed = checked((int)readStack.BytesConsumed);
634-
635-
bufferState.BytesInBuffer -= bytesConsumed;
636-
637-
// The reader should have thrown if we have remaining bytes.
638-
Debug.Assert(!bufferState.IsFinalBlock || bufferState.BytesInBuffer == 0);
639-
640-
if (!bufferState.IsFinalBlock)
641-
{
642-
// Check if we need to shift or expand the buffer because there wasn't enough data to complete deserialization.
643-
if ((uint)bufferState.BytesInBuffer > ((uint)bufferState.Buffer.Length / 2))
644-
{
645-
// We have less than half the buffer available, double the buffer size.
646-
byte[] oldBuffer = bufferState.Buffer;
647-
int oldClearMax = bufferState.ClearMax;
648-
byte[] newBuffer = ArrayPool<byte>.Shared.Rent((bufferState.Buffer.Length < (int.MaxValue / 2)) ? bufferState.Buffer.Length * 2 : int.MaxValue);
649-
650-
// Copy the unprocessed data to the new buffer while shifting the processed bytes.
651-
Buffer.BlockCopy(oldBuffer, bytesConsumed + start, newBuffer, 0, bufferState.BytesInBuffer);
652-
bufferState.Buffer = newBuffer;
653-
bufferState.ClearMax = bufferState.BytesInBuffer;
654-
655-
// Clear and return the old buffer
656-
new Span<byte>(oldBuffer, 0, oldClearMax).Clear();
657-
ArrayPool<byte>.Shared.Return(oldBuffer);
658-
}
659-
else if (bufferState.BytesInBuffer != 0)
660-
{
661-
// Shift the processed bytes to the beginning of buffer to make more room.
662-
Buffer.BlockCopy(bufferState.Buffer, bytesConsumed + start, bufferState.Buffer, 0, bufferState.BytesInBuffer);
663-
}
664-
}
541+
Debug.Assert(readStack.BytesConsumed <= bufferState.Bytes.Length);
542+
bufferState.AdvanceBuffer((int)readStack.BytesConsumed);
665543

666544
return value;
667545
}

0 commit comments

Comments
 (0)