Skip to content

Commit 443add5

Browse files
committed
Use PipeWriter instead of ArrayWriter when writing to outputStream
1 parent c30abc5 commit 443add5

File tree

5 files changed

+726
-4
lines changed

5 files changed

+726
-4
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<PackageId>HotChocolate.Transport.Formatters</PackageId>
5+
<AssemblyName>HotChocolate.Transport.Formatters</AssemblyName>
6+
<RootNamespace>HotChocolate.Transport.Formatters</RootNamespace>
7+
<Description>This package contains formatters for GraphQL over HTTP.</Description>
8+
</PropertyGroup>
9+
10+
<PropertyGroup>
11+
<IsAotCompatible>true</IsAotCompatible>
12+
</PropertyGroup>
13+
14+
<ItemGroup>
15+
<ProjectReference Include="..\..\..\Core\src\Execution.Abstractions\HotChocolate.Execution.Abstractions.csproj" />
16+
<ProjectReference Include="..\..\..\Utilities\src\Utilities.Buffers\HotChocolate.Utilities.Buffers.csproj" />
17+
<ProjectReference Include="..\..\..\Utilities\src\Utilities.Tasks\HotChocolate.Utilities.Tasks.csproj" />
18+
</ItemGroup>
19+
20+
<ItemGroup>
21+
<PackageReference Include="System.IO.Pipelines" />
22+
</ItemGroup>
23+
24+
</Project>
Lines changed: 285 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,285 @@
1+
using System.Buffers;
2+
using System.Diagnostics;
3+
using System.IO.Pipelines;
4+
using HotChocolate.Buffers;
5+
using HotChocolate.Execution;
6+
using HotChocolate.Utilities;
7+
using static HotChocolate.Transport.Formatters.JsonLinesResultFormatterEventSource;
8+
9+
namespace HotChocolate.Transport.Formatters;
10+
11+
public sealed class JsonLinesResultFormatter(JsonResultFormatterOptions options) : IExecutionResultFormatter
12+
{
13+
private const int MaxBacklogSize = 64;
14+
private static readonly StreamPipeWriterOptions s_pipeWriterOptions = new(leaveOpen: true);
15+
private readonly JsonResultFormatter _payloadFormatter = new(options with { Indented = false });
16+
17+
public ValueTask FormatAsync(
18+
IExecutionResult result,
19+
Stream outputStream,
20+
CancellationToken cancellationToken = default)
21+
{
22+
ArgumentNullException.ThrowIfNull(result);
23+
ArgumentNullException.ThrowIfNull(outputStream);
24+
25+
return result switch
26+
{
27+
IOperationResult operationResult
28+
=> FormatOperationResultAsync(operationResult, outputStream, cancellationToken),
29+
OperationResultBatch resultBatch
30+
=> FormatResultBatchAsync(resultBatch, outputStream, cancellationToken),
31+
IResponseStream responseStream
32+
=> FormatResponseStreamAsync(responseStream, outputStream, cancellationToken),
33+
_ => throw new NotSupportedException()
34+
};
35+
}
36+
37+
private async ValueTask FormatOperationResultAsync(
38+
IOperationResult operationResult,
39+
Stream outputStream,
40+
CancellationToken ct)
41+
{
42+
var buffer = PipeWriter.Create(outputStream, s_pipeWriterOptions);
43+
var scope = Log.FormatOperationResultStart();
44+
45+
try
46+
{
47+
MessageHelper.FormatNextMessage(_payloadFormatter, operationResult, buffer);
48+
49+
if (!ct.IsCancellationRequested)
50+
{
51+
await buffer.CompleteAsync().ConfigureAwait(false);
52+
}
53+
}
54+
catch (Exception ex)
55+
{
56+
scope?.AddError(ex);
57+
Debug.WriteLine(ex);
58+
}
59+
finally
60+
{
61+
scope?.Dispose();
62+
}
63+
}
64+
65+
private async ValueTask FormatResultBatchAsync(
66+
OperationResultBatch resultBatch,
67+
Stream outputStream,
68+
CancellationToken ct)
69+
{
70+
await using var writer = new ConcurrentStreamWriter(outputStream, MaxBacklogSize);
71+
72+
await using var tokenRegistration = ct.Register(
73+
static w => ((ConcurrentStreamWriter)w!).DisposeAsync().FireAndForget(),
74+
writer,
75+
useSynchronizationContext: false);
76+
77+
KeepAliveJob? keepAlive = null;
78+
List<Task>? streams = null;
79+
80+
try
81+
{
82+
foreach (var result in resultBatch.Results)
83+
{
84+
switch (result)
85+
{
86+
case IOperationResult operationResult:
87+
var scope = Log.FormatOperationResultStart();
88+
try
89+
{
90+
var buffer = writer.Begin();
91+
MessageHelper.FormatNextMessage(_payloadFormatter, operationResult, buffer);
92+
await writer.CommitAsync(buffer, ct).ConfigureAwait(false);
93+
keepAlive?.Reset();
94+
}
95+
catch (Exception ex)
96+
{
97+
scope?.AddError(ex);
98+
Debug.WriteLine(ex);
99+
}
100+
finally
101+
{
102+
await operationResult.DisposeAsync().ConfigureAwait(false);
103+
scope?.Dispose();
104+
}
105+
106+
break;
107+
108+
case IResponseStream responseStream:
109+
keepAlive ??= new KeepAliveJob(writer);
110+
streams ??= [];
111+
var formatter = new StreamFormatter(_payloadFormatter, keepAlive, responseStream, writer);
112+
streams.Add(formatter.ProcessAsync(ct));
113+
break;
114+
115+
default:
116+
throw new NotSupportedException(
117+
"The result batch contains an unsupported result type.");
118+
}
119+
}
120+
}
121+
finally
122+
{
123+
if (streams?.Count > 0)
124+
{
125+
await Task.WhenAll(streams).ConfigureAwait(false);
126+
}
127+
128+
keepAlive?.Dispose();
129+
}
130+
131+
await writer.WaitForCompletionAsync().ConfigureAwait(false);
132+
}
133+
134+
private async ValueTask FormatResponseStreamAsync(
135+
IResponseStream responseStream,
136+
Stream outputStream,
137+
CancellationToken ct)
138+
{
139+
await using var writer = new ConcurrentStreamWriter(outputStream, MaxBacklogSize);
140+
141+
await using var tokenRegistration = ct.Register(
142+
static w => ((ConcurrentStreamWriter)w!).DisposeAsync().FireAndForget(),
143+
writer,
144+
useSynchronizationContext: false);
145+
146+
using (var keepAlive = new KeepAliveJob(writer))
147+
{
148+
var formatter = new StreamFormatter(_payloadFormatter, keepAlive, responseStream, writer);
149+
await formatter.ProcessAsync(ct).ConfigureAwait(false);
150+
}
151+
152+
await writer.WaitForCompletionAsync().ConfigureAwait(false);
153+
}
154+
155+
private sealed class StreamFormatter(
156+
JsonResultFormatter payloadFormatter,
157+
KeepAliveJob keepAliveJob,
158+
IResponseStream responseStream,
159+
ConcurrentStreamWriter writer)
160+
{
161+
public async Task ProcessAsync(CancellationToken ct)
162+
{
163+
try
164+
{
165+
await foreach (var result in responseStream.ReadResultsAsync()
166+
.WithCancellation(ct)
167+
.ConfigureAwait(false))
168+
{
169+
var scope = Log.FormatOperationResultStart();
170+
171+
try
172+
{
173+
var buffer = writer.Begin();
174+
MessageHelper.FormatNextMessage(payloadFormatter, result, buffer);
175+
await writer.CommitAsync(buffer, ct).ConfigureAwait(false);
176+
keepAliveJob.Reset();
177+
}
178+
catch (Exception ex)
179+
{
180+
scope?.AddError(ex);
181+
Debug.WriteLine(ex);
182+
return;
183+
}
184+
finally
185+
{
186+
await result.DisposeAsync().ConfigureAwait(false);
187+
scope?.Dispose();
188+
}
189+
}
190+
}
191+
catch (OperationCanceledException)
192+
{
193+
// if the operation was canceled, we do not need to log this
194+
// and will stop gracefully.
195+
}
196+
finally
197+
{
198+
await responseStream.DisposeAsync().ConfigureAwait(false);
199+
}
200+
}
201+
}
202+
203+
private sealed class KeepAliveJob : IDisposable
204+
{
205+
private static readonly TimeSpan s_timerPeriod = TimeSpan.FromSeconds(12);
206+
private static readonly TimeSpan s_keepAlivePeriod = TimeSpan.FromSeconds(8);
207+
private readonly CancellationTokenSource _cancellationTokenSource = new();
208+
private readonly CancellationToken _ct;
209+
private readonly ConcurrentStreamWriter _writer;
210+
private readonly Timer _keepAliveTimer;
211+
private DateTime _lastWriteTime = DateTime.UtcNow;
212+
private bool _disposed;
213+
214+
public KeepAliveJob(ConcurrentStreamWriter writer)
215+
{
216+
_writer = writer;
217+
_keepAliveTimer = new Timer(_ => EnsureKeepAlive(), null, s_timerPeriod, s_timerPeriod);
218+
_ct = _cancellationTokenSource.Token;
219+
}
220+
221+
public void Reset() => _lastWriteTime = DateTime.UtcNow;
222+
223+
private void EnsureKeepAlive()
224+
{
225+
if (_disposed)
226+
{
227+
return;
228+
}
229+
230+
if (DateTime.UtcNow - _lastWriteTime >= s_keepAlivePeriod)
231+
{
232+
WriteKeepAliveAsync().FireAndForget();
233+
}
234+
235+
async Task WriteKeepAliveAsync()
236+
{
237+
try
238+
{
239+
var buffer = _writer.Begin();
240+
buffer.Write(MessageHelper.KeepAlive);
241+
await _writer.CommitAsync(buffer, _ct).ConfigureAwait(false);
242+
_lastWriteTime = DateTime.UtcNow;
243+
}
244+
catch
245+
{
246+
// ignore
247+
}
248+
}
249+
}
250+
251+
public void Dispose()
252+
{
253+
if (_disposed)
254+
{
255+
return;
256+
}
257+
258+
_disposed = true;
259+
_keepAliveTimer.Dispose();
260+
_cancellationTokenSource.Cancel();
261+
_cancellationTokenSource.Dispose();
262+
}
263+
}
264+
265+
private static class MessageHelper
266+
{
267+
private const byte NewLine = (byte)'\n';
268+
269+
public static void FormatNextMessage(
270+
JsonResultFormatter payloadFormatter,
271+
IOperationResult result,
272+
IBufferWriter<byte> writer)
273+
{
274+
// write the result data
275+
payloadFormatter.Format(result, writer);
276+
277+
// write the new line
278+
var span = writer.GetSpan(1);
279+
span[0] = NewLine;
280+
writer.Advance(1);
281+
}
282+
283+
public static ReadOnlySpan<byte> KeepAlive => " "u8;
284+
}
285+
}

0 commit comments

Comments
 (0)