Skip to content

Commit 0486124

Browse files
authored
Merge pull request #417 from nblumhardt/buffer-pooling-fix
Fix array pool usage across API buffer and forwarding channel entries
2 parents 1f0ac11 + f84d3f1 commit 0486124

File tree

4 files changed

+41
-42
lines changed

4 files changed

+41
-42
lines changed

src/SeqCli/Forwarder/Channel/ForwardingChannel.cs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
using System;
16+
using System.Buffers;
1617
using System.IO;
1718
using System.Threading;
1819
using System.Threading.Channels;
@@ -73,6 +74,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
7374
{
7475
entry.CompletionSource.TrySetException(e);
7576
}
77+
78+
ArrayPool<byte>.Shared.Return(entry.Data.Array!);
7679
}
7780
}
7881
catch (Exception ex)
@@ -124,12 +127,14 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
124127
}, cancellationToken: hardCancel);
125128
}
126129

127-
public async Task WriteAsync(byte[] storage, Range range, CancellationToken cancellationToken)
130+
public async Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
128131
{
129132
var tcs = new TaskCompletionSource();
130133
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _hardCancel);
131134

132-
await _writer.WriteAsync(new ForwardingChannelEntry(storage[range], tcs), cts.Token);
135+
var copyBuffer = ArrayPool<byte>.Shared.Rent(data.Count);
136+
data.AsSpan().CopyTo(copyBuffer.AsSpan());
137+
await _writer.WriteAsync(new ForwardingChannelEntry(new ArraySegment<byte>(copyBuffer, 0, data.Count), tcs), cts.Token);
133138
await tcs.Task;
134139
}
135140

src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,5 @@
1717

1818
namespace SeqCli.Forwarder.Channel;
1919

20+
// Note, Data is backed by a rented array that the receiver should return.
2021
readonly record struct ForwardingChannelEntry(ArraySegment<byte> Data, TaskCompletionSource CompletionSource);

src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs

Lines changed: 29 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -70,67 +70,69 @@ async Task<IResult> IngestAsync(HttpContext context)
7070

7171
async Task<IResult> IngestCompactFormatAsync(HttpContext context)
7272
{
73+
byte[]? rented = null;
74+
7375
try
7476
{
75-
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
77+
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
7678
cts.CancelAfter(TimeSpan.FromSeconds(5));
77-
79+
7880
var requestApiKey = GetApiKey(context.Request);
79-
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);
80-
81+
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);
82+
8183
// Add one for the extra newline that we have to insert at the end of batches.
8284
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;
83-
var rented = ArrayPool<byte>.Shared.Rent(bufferSize);
84-
var buffer = rented[..bufferSize];
85+
rented = ArrayPool<byte>.Shared.Rent(bufferSize);
86+
var buffer = new ArraySegment<byte>(rented, 0, bufferSize);
8587
var writeHead = 0;
8688
var readHead = 0;
87-
89+
8890
var done = false;
8991
while (!done)
9092
{
9193
// Fill the memory buffer from as much of the incoming request payload as possible; buffering in memory increases the
9294
// size of write batches.
9395
while (!done)
9496
{
95-
var remaining = buffer.Length - 1 - writeHead;
97+
var remaining = buffer.Count - 1 - writeHead;
9698
if (remaining == 0)
9799
{
98100
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
99101
.Error("An incoming request exceeded the configured batch size limit");
100102
return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process");
101103
}
102-
104+
103105
var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token);
104106
if (read == 0)
105107
{
106108
done = true;
107109
}
108-
110+
109111
writeHead += read;
110-
112+
111113
// Ingested batches must be terminated with `\n`, but this isn't an API requirement.
112-
if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n')
114+
if (done && writeHead > 0 && writeHead < buffer.Count && buffer[writeHead - 1] != (byte)'\n')
113115
{
114116
buffer[writeHead] = (byte)'\n';
115117
writeHead += 1;
116118
}
117119
}
118-
120+
119121
// Validate what we read, marking out a batch of one or more complete newline-delimited events.
120122
var batchStart = readHead;
121123
var batchEnd = readHead;
122124
while (batchEnd < writeHead)
123125
{
124126
var eventStart = batchEnd;
125127
var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n');
126-
128+
127129
if (nlIndex == -1)
128130
{
129131
break;
130132
}
131-
133+
132134
var eventEnd = eventStart + nlIndex + 1;
133-
135+
134136
batchEnd = eventEnd;
135137
readHead = batchEnd;
136138

@@ -142,12 +144,12 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
142144
return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}.");
143145
}
144146
}
145-
147+
146148
if (batchStart != batchEnd)
147149
{
148-
await Write(log, ArrayPool<byte>.Shared, buffer, batchStart..batchEnd, cts.Token);
150+
await log.WriteAsync(buffer[batchStart..batchEnd], cts.Token);
149151
}
150-
152+
151153
// Copy any unprocessed data into our buffer and continue
152154
if (!done && readHead != 0)
153155
{
@@ -157,10 +159,7 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
157159
writeHead = retain;
158160
}
159161
}
160-
161-
// Exception cases are handled by `Write`
162-
ArrayPool<byte>.Shared.Return(rented);
163-
162+
164163
return SuccessfulIngestion();
165164
}
166165
catch (Exception ex)
@@ -169,6 +168,13 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
169168
.Error(ex, "Ingestion failed");
170169
return Error(HttpStatusCode.InternalServerError, "Ingestion failed.");
171170
}
171+
finally
172+
{
173+
if (rented != null)
174+
{
175+
ArrayPool<byte>.Shared.Return(rented);
176+
}
177+
}
172178
}
173179

174180
static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName)
@@ -263,19 +269,6 @@ bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment
263269
errorFragment = null;
264270
return true;
265271
}
266-
267-
static async Task Write(ForwardingChannel forwardingChannel, ArrayPool<byte> pool, byte[] storage, Range range, CancellationToken cancellationToken)
268-
{
269-
try
270-
{
271-
await forwardingChannel.WriteAsync(storage, range, cancellationToken);
272-
}
273-
catch
274-
{
275-
pool.Return(storage);
276-
throw;
277-
}
278-
}
279272

280273
static IResult Error(HttpStatusCode statusCode, string message)
281274
{

test/SeqCli.EndToEnd/Support/CaptiveProcess.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,11 +111,11 @@ public int WaitForExit(TimeSpan? timeout = null)
111111

112112
if (_captureOutput)
113113
{
114-
if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(1)))
115-
throw new IOException("STDOUT did not complete in the fixed 1 second window.");
114+
if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(5)))
115+
throw new IOException("STDOUT did not complete in the fixed 5-second window.");
116116

117-
if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(1)))
118-
throw new IOException("STDERR did not complete in the fixed 1 second window.");
117+
if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(5)))
118+
throw new IOException("STDERR did not complete in the fixed 5-second window.");
119119
}
120120

121121
return _process.ExitCode;

0 commit comments

Comments
 (0)