Skip to content

Periodically flush when using PipeWriter in Json #102541

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 33 additions & 15 deletions src/libraries/Common/src/System/Text/Json/PooledByteBufferWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,22 @@
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.IO.Pipelines;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.Text.Json
{
internal sealed class PooledByteBufferWriter : IBufferWriter<byte>, IDisposable
internal sealed class PooledByteBufferWriter : PipeWriter, IDisposable
{
// This class allows two possible configurations: if rentedBuffer is not null then
// it can be used as an IBufferWriter and holds a buffer that should eventually be
// returned to the shared pool. If rentedBuffer is null, then the instance is in a
// cleared/disposed state and it must re-rent a buffer before it can be used again.
private byte[]? _rentedBuffer;
private int _index;
private readonly Stream? _stream;

private const int MinimumBufferSize = 256;

Expand All @@ -41,6 +43,11 @@ public PooledByteBufferWriter(int initialCapacity) : this()
_index = 0;
}

public PooledByteBufferWriter(int initialCapacity, Stream stream) : this(initialCapacity)
{
_stream = stream;
}

public ReadOnlyMemory<byte> WrittenMemory
{
get
Expand Down Expand Up @@ -127,43 +134,32 @@ public void InitializeEmptyInstance(int initialCapacity)

public static PooledByteBufferWriter CreateEmptyInstanceForCaching() => new PooledByteBufferWriter();

public void Advance(int count)
public override void Advance(int count)
{
Debug.Assert(_rentedBuffer != null);
Debug.Assert(count >= 0);
Debug.Assert(_index <= _rentedBuffer.Length - count);
_index += count;
}

public Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
public override Memory<byte> GetMemory(int sizeHint = MinimumBufferSize)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsMemory(_index);
}

public Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
public override Span<byte> GetSpan(int sizeHint = MinimumBufferSize)
{
CheckAndResizeBuffer(sizeHint);
return _rentedBuffer.AsSpan(_index);
}

#if NET
internal ValueTask WriteToStreamAsync(Stream destination, CancellationToken cancellationToken)
{
return destination.WriteAsync(WrittenMemory, cancellationToken);
}

internal void WriteToStream(Stream destination)
{
destination.Write(WrittenMemory.Span);
}
#else
internal Task WriteToStreamAsync(Stream destination, CancellationToken cancellationToken)
{
Debug.Assert(_rentedBuffer != null);
return destination.WriteAsync(_rentedBuffer, 0, _index, cancellationToken);
}

internal void WriteToStream(Stream destination)
{
Debug.Assert(_rentedBuffer != null);
Expand Down Expand Up @@ -217,6 +213,28 @@ private void CheckAndResizeBuffer(int sizeHint)
Debug.Assert(_rentedBuffer.Length - _index > 0);
Debug.Assert(_rentedBuffer.Length - _index >= sizeHint);
}

public override async ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken = default)
{
Debug.Assert(_stream is not null);
#if NET
await _stream.WriteAsync(WrittenMemory, cancellationToken).ConfigureAwait(false);
#else
Debug.Assert(_rentedBuffer != null);
await _stream.WriteAsync(_rentedBuffer, 0, _index, cancellationToken).ConfigureAwait(false);
#endif
Clear();

return new FlushResult(isCanceled: false, isCompleted: false);
}

public override bool CanGetUnflushedBytes => true;
public override long UnflushedBytes => _index;

// This type is used internally in JsonSerializer to help buffer and flush bytes to the underlying Stream.
// It's only pretending to be a PipeWriter and doesn't need Complete or CancelPendingFlush for the internal usage.
public override void CancelPendingFlush() => throw new NotImplementedException();
public override void Complete(Exception? exception = null) => throw new NotImplementedException();
}

