Skip to content

Commit cd24d14

Browse files
Use new PipeWriter Json overloads (#55740)
1 parent fe84bd0 commit cd24d14

File tree

18 files changed

+474
-44
lines changed

18 files changed

+474
-44
lines changed

src/Hosting/TestHost/src/ResponseBodyPipeWriter.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,4 +95,8 @@ public override Span<byte> GetSpan(int sizeHint = 0)
9595
CheckNotComplete();
9696
return _pipe.Writer.GetSpan(sizeHint);
9797
}
98+
99+
public override bool CanGetUnflushedBytes => _pipe.Writer.CanGetUnflushedBytes;
100+
101+
public override long UnflushedBytes => _pipe.Writer.UnflushedBytes;
98102
}

src/Http/Http.Extensions/src/HttpResponseJsonExtensions.cs

Lines changed: 91 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33

44
using System.Diagnostics.CodeAnalysis;
5+
using System.IO.Pipelines;
56
using System.Text.Json;
67
using System.Text.Json.Serialization;
78
using System.Text.Json.Serialization.Metadata;
@@ -89,13 +90,23 @@ public static Task WriteAsJsonAsync<TValue>(
8990

9091
response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;
9192

93+
var startTask = Task.CompletedTask;
94+
if (!response.HasStarted)
95+
{
96+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
97+
startTask = response.StartAsync(cancellationToken);
98+
}
99+
92100
// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
93-
if (!cancellationToken.CanBeCanceled)
101+
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
94102
{
95-
return WriteAsJsonAsyncSlow(response.Body, value, options, response.HttpContext.RequestAborted);
103+
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, options,
104+
ignoreOCE: !cancellationToken.CanBeCanceled,
105+
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
96106
}
97107

98-
return JsonSerializer.SerializeAsync(response.Body, value, options, cancellationToken);
108+
startTask.GetAwaiter().GetResult();
109+
return JsonSerializer.SerializeAsync(response.BodyWriter, value, options, cancellationToken);
99110
}
100111

101112
/// <summary>
@@ -120,21 +131,33 @@ public static Task WriteAsJsonAsync<TValue>(
120131

121132
response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;
122133

134+
var startTask = Task.CompletedTask;
135+
if (!response.HasStarted)
136+
{
137+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
138+
startTask = response.StartAsync(cancellationToken);
139+
}
140+
123141
// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
124-
if (!cancellationToken.CanBeCanceled)
142+
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
125143
{
126-
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
144+
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
145+
ignoreOCE: !cancellationToken.CanBeCanceled,
146+
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
127147
}
128148

129-
return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
149+
startTask.GetAwaiter().GetResult();
150+
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
130151

131-
static async Task WriteAsJsonAsyncSlow(HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo)
152+
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, TValue value, JsonTypeInfo<TValue> jsonTypeInfo,
153+
bool ignoreOCE, CancellationToken cancellationToken)
132154
{
133155
try
134156
{
135-
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
157+
await startTask;
158+
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
136159
}
137-
catch (OperationCanceledException) { }
160+
catch (OperationCanceledException) when (ignoreOCE) { }
138161
}
139162
}
140163

@@ -161,37 +184,52 @@ public static Task WriteAsJsonAsync(
161184

162185
response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;
163186

187+
var startTask = Task.CompletedTask;
188+
if (!response.HasStarted)
189+
{
190+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
191+
startTask = response.StartAsync(cancellationToken);
192+
}
193+
164194
// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
165-
if (!cancellationToken.CanBeCanceled)
195+
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
166196
{
167-
return WriteAsJsonAsyncSlow(response, value, jsonTypeInfo);
197+
return WriteAsJsonAsyncSlow(startTask, response, value, jsonTypeInfo,
198+
ignoreOCE: !cancellationToken.CanBeCanceled,
199+
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
168200
}
169201

170-
return JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, cancellationToken);
202+
startTask.GetAwaiter().GetResult();
203+
return JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
171204

172-
static async Task WriteAsJsonAsyncSlow(HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo)
205+
static async Task WriteAsJsonAsyncSlow(Task startTask, HttpResponse response, object? value, JsonTypeInfo jsonTypeInfo,
206+
bool ignoreOCE, CancellationToken cancellationToken)
173207
{
174208
try
175209
{
176-
await JsonSerializer.SerializeAsync(response.Body, value, jsonTypeInfo, response.HttpContext.RequestAborted);
210+
await startTask;
211+
await JsonSerializer.SerializeAsync(response.BodyWriter, value, jsonTypeInfo, cancellationToken);
177212
}
178-
catch (OperationCanceledException) { }
213+
catch (OperationCanceledException) when (ignoreOCE) { }
179214
}
180215
}
181216

