Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="DynamicInstrumentation.cs" company="Datadog">
// <copyright file="DynamicInstrumentation.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>
Expand Down Expand Up @@ -656,7 +656,6 @@ public void Dispose()
.Add(_snapshotUploader)
.Add(_diagnosticsUploader)
.Add(_probeStatusPoller)
.Add(_dogStats)
.DisposeAll();
}
}
Expand Down
5 changes: 4 additions & 1 deletion tracer/src/Datadog.Trace/DogStatsd/IStatsdManager.cs
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
// <copyright file="IStatsdManager.cs" company="Datadog">
// <copyright file="IStatsdManager.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

#nullable enable
using System;
using System.Threading.Tasks;
using Datadog.Trace.Vendors.StatsdClient;

namespace Datadog.Trace.DogStatsd;

internal interface IStatsdManager : IDisposable
{
Task DisposeAsync();

/// <summary>
/// Obtain a <see cref="StatsdManager.StatsdClientLease"/> for accessing a <see cref="IDogStatsd"/> instance.
/// The lease must be disposed after all references to the client have gone.
Expand Down
7 changes: 3 additions & 4 deletions tracer/src/Datadog.Trace/DogStatsd/NoOpStatsd.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// <copyright file="NoOpStatsd.cs" company="Datadog">
// <copyright file="NoOpStatsd.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>

using System;
using System.Threading.Tasks;
using Datadog.Trace.Vendors.StatsdClient;

namespace Datadog.Trace.DogStatsd
Expand Down Expand Up @@ -76,9 +77,7 @@ public void ServiceCheck(string name, Status status, int? timestamp = null, stri
{
}

public void Dispose()
{
}
public Task DisposeAsync() => Task.CompletedTask;

private sealed class NoOpTimer : IDisposable
{
Expand Down
74 changes: 62 additions & 12 deletions tracer/src/Datadog.Trace/DogStatsd/StatsdManager.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="StatsdManager.cs" company="Datadog">
// <copyright file="StatsdManager.cs" company="Datadog">
// Unless explicitly stated otherwise all files in this repository are licensed under the Apache 2 License.
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
// </copyright>
Expand Down Expand Up @@ -164,6 +164,13 @@ public void Dispose()
EnsureClient(ensureCreated: false, forceRecreate: true);
}

public async Task DisposeAsync()
{
_settingSubscription.Dispose();
// We swap out the client to make sure we do any flushes.
await EnsureClientAsync(ensureCreated: false, forceRecreate: true).ConfigureAwait(false);
}

// Internal for testing
internal static bool HasImpactingChanges(TracerSettings.SettingsManager.SettingChanges changes)
{
Expand Down Expand Up @@ -207,6 +214,29 @@ private void EnsureClient(bool ensureCreated, bool forceRecreate)
previous?.MarkClosing(); // will dispose when last lease releases
}

private async Task EnsureClientAsync(bool ensureCreated, bool forceRecreate)
{
StatsdClientHolder? previous;
Log.Debug("Recreating statsdClient: Create new client: {CreateClient}, Force recreate: {ForceRecreate}", ensureCreated, forceRecreate);

lock (_lock)
{
previous = _current;
if (ensureCreated && previous != null && !forceRecreate)
{
// Already created
return;
}

_current = ensureCreated ? _factory() : null;
}

if (previous is not null)
{
await previous.MarkClosingAsync().ConfigureAwait(false); // will dispose when last lease releases
}
}

internal readonly struct StatsdClientLease(StatsdClientHolder? holder) : IDisposable
{
private readonly StatsdClientHolder? _holder = holder;
Expand Down Expand Up @@ -300,23 +330,43 @@ public void MarkClosing()
}
}

public async Task MarkClosingAsync()
{
// Set the closing bit to ensure no more retention of client
#if NET6_0_OR_GREATER
Interlocked.Or(ref _state, ClosingBit);
#else
// Interlocked.Or is not available in < .NET 6, so have to emulate it
int state;
do
{
state = Volatile.Read(ref _state);
}
while (Interlocked.CompareExchange(ref _state, state | ClosingBit, state) != state);
#endif

// If ref count is 0 (i.e., state == ClosingBit), dispose now; else wait for Release() to reach 0
if ((Volatile.Read(ref _state) & int.MaxValue) == 0)
{
await DisposeAsync().ConfigureAwait(false);
}
}

private void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
Log.Debug("Disposing DogStatsdService");
Client.DisposeAsync().GetAwaiter().GetResult();
}
}

