Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace ConcurrentQueue<T> in BackgroundWorker #3355

Merged
merged 18 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@

- InApp includes/excludes can now be configured using regular expressions ([#3321](https://github.com/getsentry/sentry-dotnet/pull/3321))

### Fixes

- Fixed memory leak in BackgroundWorker observed when using Sentry with Quartz and MySql ([#3355](https://github.com/getsentry/sentry-dotnet/pull/3355))

### Dependencies

- Bump CLI from v2.31.0 to v2.31.2 ([#3342](https://github.com/getsentry/sentry-dotnet/pull/3342), [#3345](https://github.com/getsentry/sentry-dotnet/pull/3345))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
```

BenchmarkDotNet v0.13.12, macOS Sonoma 14.4.1 (23E224) [Darwin 23.4.0]
Apple M2 Max, 1 CPU, 12 logical and 12 physical cores
.NET SDK 8.0.204
[Host] : .NET 6.0.22 (6.0.2223.42425), Arm64 RyuJIT AdvSIMD
DefaultJob : .NET 6.0.22 (6.0.2223.42425), Arm64 RyuJIT AdvSIMD


```
| Method | N | Mean | Error | StdDev | Gen0 | Gen1 | Gen2 | Allocated |
|------------------------- |----- |---------:|----------:|----------:|----------:|---------:|---------:|----------:|
| ConcurrentQueueLiteAsync | 1000 | 3.377 ms | 0.0782 ms | 0.2305 ms | 1054.6875 | 476.5625 | 31.2500 | 2.18 MB |
| ConcurrentQueueAsync | 1000 | 3.574 ms | 0.0741 ms | 0.2172 ms | 1066.4063 | 519.5313 | 144.5313 | 2.21 MB |
57 changes: 57 additions & 0 deletions benchmarks/Sentry.Benchmarks/ConcurrentQueueBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
using BenchmarkDotNet.Attributes;
using Sentry.Internal;
using Sentry.Protocol.Envelopes;

namespace Sentry.Benchmarks;

public class ConcurrentQueueBenchmarks
{
[Params(1000)]
public int N;

[Benchmark]
public async Task ConcurrentQueueLiteAsync()
{
ConcurrentQueueLite<Envelope> queue = new();
List<Task> tasks = new();
for (var i = 0; i < N; i++)
{
tasks.Add(Task.Run(() =>
{
queue.Enqueue(Envelope.FromEvent(new SentryEvent()));
}));
tasks.Add(Task.Run(() =>
{
if (queue.TryPeek(out var item))
{
queue.TryDequeue(out _);
}
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
}));
}
await Task.WhenAll(tasks);
queue.Clear();
}

[Benchmark]
public async Task ConcurrentQueueAsync()
{
ConcurrentQueue<Envelope> queue = new();
List<Task> tasks = new();
for (var i = 0; i < N; i++)
{
tasks.Add(Task.Run(() =>
{
queue.Enqueue(Envelope.FromEvent(new SentryEvent()));
}));
tasks.Add(Task.Run(() =>
{
if (queue.TryPeek(out var item))
{
queue.TryDequeue(out _);
}
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
}));
}
await Task.WhenAll(tasks);
queue.Clear();
}
}
7 changes: 4 additions & 3 deletions src/Sentry/Internal/BackgroundWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ internal class BackgroundWorker : IBackgroundWorker, IDisposable
{
private readonly ITransport _transport;
private readonly SentryOptions _options;
private readonly ConcurrentQueue<Envelope> _queue;
private readonly ConcurrentQueueLite<Envelope> _queue;
private readonly int _maxItems;
private readonly CancellationTokenSource _shutdownSource;
private readonly SemaphoreSlim _queuedEnvelopeSemaphore;
Expand All @@ -27,11 +27,11 @@ public BackgroundWorker(
ITransport transport,
SentryOptions options,
CancellationTokenSource? shutdownSource = null,
ConcurrentQueue<Envelope>? queue = null)
ConcurrentQueueLite<Envelope>? queue = null)
{
_transport = transport;
_options = options;
_queue = queue ?? new ConcurrentQueue<Envelope>();
_queue = queue ?? new ConcurrentQueueLite<Envelope>();
_maxItems = options.MaxQueueItems;
_shutdownSource = shutdownSource ?? new CancellationTokenSource();
_queuedEnvelopeSemaphore = new SemaphoreSlim(0, _maxItems);
Expand Down Expand Up @@ -191,6 +191,7 @@ private async Task DoWorkAsync()
}
catch (Exception e)
{
_queue.Clear();
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
_options.LogFatal(e, "Exception in the background worker.");
throw;
}
Expand Down
64 changes: 64 additions & 0 deletions src/Sentry/Internal/ConcurrentQueueLite.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
namespace Sentry.Internal;

/// <summary>
/// A minimal replacement for <see cref="ConcurrentQueue{T}"/>
///
/// We're using this due to a memory leak that happens when using ConcurrentQueue in the BackgroundWorker.
/// See https://github.com/getsentry/sentry-dotnet/issues/2516
/// </summary>
internal class ConcurrentQueueLite<T>
{
private readonly List<T> _queue = new();
private int _listCounter = 0;

public void Enqueue(T item)
{
lock (_queue)
{
_queue.Add(item);
_listCounter++;
}
}
public bool TryDequeue([NotNullWhen(true)] out T? item)
{
lock (_queue)
{
if (_listCounter > 0)
{
item = _queue[0]!;
_queue.RemoveAt(0);
_listCounter--;
return true;
}
}
item = default;
return false;
}

public int Count => _listCounter;

public bool IsEmpty => _listCounter == 0;
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved

public void Clear()
{
lock (_queue)
{
_queue.Clear();
_listCounter = 0;
}
}

public bool TryPeek([NotNullWhen(true)] out T? item)
{
lock (_queue)
{
if (_listCounter > 0)
{
item = _queue[0]!;
return true;
}
}
item = default;
return false;
}
}
12 changes: 6 additions & 6 deletions test/Sentry.Tests/Internals/BackgroundWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ private class Fixture
public IClientReportRecorder ClientReportRecorder { get; private set; } = Substitute.For<IClientReportRecorder>();
public ITransport Transport { get; set; } = Substitute.For<ITransport>();
public IDiagnosticLogger Logger { get; set; }
public ConcurrentQueue<Envelope> Queue { get; set; } = new();
public ConcurrentQueueLite<Envelope> Queue { get; set; } = new();
public CancellationTokenSource CancellationTokenSource { get; set; } = new();
public SentryOptions SentryOptions { get; set; } = new();

Expand Down Expand Up @@ -157,7 +157,7 @@ public void Dispose_EventQueuedZeroShutdownTimeout_CantEmptyQueueBeforeShutdown(
Assert.Equal(TaskStatus.RanToCompletion, sut.WorkerTask.Status);

// Worker was stopped before queue could be emptied.
Assert.NotEmpty(_fixture.Queue);
_fixture.Queue.IsEmpty.Should().BeFalse();
}

[Fact]
Expand All @@ -182,7 +182,7 @@ public void Dispose_EventQueuedDefaultShutdownTimeout_EmptiesQueueBeforeShutdown
Assert.Equal(TaskStatus.RanToCompletion, sut.WorkerTask.Status);

// Worker was given time to empty before it was stopped.
Assert.Empty(_fixture.Queue);
_fixture.Queue.IsEmpty.Should().BeTrue();

// We should not have used the entire shutdown timeout period.
sw.Elapsed.Should().BeLessThan(_fixture.SentryOptions.ShutdownTimeout,
Expand Down Expand Up @@ -362,15 +362,15 @@ public async Task FlushAsync_SingleEvent_FlushReturnsAfterEventSent()
sut.EnqueueEnvelope(envelope, process: false);

var flushTask = sut.FlushAsync(Timeout.InfiniteTimeSpan);
Assert.Single(_fixture.Queue); // Event being processed
_fixture.Queue.Count.Should().Be(1); // Event being processed

// Release the item and flush
sut.ProcessQueuedItems(1);
await flushTask;

// Assert
_fixture.Logger.Received(1).Log(SentryLevel.Debug, "Successfully flushed all events up to call to FlushAsync.");
Assert.Empty(_fixture.Queue);
_fixture.Queue.IsEmpty.Should().BeTrue();
}

[Fact]
Expand Down Expand Up @@ -407,7 +407,7 @@ public async Task FlushAsync_FullQueue_RespectsTimeout()
sw.Stop();

_fixture.Logger.Received(1).Log(SentryLevel.Debug, "Timeout when trying to flush queue.");
Assert.Single(_fixture.Queue); // Only the item being processed at the blocked callback
_fixture.Queue.Count.Should().Be(1); // Only the item being processed at the blocked callback

// Test the timeout
sw.Elapsed.Should().BeGreaterThan(flushTimeout);
Expand Down
141 changes: 141 additions & 0 deletions test/Sentry.Tests/Internals/ConcurrentQueueLiteTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
namespace Sentry.Tests.Internals;

public class ConcurrentQueueLiteTests
{
[Fact]
public void Enqueue_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();

// Act
queue.Enqueue(1);

// Assert
queue.Count.Should().Be(1);
}

[Fact]
public void TryDequeue_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();
queue.Enqueue(1);

// Act
var result = queue.TryDequeue(out var dequeuedItem);

// Assert
result.Should().BeTrue();
dequeuedItem.Should().Be(1);
queue.Count.Should().Be(0);
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void TryDequeue_EmptyQueue_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();

// Act
var result = queue.TryDequeue(out var dequeuedItem);

// Assert
result.Should().BeFalse();
dequeuedItem.Should().Be(default(int));
}

[Fact]
public void Count_EmptyQueue_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();

// Act & Assert
queue.Count.Should().Be(0);
}

[Fact]
public void IsEmpty_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();

// Act & Assert
queue.IsEmpty.Should().BeTrue();
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void Clear_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();
queue.Enqueue(1);

// Act
queue.Clear();

// Assert
queue.Count.Should().Be(0);
jamescrosswell marked this conversation as resolved.
Show resolved Hide resolved
}

[Fact]
public void TryPeek_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();
queue.Enqueue(1);

// Act
var result = queue.TryPeek(out var peekedItem);

// Assert
result.Should().BeTrue();
peekedItem.Should().Be(1);
queue.Count.Should().Be(1);
}

[Fact]
public void TryPeek_EmptyQueue_Test()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();

// Act
var result = queue.TryPeek(out var peekedItem);

// Assert
result.Should().BeFalse();
peekedItem.Should().Be(default(int));
}

[Fact]
public async Task TestConcurrency()
{
// Arrange
var queue = new ConcurrentQueueLite<int>();
var count = 100;
var tasks = new Task[count * 2];
var received = 0;

// Act
for (var i = 0; i < count; i++)
{
var toAdd = i;
tasks[i] = Task.Run(() =>
{
queue.Enqueue(toAdd);
Interlocked.Increment(ref received);
});
tasks[i + count] = Task.Run(() =>
{
queue.TryDequeue(out _);
Interlocked.Increment(ref received);
});
}
await Task.WhenAll(tasks);

// Assert
received.Should().Be(count * 2);
}
}