182217
[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
183218
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
184219
private static async Task WriteAsJsonAsyncSlow<TValue>(
185-
Stream body,
220+
Task startTask,
221+
PipeWriter body,
186222
TValue value,
187223
JsonSerializerOptions? options,
224+
bool ignoreOCE,
188225
CancellationToken cancellationToken)
189226
{
190227
try
191228
{
229+
await startTask;
192230
await JsonSerializer.SerializeAsync(body, value, options, cancellationToken);
193231
}
194-
catch (OperationCanceledException) { }
232+
catch (OperationCanceledException) when (ignoreOCE) { }
195233
}
196234

197235
/// <summary>
@@ -266,29 +304,42 @@ public static Task WriteAsJsonAsync(
266304

267305
response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;
268306

307+
var startTask = Task.CompletedTask;
308+
if (!response.HasStarted)
309+
{
310+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
311+
startTask = response.StartAsync(cancellationToken);
312+
}
313+
269314
// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
270-
if (!cancellationToken.CanBeCanceled)
315+
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
271316
{
272-
return WriteAsJsonAsyncSlow(response.Body, value, type, options, response.HttpContext.RequestAborted);
317+
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, options,
318+
ignoreOCE: !cancellationToken.CanBeCanceled,
319+
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
273320
}
274321

275-
return JsonSerializer.SerializeAsync(response.Body, value, type, options, cancellationToken);
322+
startTask.GetAwaiter().GetResult();
323+
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, options, cancellationToken);
276324
}
277325

278326
[RequiresUnreferencedCode(RequiresUnreferencedCodeMessage)]
279327
[RequiresDynamicCode(RequiresDynamicCodeMessage)]
280328
private static async Task WriteAsJsonAsyncSlow(
281-
Stream body,
329+
Task startTask,
330+
PipeWriter body,
282331
object? value,
283332
Type type,
284333
JsonSerializerOptions? options,
334+
bool ignoreOCE,
285335
CancellationToken cancellationToken)
286336
{
287337
try
288338
{
339+
await startTask;
289340
await JsonSerializer.SerializeAsync(body, value, type, options, cancellationToken);
290341
}
291-
catch (OperationCanceledException) { }
342+
catch (OperationCanceledException) when (ignoreOCE) { }
292343
}
293344

294345
/// <summary>
@@ -316,21 +367,33 @@ public static Task WriteAsJsonAsync(
316367

317368
response.ContentType = contentType ?? JsonConstants.JsonContentTypeWithCharset;
318369

370+
var startTask = Task.CompletedTask;
371+
if (!response.HasStarted)
372+
{
373+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
374+
startTask = response.StartAsync(cancellationToken);
375+
}
376+
319377
// if no user provided token, pass the RequestAborted token and ignore OperationCanceledException
320-
if (!cancellationToken.CanBeCanceled)
378+
if (!startTask.IsCompleted || !cancellationToken.CanBeCanceled)
321379
{
322-
return WriteAsJsonAsyncSlow();
380+
return WriteAsJsonAsyncSlow(startTask, response.BodyWriter, value, type, context,
381+
ignoreOCE: !cancellationToken.CanBeCanceled,
382+
cancellationToken.CanBeCanceled ? cancellationToken : response.HttpContext.RequestAborted);
323383
}
324384

325-
return JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
385+
startTask.GetAwaiter().GetResult();
386+
return JsonSerializer.SerializeAsync(response.BodyWriter, value, type, context, cancellationToken);
326387

327-
async Task WriteAsJsonAsyncSlow()
388+
static async Task WriteAsJsonAsyncSlow(Task startTask, PipeWriter body, object? value, Type type, JsonSerializerContext context,
389+
bool ignoreOCE, CancellationToken cancellationToken)
328390
{
329391
try
330392
{
331-
await JsonSerializer.SerializeAsync(response.Body, value, type, context, cancellationToken);
393+
await startTask;
394+
await JsonSerializer.SerializeAsync(body, value, type, context, cancellationToken);
332395
}
333-
catch (OperationCanceledException) { }
396+
catch (OperationCanceledException) when (ignoreOCE) { }
334397
}
335398
}
336399