// We push this all to a background thread to avoid the disposes running in-line
// the DogStatsdService does sync-over-async, and this can cause thread exhaustion
_ = Task.Run(() =>
{
if (Client is DogStatsdService dogStatsd)
{
dogStatsd.Flush();
}

Client.Dispose();
});
private async Task DisposeAsync()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
Log.Debug("Disposing DogStatsdService");
await Client.DisposeAsync().ConfigureAwait(false);
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions tracer/src/Datadog.Trace/TracerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,10 @@ private static async Task CleanUpOldTracerManager(TracerManager oldManager, Trac
}

var statsdReplaced = false;
if (oldManager.Statsd != newManager.Statsd)
if (oldManager.Statsd != newManager.Statsd && oldManager.Statsd is { } statsd)
{
statsdReplaced = true;
oldManager.Statsd?.Dispose();
await statsd.DisposeAsync().ConfigureAwait(false);
}

var discoveryReplaced = false;
Expand Down Expand Up @@ -774,7 +774,10 @@ private static async Task RunShutdownTasksAsync(TracerManager instance, Timer he
}

instance.RuntimeMetrics?.Dispose();
instance.Statsd?.Dispose();
if (instance.Statsd is { } statsd)
{
await statsd.DisposeAsync().ConfigureAwait(false);
}

Log.Debug("Finished waiting for disposals.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace Datadog.Trace.Vendors.StatsdClient.Bufferize
/// <summary>
/// StatsBufferize bufferizes metrics before sending them.
/// </summary>
internal class StatsBufferize : IDisposable
internal class StatsBufferize
{
private readonly AsynchronousWorker<Stats> _worker;

Expand Down Expand Up @@ -49,10 +49,7 @@ public void Flush()
this._worker.Flush();
}

public void Dispose()
{
this._worker.Dispose();
}
public Task DisposeAsync() => this._worker.DisposeAsync();

private class WorkerHandler : IAsynchronousWorkerHandler<Stats>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
//------------------------------------------------------------------------------
using System;
using System.Globalization;
using System.Threading.Tasks;
using Datadog.Trace.Vendors.StatsdClient.Bufferize;

namespace Datadog.Trace.Vendors.StatsdClient
Expand All @@ -12,7 +13,7 @@ namespace Datadog.Trace.Vendors.StatsdClient
/// DogStatsdService is a <a href="https://docs.datadoghq.com/developers/dogstatsd/?tab=net">DogStatsD client</a>.
/// Dispose must be called to flush all the metrics.
/// </summary>
internal class DogStatsdService : IDogStatsd, IDisposable
internal class DogStatsdService : IDogStatsd
{
private StatsdBuilder _statsdBuilder = new StatsdBuilder(new StatsBufferizeFactory());
private MetricsSender _metricsSender;
Expand Down Expand Up @@ -258,10 +259,14 @@ public void Flush()
/// Disposes an instance of DogStatsdService.
/// Flushes all metrics.
/// </summary>
public void Dispose()
public async Task DisposeAsync()
{
_statsdData?.Dispose();
_statsdData = null;
var statsd = _statsdData;
if (statsd is not null)
{
_statsdData = null;
await statsd.DisposeAsync().ConfigureAwait(false);
}
}
}
}
8 changes: 0 additions & 8 deletions tracer/src/Datadog.Trace/Vendors/StatsdClient/Dogstatsd.cs
Original file line number Diff line number Diff line change
Expand Up @@ -239,13 +239,5 @@ public static void Flush()
_dogStatsdService.Flush();
}

