Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions TUnit.OpenTelemetry.Tests/OtlpReceiverIngestionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -202,28 +202,27 @@ public async Task Receiver_DrainAsync_WaitsForLatePostBeforeReturning()
// Simulate a SUT exporter that flushes a couple hundred ms after the test logic
// would finish — without DrainAsync, AspireFixture would tear down the AppHost
// and the late POST would fail / be dropped.
var latePostCompleted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var latePost = Task.Run(async () =>
{
await Task.Delay(TimeSpan.FromMilliseconds(200));
using var client = new HttpClient();
using var content = new ByteArrayContent(Array.Empty<byte>());
await client.PostAsync($"http://127.0.0.1:{receiver.Port}/v1/traces", content);
latePostCompleted.SetResult(true);
});

var drainStart = DateTime.UtcNow;
await receiver.DrainAsync(TimeSpan.FromSeconds(3));
var drainElapsed = DateTime.UtcNow - drainStart;

await latePost;

// Asserts the real contract directly: drain returned only after the late POST
// had been issued and acknowledged. Wall-clock floors (the previous approach)
// failed on macOS arm64 because Task.Delay scheduling jitter could push the
// late POST inside the synchronous prefix of DrainAsync, making the elapsed
// measurement misrepresent the actual invariant the drain must hold.
await Assert.That(latePostCompleted.Task.IsCompleted).IsTrue();
await Assert.That(receiver.Diagnostics.TracesRequests).IsEqualTo(1);
// The drain must have waited past the first 250ms stable window — otherwise the
// POST issued after the 200ms delay could still have been in transit when the drain
// returned. Lower bound is 350ms to leave headroom for CI scheduling jitter; the
// real invariant is "drain didn't return at ~250ms".
await Assert.That(drainElapsed).IsGreaterThanOrEqualTo(TimeSpan.FromMilliseconds(350));
// And it must respect the cap — no point waiting indefinitely once quiet.
await Assert.That(drainElapsed).IsLessThan(TimeSpan.FromSeconds(3));

await latePost;
}

[Test]
Expand Down
17 changes: 16 additions & 1 deletion TUnit.OpenTelemetry/Receiver/OtlpReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,14 @@ public async Task DrainAsync(TimeSpan? window = null, CancellationToken cancella
var totalWindow = window ?? DefaultDrainWindow;
var clock = Stopwatch.StartNew();

// A request that's been sent over TCP but not yet pulled by GetContextAsync is
// invisible to both _inflightTasks and _diagnostics.TotalRequests — there's no
// hook between the kernel TCP queue and HttpListener's accept loop. A single 250ms
// idle window can therefore return while a request is still on the wire. Require
// two consecutive idle windows (~500ms) so an in-transit POST has a chance to
// surface before drain declares quiet.
var consecutiveIdleWindows = 0;

while (!cancellationToken.IsCancellationRequested)
{
var beforeCount = Volatile.Read(ref _diagnostics.TotalRequests);
Expand Down Expand Up @@ -188,7 +196,14 @@ await Task.Delay(stableFor < remaining ? stableFor : remaining, cancellationToke
var afterCount = Volatile.Read(ref _diagnostics.TotalRequests);
if (afterCount == beforeCount && _inflightTasks.IsEmpty)
{
return;
if (++consecutiveIdleWindows >= 2)
{
return;
}
}
else
{
consecutiveIdleWindows = 0;
}
}
}
Expand Down
Loading