Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/SeqCli/Forwarder/Channel/ForwardingChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Channels;
Expand Down Expand Up @@ -73,6 +74,8 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
{
entry.CompletionSource.TrySetException(e);
}

ArrayPool<byte>.Shared.Return(entry.Data.Array!);
}
}
catch (Exception ex)
Expand Down Expand Up @@ -124,12 +127,14 @@ public ForwardingChannel(BufferAppender appender, BufferReader reader, Bookmark
}, cancellationToken: hardCancel);
}

public async Task WriteAsync(byte[] storage, Range range, CancellationToken cancellationToken)
public async Task WriteAsync(ArraySegment<byte> data, CancellationToken cancellationToken)
{
var tcs = new TaskCompletionSource();
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _hardCancel);

await _writer.WriteAsync(new ForwardingChannelEntry(storage[range], tcs), cts.Token);
var copyBuffer = ArrayPool<byte>.Shared.Rent(data.Count);
data.AsSpan().CopyTo(copyBuffer.AsSpan());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These data segments are likely to be much shorter than the max-sized buffer we read incoming requests into.

await _writer.WriteAsync(new ForwardingChannelEntry(new ArraySegment<byte>(copyBuffer, 0, data.Count), tcs), cts.Token);
await tcs.Task;
}

Expand Down
1 change: 1 addition & 0 deletions src/SeqCli/Forwarder/Channel/ForwardingChannelEntry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@

namespace SeqCli.Forwarder.Channel;

// Note, Data is backed by a rented array that the receiver should return.
readonly record struct ForwardingChannelEntry(ArraySegment<byte> Data, TaskCompletionSource CompletionSource);
65 changes: 29 additions & 36 deletions src/SeqCli/Forwarder/Web/Api/IngestionEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,67 +70,69 @@ async Task<IResult> IngestAsync(HttpContext context)

async Task<IResult> IngestCompactFormatAsync(HttpContext context)
{
byte[]? rented = null;

try
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
using var cts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted);
cts.CancelAfter(TimeSpan.FromSeconds(5));

var requestApiKey = GetApiKey(context.Request);
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);
var log = _forwardingChannels.GetForwardingChannel(requestApiKey);

// Add one for the extra newline that we have to insert at the end of batches.
var bufferSize = _config.Connection.BatchSizeLimitBytes + 1;
var rented = ArrayPool<byte>.Shared.Rent(bufferSize);
var buffer = rented[..bufferSize];
rented = ArrayPool<byte>.Shared.Rent(bufferSize);
var buffer = new ArraySegment<byte>(rented, 0, bufferSize);
var writeHead = 0;
var readHead = 0;

var done = false;
while (!done)
{
// Fill the memory buffer from as much of the incoming request payload as possible; buffering in memory increases the
// size of write batches.
while (!done)
{
var remaining = buffer.Length - 1 - writeHead;
var remaining = buffer.Count - 1 - writeHead;
if (remaining == 0)
{
IngestionLog.ForClient(context.Connection.RemoteIpAddress)
.Error("An incoming request exceeded the configured batch size limit");
return Error(HttpStatusCode.RequestEntityTooLarge, "the request is too large to process");
}

var read = await context.Request.Body.ReadAsync(buffer.AsMemory(writeHead, remaining), cts.Token);
if (read == 0)
{
done = true;
}

writeHead += read;

// Ingested batches must be terminated with `\n`, but this isn't an API requirement.
if (done && writeHead > 0 && writeHead < buffer.Length && buffer[writeHead - 1] != (byte)'\n')
if (done && writeHead > 0 && writeHead < buffer.Count && buffer[writeHead - 1] != (byte)'\n')
{
buffer[writeHead] = (byte)'\n';
writeHead += 1;
}
}

// Validate what we read, marking out a batch of one or more complete newline-delimited events.
var batchStart = readHead;
var batchEnd = readHead;
while (batchEnd < writeHead)
{
var eventStart = batchEnd;
var nlIndex = buffer.AsSpan()[eventStart..].IndexOf((byte)'\n');

if (nlIndex == -1)
{
break;
}

var eventEnd = eventStart + nlIndex + 1;

batchEnd = eventEnd;
readHead = batchEnd;

Expand All @@ -142,12 +144,12 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
return Error(HttpStatusCode.BadRequest, $"Payload validation failed: {error}.");
}
}

if (batchStart != batchEnd)
{
await Write(log, ArrayPool<byte>.Shared, buffer, batchStart..batchEnd, cts.Token);
await log.WriteAsync(buffer[batchStart..batchEnd], cts.Token);
}

// Copy any unprocessed data into our buffer and continue
if (!done && readHead != 0)
{
Expand All @@ -157,10 +159,7 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
writeHead = retain;
}
}

// Exception cases are handled by `Write`
ArrayPool<byte>.Shared.Return(rented);


return SuccessfulIngestion();
}
catch (Exception ex)
Expand All @@ -169,6 +168,13 @@ async Task<IResult> IngestCompactFormatAsync(HttpContext context)
.Error(ex, "Ingestion failed");
return Error(HttpStatusCode.InternalServerError, "Ingestion failed.");
}
finally
{
if (rented != null)
{
ArrayPool<byte>.Shared.Return(rented);
}
}
}

static bool DefaultedBoolQuery(HttpRequest request, string queryParameterName)
Expand Down Expand Up @@ -263,19 +269,6 @@ bool ValidateClef(Span<byte> evt, [NotNullWhen(false)] out string? errorFragment
errorFragment = null;
return true;
}

static async Task Write(ForwardingChannel forwardingChannel, ArrayPool<byte> pool, byte[] storage, Range range, CancellationToken cancellationToken)
{
try
{
await forwardingChannel.WriteAsync(storage, range, cancellationToken);
}
catch
{
pool.Return(storage);
throw;
}
}

static IResult Error(HttpStatusCode statusCode, string message)
{
Expand Down
8 changes: 4 additions & 4 deletions test/SeqCli.EndToEnd/Support/CaptiveProcess.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ public int WaitForExit(TimeSpan? timeout = null)

if (_captureOutput)
{
if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(1)))
throw new IOException("STDOUT did not complete in the fixed 1 second window.");
if (!_outputComplete.WaitOne(TimeSpan.FromSeconds(5)))
throw new IOException("STDOUT did not complete in the fixed 5-second window.");

if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(1)))
throw new IOException("STDERR did not complete in the fixed 1 second window.");
if (!_errorComplete.WaitOne(TimeSpan.FromSeconds(5)))
throw new IOException("STDERR did not complete in the fixed 5-second window.");
}

return _process.ExitCode;
Expand Down