src/Http/Http.Extensions/test/HttpResponseJsonExtensionsTests.cs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,10 +494,7 @@ private class TestStream : Stream
494494
public override long Length { get; }
495495
public override long Position { get; set; }
496496

497-
public override void Flush()
498-
{
499-
throw new NotImplementedException();
500-
}
497+
public override void Flush() { }
501498

502499
public override int Read(byte[] buffer, int offset, int count)
503500
{

src/Mvc/Mvc.Core/src/Formatters/SystemTextJsonOutputFormatter.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,21 +83,25 @@ public sealed override async Task WriteResponseBodyAsync(OutputFormatterWriteCon
8383
}
8484
}
8585

86-
var responseStream = httpContext.Response.Body;
8786
if (selectedEncoding.CodePage == Encoding.UTF8.CodePage)
8887
{
8988
try
9089
{
90+
var responseWriter = httpContext.Response.BodyWriter;
91+
if (!httpContext.Response.HasStarted)
92+
{
93+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
94+
await httpContext.Response.StartAsync();
95+
}
96+
9197
if (jsonTypeInfo is not null)
9298
{
93-
await JsonSerializer.SerializeAsync(responseStream, context.Object, jsonTypeInfo, httpContext.RequestAborted);
99+
await JsonSerializer.SerializeAsync(responseWriter, context.Object, jsonTypeInfo, httpContext.RequestAborted);
94100
}
95101
else
96102
{
97-
await JsonSerializer.SerializeAsync(responseStream, context.Object, SerializerOptions, httpContext.RequestAborted);
103+
await JsonSerializer.SerializeAsync(responseWriter, context.Object, SerializerOptions, httpContext.RequestAborted);
98104
}
99-
100-
await responseStream.FlushAsync(httpContext.RequestAborted);
101105
}
102106
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
103107
}

src/Mvc/Mvc.Core/src/Infrastructure/SystemTextJsonResultExecutor.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,18 @@ public async Task ExecuteAsync(ActionContext context, JsonResult result)
6161
var objectType = value?.GetType() ?? typeof(object);
6262

6363
// Keep this code in sync with SystemTextJsonOutputFormatter
64-
var responseStream = response.Body;
6564
if (resolvedContentTypeEncoding.CodePage == Encoding.UTF8.CodePage)
6665
{
6766
try
6867
{
69-
await JsonSerializer.SerializeAsync(responseStream, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
70-
await responseStream.FlushAsync(context.HttpContext.RequestAborted);
68+
var responseWriter = response.BodyWriter;
69+
if (!response.HasStarted)
70+
{
71+
// Flush headers before starting Json serialization. This avoids an extra layer of buffering before the first flush.
72+
await response.StartAsync();
73+
}
74+
75+
await JsonSerializer.SerializeAsync(responseWriter, value, objectType, jsonSerializerOptions, context.HttpContext.RequestAborted);
7176
}
7277
catch (OperationCanceledException) when (context.HttpContext.RequestAborted.IsCancellationRequested) { }
7378
}

src/Servers/Kestrel/Core/src/Internal/Http/Http1OutputProducer.cs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ internal class Http1OutputProducer : IHttpOutputProducer, IDisposable
6161
// Fields needed to store writes before calling either startAsync or Write/FlushAsync
6262
// These should be cleared by the end of the request
6363
private List<CompletedBuffer>? _completedSegments;
64+
private int _completedSegmentsByteCount;
6465
private Memory<byte> _currentSegment;
6566
private IMemoryOwner<byte>? _currentSegmentOwner;
6667
private int _position;
@@ -273,6 +274,15 @@ public void Advance(int bytes)
273274
}
274275
}
275276