internal static partial class ThrowHelper
Expand Down
4 changes: 2 additions & 2 deletions src/libraries/System.Text.Json/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,8 @@
<data name="PipeWriterCanceled" xml:space="preserve">
<value>PipeWriter.FlushAsync was canceled.</value>
</data>
<data name="PipeWriterCompleted" xml:space="preserve">
<value>PipeWriter has been completed, nothing more can be written to it.</value>
<data name="PipeWriter_DoesNotImplementUnflushedBytes" xml:space="preserve">
<value>The PipeWriter '{0}' does not implement PipeWriter.UnflushedBytes.</value>
</data>
<data name="InvalidNewLine" xml:space="preserve">
<value>New line can be only "\n" or "\r\n".</value>
Expand Down
1 change: 0 additions & 1 deletion src/libraries/System.Text.Json/src/System.Text.Json.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ The System.Text.Json library is built-in as part of the shared framework in .NET
<Compile Include="System\Text\Json\Reader\Utf8JsonReader.TryGet.cs" />
<Compile Include="System\Text\Json\Serialization\Arguments.cs" />
<Compile Include="System\Text\Json\Serialization\ArgumentState.cs" />
<Compile Include="System\Text\Json\Serialization\AsyncSerializationBufferWriterContext.cs" />
<Compile Include="System\Text\Json\Serialization\Attributes\JsonObjectCreationHandlingAttribute.cs" />
<Compile Include="System\Text\Json\Serialization\Attributes\JsonConstructorAttribute.cs" />
<Compile Include="System\Text\Json\Serialization\Attributes\JsonConverterAttribute.cs" />
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TElement[] array, J

state.Current.EndCollectionElement();

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.EnumeratorIndex = ++index;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ protected internal override bool OnWriteResume(

do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected internal override bool OnWriteResume(
{
do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TAsyncEnumerable va
return true;
}

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ protected internal override bool OnWriteResume(Utf8JsonWriter writer, TDictionar

do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected override bool OnWriteResume(
JsonConverter<object?> converter = GetElementConverter(ref state);
do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TCollection value,
JsonConverter<TElement> converter = GetElementConverter(ref state);
do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TCollection value,

state.Current.EndCollectionElement();

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.EnumeratorIndex = ++index;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ protected override bool OnWriteResume(Utf8JsonWriter writer, TCollection value,

state.Current.EndCollectionElement();

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.EnumeratorIndex = ++index;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ internal static bool OnWriteResume(Utf8JsonWriter writer, ReadOnlySpan<T> value,

state.Current.EndCollectionElement();

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.EnumeratorIndex = ++index;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ protected sealed override bool OnWriteResume(Utf8JsonWriter writer, TCollection
JsonConverter<object?> converter = GetElementConverter(ref state);
do
{
if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
state.Current.CollectionEnumerator = enumerator;
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ internal sealed override bool OnTryWrite(
state.Current.EndProperty();
state.Current.EnumeratorIndex++;

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
return false;
}
Expand Down Expand Up @@ -430,7 +430,7 @@ internal sealed override bool OnTryWrite(
state.Current.EndProperty();
state.Current.EnumeratorIndex++;

if (ShouldFlush(writer, ref state))
if (ShouldFlush(ref state))
{
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,17 @@ internal JsonConverter<TTarget> CreateCastingConverter<TTarget>()
/// </summary>
internal bool IsInternalConverterForNumberType { get; init; }

internal static bool ShouldFlush(Utf8JsonWriter writer, ref WriteStack state)
internal static bool ShouldFlush(ref WriteStack state)
{
// If surpassed flush threshold then return false which will flush stream.
return (state.FlushThreshold > 0 && writer.BytesPending > state.FlushThreshold);
Debug.Assert(state.FlushThreshold == 0 || (state.PipeWriter is { CanGetUnflushedBytes: true }),
"ShouldFlush should only be called by resumable serializers, all of which use the PipeWriter abstraction with CanGetUnflushedBytes == true.");
// If surpassed flush threshold then return true which will flush stream.
if (state.PipeWriter is { } pipeWriter)
{
return state.FlushThreshold > 0 && pipeWriter.UnflushedBytes > state.FlushThreshold;
}

return false;
}

internal abstract object? ReadAsObject(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,7 @@ public JsonPropertyInfo CreateJsonPropertyInfo(Type propertyType, string name)

// Untyped, root-level serialization methods
internal abstract void SerializeAsObject(Utf8JsonWriter writer, object? rootValue);
internal abstract Task SerializeAsObjectAsync<TSerializationContext>(TSerializationContext serializationContext, object? rootValue, CancellationToken cancellationToken)
where TSerializationContext : struct, IAsyncSerializationBufferWriterContext;
internal abstract Task SerializeAsObjectAsync(PipeWriter pipeWriter, object? rootValue, int flushThreshold, CancellationToken cancellationToken);
internal abstract Task SerializeAsObjectAsync(Stream utf8Json, object? rootValue, CancellationToken cancellationToken);
internal abstract Task SerializeAsObjectAsync(PipeWriter utf8Json, object? rootValue, CancellationToken cancellationToken);
internal abstract void SerializeAsObject(Stream utf8Json, object? rootValue);
Expand Down
Loading
Loading