/// <summary>
/// Disposes the instance of DogStatsdService.
/// Flushes all metrics.
/// </summary>
public static void Dispose()
{
_dogStatsdService.Dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
// This file was automatically generated by the UpdateVendors tool.
//------------------------------------------------------------------------------
using System;
using System.Threading.Tasks;

namespace Datadog.Trace.Vendors.StatsdClient
{
/// <summary>
/// IDogStatsd is an interface over DogStatsdService.
/// </summary>
internal interface IDogStatsd : IDisposable
internal interface IDogStatsd
{
/// <summary>
/// Gets the telemetry counters
Expand Down Expand Up @@ -174,5 +175,7 @@ void ServiceCheck(
string hostname = null,
string[] tags = null,
string message = null);

Task DisposeAsync();
}
}
13 changes: 9 additions & 4 deletions tracer/src/Datadog.Trace/Vendors/StatsdClient/StatsdData.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
// This file was automatically generated by the UpdateVendors tool.
//------------------------------------------------------------------------------
using System;
using System.Threading.Tasks;
using Datadog.Trace.Vendors.StatsdClient.Bufferize;
using Datadog.Trace.Vendors.StatsdClient.Transport;

namespace Datadog.Trace.Vendors.StatsdClient
{
internal class StatsdData : IDisposable
internal class StatsdData
{
private ITransport _transport;
private StatsBufferize _statsBufferize;
Expand All @@ -35,16 +36,20 @@ public void Flush()
Telemetry.Flush();
}

public void Dispose()
public async Task DisposeAsync()
{
// _statsBufferize and _telemetry must be disposed before _statsSender to make
// sure _statsSender does not received data when it is already disposed.

Telemetry?.Dispose();
Telemetry = null;

_statsBufferize?.Dispose();
_statsBufferize = null;
var statsBufferize = _statsBufferize;
if (statsBufferize is not null)
{
_statsBufferize = null;
await statsBufferize.DisposeAsync().ConfigureAwait(false);
}

_transport?.Dispose();
_transport = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ internal class NamedPipeTransport : ITransport

public NamedPipeTransport(string pipeName, TimeSpan? timeout = null)
{
_namedPipe = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.Asynchronous);
_namedPipe = new NamedPipeClientStream(".", pipeName, PipeDirection.Out, PipeOptions.None);
_timeout = timeout ?? TimeSpan.FromSeconds(2);
}

Expand Down Expand Up @@ -65,16 +65,12 @@ private bool SendBuffer(byte[] buffer, int length, bool allowRetry)

try
{
// WriteAsync overload with a CancellationToken instance seems to not work.
return _namedPipe.WriteAsync(buffer, 0, length).Wait(_timeout);
_namedPipe.Write(buffer, 0, length);
return true;
}
catch (IOException)
{
}
catch (AggregateException e) when (e.InnerException is IOException)
{
// dotnet6.0 raises AggregateException when an IOException occurs.
}

// When the server disconnects, IOException is raised with the message "Pipe is broken".
// In this case, we try to reconnect once.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ namespace Datadog.Trace.Vendors.StatsdClient.Worker
/// AsynchronousWorker performs tasks asynchronously.
/// `handler` must be thread safe if `workerThreadCount` > 1.
/// </summary>
internal class AsynchronousWorker<T> : IDisposable
internal class AsynchronousWorker<T>
{
private static TimeSpan maxWaitDurationInFlush = TimeSpan.FromSeconds(3);
private readonly ConcurrentBoundedQueue<T> _queue;
Expand Down Expand Up @@ -75,19 +75,15 @@ public void Flush()
_flushEvent.WaitOne(maxWaitDurationInFlush);
}

public void Dispose()
public async Task DisposeAsync()
{
if (!_terminate)
{
Flush();
_terminate = true;
try
{
foreach (var worker in _workers)
{
worker.Wait();
}

await Task.WhenAll(_workers).ConfigureAwait(false);
_flushEvent.Dispose();
}
catch (Exception e)
Expand Down
Loading
Loading