277+
public long UnflushedBytes
278+
{
279+
get
280+
{
281+
var bytes = _position + _advancedBytesForChunk + _pipeWriter.UnflushedBytes + _completedSegmentsByteCount;
282+
return bytes;
283+
}
284+
}
285+
276286
public void CancelPendingFlush()
277287
{
278288
_pipeWriter.CancelPendingFlush();
@@ -372,6 +382,7 @@ private void WriteDataWrittenBeforeHeaders(ref BufferWriter<PipeWriter> writer)
372382
segment.Return();
373383
}
374384

385+
_completedSegmentsByteCount = 0;
375386
_completedSegments.Clear();
376387
}
377388

@@ -730,6 +741,7 @@ private void AddSegment(int sizeHint = 0)
730741
// GetMemory was called. In that case we'll take the current segment and call it "completed", but need to
731742
// ignore any empty space in it.
732743
_completedSegments.Add(new CompletedBuffer(_currentSegmentOwner, _currentSegment, _position));
744+
_completedSegmentsByteCount += _position;
733745
}
734746

735747
if (sizeHint <= _memoryPool.MaxBufferSize)

src/Servers/Kestrel/Core/src/Internal/Http/HttpProtocol.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1482,6 +1482,8 @@ public void Advance(int bytes)
14821482
}
14831483
}
14841484

1485+
public long UnflushedBytes => Output.UnflushedBytes;
1486+
14851487
public Memory<byte> GetMemory(int sizeHint = 0)
14861488
{
14871489
_isLeasedMemoryInvalid = false;

src/Servers/Kestrel/Core/src/Internal/Http/HttpResponsePipeWriter.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ public void Abort()
9393
}
9494
}
9595

96+
public override bool CanGetUnflushedBytes => true;
97+
public override long UnflushedBytes => _pipeControl.UnflushedBytes;
98+
9699
[MethodImpl(MethodImplOptions.AggressiveInlining)]
97100
private void ValidateState(CancellationToken cancellationToken = default)
98101
{

src/Servers/Kestrel/Core/src/Internal/Http/IHttpOutputProducer.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ internal interface IHttpOutputProducer
1616
Task WriteDataAsync(ReadOnlySpan<byte> data, CancellationToken cancellationToken);
1717
ValueTask<FlushResult> WriteStreamSuffixAsync();
1818
void Advance(int bytes);
19+
long UnflushedBytes { get; }
1920
Span<byte> GetSpan(int sizeHint = 0);
2021
Memory<byte> GetMemory(int sizeHint = 0);
2122
void CancelPendingFlush();

src/Servers/Kestrel/Core/src/Internal/Http/IHttpResponseControl.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ internal interface IHttpResponseControl
1111
Memory<byte> GetMemory(int sizeHint = 0);
1212
Span<byte> GetSpan(int sizeHint = 0);
1313
void Advance(int bytes);
14+
long UnflushedBytes { get; }
1415
ValueTask<FlushResult> FlushPipeAsync(CancellationToken cancellationToken);
1516
ValueTask<FlushResult> WritePipeAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken);
1617
void CancelPendingFlush();

src/Servers/Kestrel/Core/src/Internal/Http2/Http2OutputProducer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,8 @@ public void Advance(int bytes)
469469
}
470470
}
471471

472+
public long UnflushedBytes => _pipeWriter.UnflushedBytes;
473+
472474
public Span<byte> GetSpan(int sizeHint = 0)
473475
{
474476
lock (_dataWriterLock)

src/Servers/Kestrel/Core/src/Internal/Http3/Http3OutputProducer.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,8 @@ public void Advance(int bytes)
122122
}
123123
}
124124

125+
public long UnflushedBytes => _pipeWriter.UnflushedBytes;
126+
125127
public void CancelPendingFlush()
126128
{
127129
lock (_dataWriterLock)

0 commit comments

Comments
 (0)