Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@
"env_var": "DD_TELEMETRY_HEARTBEAT_INTERVAL",
"const_name": "HeartbeatIntervalSeconds"
},
{
"env_var": "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL",
"const_name": "ExtendedHeartbeatIntervalSeconds"
},
{
"env_var": "DD_TELEMETRY_LOG_COLLECTION_ENABLED",
"const_name": "TelemetryLogsEnabled"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,11 @@ DD_TELEMETRY_HEARTBEAT_INTERVAL: |
For testing purposes. Defaults to 60
<see cref="Datadog.Trace.Telemetry.TelemetrySettings.HeartbeatInterval"/>

DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL: |
Configuration key for how often extended heartbeat telemetry should be sent, in seconds.
For testing purposes. Defaults to 86400 (24 hours)
<see cref="Datadog.Trace.Telemetry.TelemetrySettings.ExtendedHeartbeatInterval"/>

DD_TELEMETRY_LOG_COLLECTION_ENABLED: |
Configuration key for whether to enable redacted error log collection.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,14 @@
"product": "Telemetry"
}
],
"DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL": [
{
"implementation": "B",
"type": "decimal",
"default": "86400.0",
"product": "Telemetry"
}
],
"DD_TELEMETRY_LOG_COLLECTION_ENABLED": [
{
"implementation": "A",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ internal static class Telemetry
/// </summary>
public const string HeartbeatIntervalSeconds = "DD_TELEMETRY_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for how often extended heartbeat telemetry should be sent, in seconds.
/// For testing purposes. Defaults to 86400 (24 hours)
/// <see cref="Datadog.Trace.Telemetry.TelemetrySettings.ExtendedHeartbeatInterval"/>
/// </summary>
public const string ExtendedHeartbeatIntervalSeconds = "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for whether to enable redacted error log collection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ internal static class Telemetry
/// </summary>
public const string HeartbeatIntervalSeconds = "DD_TELEMETRY_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for how often extended heartbeat telemetry should be sent, in seconds.
/// For testing purposes. Defaults to 86400 (24 hours)
/// <see cref="Datadog.Trace.Telemetry.TelemetrySettings.ExtendedHeartbeatInterval"/>
/// </summary>
public const string ExtendedHeartbeatIntervalSeconds = "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for whether to enable redacted error log collection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ internal static class Telemetry
/// </summary>
public const string HeartbeatIntervalSeconds = "DD_TELEMETRY_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for how often extended heartbeat telemetry should be sent, in seconds.
/// For testing purposes. Defaults to 86400 (24 hours)
/// <see cref="Datadog.Trace.Telemetry.TelemetrySettings.ExtendedHeartbeatInterval"/>
/// </summary>
public const string ExtendedHeartbeatIntervalSeconds = "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for whether to enable redacted error log collection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ internal static class Telemetry
/// </summary>
public const string HeartbeatIntervalSeconds = "DD_TELEMETRY_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for how often extended heartbeat telemetry should be sent, in seconds.
/// For testing purposes. Defaults to 86400 (24 hours)
/// <see cref="Datadog.Trace.Telemetry.TelemetrySettings.ExtendedHeartbeatInterval"/>
/// </summary>
public const string ExtendedHeartbeatIntervalSeconds = "DD_TELEMETRY_EXTENDED_HEARTBEAT_INTERVAL";

/// <summary>
/// Configuration key for whether to enable redacted error log collection.
/// </summary>
Expand Down
77 changes: 71 additions & 6 deletions tracer/src/Datadog.Trace/Telemetry/TelemetryController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ internal TelemetryController(
IMetricsTelemetryCollector metrics,
RedactedErrorLogCollector? redactedErrorLogs,
TelemetryTransportManager transportManager,
TimeSpan flushInterval)
TimeSpan flushInterval,
TimeSpan extendedHeartbeatInterval)
{
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_dependencies = dependencies ?? throw new ArgumentNullException(nameof(dependencies));
Expand All @@ -67,7 +68,7 @@ internal TelemetryController(
// We use Task.Delay(Timeout.Infinite) here as "a Task that never completes".
// It simplifies some of the logic we need to do in the scheduler
var redactedErrorLogsTask = () => _redactedErrorLogs?.WaitForLogsAsync() ?? Task.Delay(Timeout.Infinite);
_scheduler = new(flushInterval, redactedErrorLogsTask, _processExit);
_scheduler = new(flushInterval, extendedHeartbeatInterval, redactedErrorLogsTask, _processExit);

try
{
Expand Down Expand Up @@ -280,6 +281,11 @@ private async Task PushTelemetryLoopAsync()
await PushTelemetry(includeLogs: _scheduler.ShouldFlushRedactedErrorLogs, sendAppClosing: isFinalPush).ConfigureAwait(false);
}

if (_isStarted && _scheduler.ShouldSendExtendedHeartbeat)
{
await PushExtendedHeartbeat().ConfigureAwait(false);
}

if (isFinalPush)
{
Log.Debug("Process exit requested, ending telemetry loop");
Expand Down Expand Up @@ -338,6 +344,35 @@ private async Task PushTelemetry(bool includeLogs, bool sendAppClosing)
}
}

/// <summary>
/// Builds and sends a single "extended heartbeat" telemetry payload containing the
/// current application/host data plus configuration, dependency, and integration data.
/// Silently skips the push when telemetry has not been initialized yet, and never
/// lets an error escape: failures are logged and swallowed so the telemetry loop
/// that invokes this method keeps running.
/// </summary>
private async Task PushExtendedHeartbeat()
{
    try
    {
        var application = _application.GetApplicationData();
        var host = _application.GetHostData();
        if (application is null || host is null)
        {
            // Application/host data only becomes available once telemetry is
            // initialized; there is nothing meaningful to send before that.
            Log.Debug("Telemetry not initialized, skipping extended heartbeat");
            return;
        }

        // NOTE(review): GetFullData() suggests the extended heartbeat carries the
        // complete dependency/integration snapshot rather than a delta (unlike the
        // regular flush) — TODO confirm against the collector implementations.
        var data = _dataBuilder.BuildExtendedHeartbeatData(
            application,
            host,
            _configuration.GetData(),
            _dependencies.GetFullData(),
            _integrations.GetFullData(),
            _namingVersion);

        Log.Debug("Pushing extended heartbeat telemetry");
        // ConfigureAwait(false): library code, no need to resume on a captured context.
        await _transportManager.TryPushTelemetry(data).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Telemetry is best-effort: log and continue rather than faulting the push loop.
        Log.Warning(ex, "Error pushing extended heartbeat telemetry");
    }
}

private readonly struct WorkItem
{
public WorkItem(ItemType type, object? state)
Expand Down Expand Up @@ -439,6 +474,7 @@ internal sealed class Scheduler
private const int ProcessTaskIndex = 1;
private const int InitializationTaskIndex = 2;
private const int LogQueueSizeTaskIndex = 3;
private const int ExtendedHeartbeatTaskIndex = 4;

private readonly TaskCompletionSource<bool> _tracerInitialized = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<bool> _processExitSource;
Expand All @@ -447,31 +483,36 @@ internal sealed class Scheduler
private readonly IClock _clock;
private readonly IDelayFactory _delayFactory;
private TimeSpan _flushInterval;
private TimeSpan _extendedHeartbeatInterval;
private DateTime _lastFlush;
private DateTime _lastExtendedHeartbeat;
private bool _initializationFlushExecuted = false;

public Scheduler(TimeSpan flushInterval, Func<Task> logQueueTaskGenerator, TaskCompletionSource<bool> processExitSource)
: this(flushInterval, logQueueTaskGenerator, processExitSource, new Clock(), new DelayFactory())
public Scheduler(TimeSpan flushInterval, TimeSpan extendedHeartbeatInterval, Func<Task> logQueueTaskGenerator, TaskCompletionSource<bool> processExitSource)
: this(flushInterval, extendedHeartbeatInterval, logQueueTaskGenerator, processExitSource, new Clock(), new DelayFactory())
{
}

// For testing only
public Scheduler(TimeSpan flushInterval, Func<Task> logQueueTaskGenerator, TaskCompletionSource<bool> processExitSource, IClock clock, IDelayFactory delayFactory)
public Scheduler(TimeSpan flushInterval, TimeSpan extendedHeartbeatInterval, Func<Task> logQueueTaskGenerator, TaskCompletionSource<bool> processExitSource, IClock clock, IDelayFactory delayFactory)
{
_clock = clock;
_delayFactory = delayFactory;
_processExitSource = processExitSource;
_flushInterval = flushInterval;
_extendedHeartbeatInterval = extendedHeartbeatInterval;
_logQueueTaskGenerator = logQueueTaskGenerator;
ShouldFlushTelemetry = false; // wait for initialization before flushing metrics
_lastFlush = _clock.UtcNow;
_lastExtendedHeartbeat = _clock.UtcNow;

// Using a task array instead of overloads to avoid allocating the array every loop
_tasks = new Task[4];
_tasks = new Task[5];
_tasks[DelayTaskIndex] = Task.CompletedTask; // Replaced on first iteration of WaitForNextInterval(), but ensures there's no nulls around
_tasks[ProcessTaskIndex] = processExitSource.Task;
_tasks[InitializationTaskIndex] = _tracerInitialized.Task;
_tasks[LogQueueSizeTaskIndex] = _logQueueTaskGenerator();
_tasks[ExtendedHeartbeatTaskIndex] = Task.Delay(Timeout.Infinite);
}

public interface IDelayFactory
Expand All @@ -483,6 +524,8 @@ public interface IDelayFactory

public bool ShouldFlushRedactedErrorLogs { get; private set; }

public bool ShouldSendExtendedHeartbeat { get; private set; }

public void SetFlushInterval(TimeSpan flushInterval)
{
_flushInterval = flushInterval;
Expand All @@ -499,6 +542,7 @@ public async Task WaitForNextInterval()
// take a long time to push telemetry if the network is slow or faulty

var nextFlush = _lastFlush.Add(_flushInterval);
var nextExtendedHeartbeat = _lastExtendedHeartbeat.Add(_extendedHeartbeatInterval);

// Note that we don't start flushing until initialized, so using infinite delay initially
TimeSpan? waitPeriod = _initializationFlushExecuted
Expand All @@ -516,6 +560,13 @@ public async Task WaitForNextInterval()
// if we don't have a wait period, it's because we're waiting for initialization
_tasks[DelayTaskIndex] = _delayFactory.Delay(waitPeriod ?? Timeout.InfiniteTimeSpan);
_tasks[LogQueueSizeTaskIndex] = logFlushTask;

// Set up extended heartbeat timer
var extendedHeartbeatWait = _initializationFlushExecuted
? nextExtendedHeartbeat - _clock.UtcNow
: Timeout.InfiniteTimeSpan;
_tasks[ExtendedHeartbeatTaskIndex] = _delayFactory.Delay(extendedHeartbeatWait);

await Task.WhenAny(_tasks).ConfigureAwait(false);
}

Expand All @@ -524,12 +575,14 @@ public async Task WaitForNextInterval()
// end of the line, flush everything, don't bother recalculating;
ShouldFlushTelemetry = true;
ShouldFlushRedactedErrorLogs = true;
ShouldSendExtendedHeartbeat = true;
return;
}

// Reset variables
ShouldFlushTelemetry = false;
ShouldFlushRedactedErrorLogs = false;
ShouldSendExtendedHeartbeat = false;
var now = _clock.UtcNow;

// Should we flush telemetry?
Expand All @@ -540,6 +593,8 @@ public async Task WaitForNextInterval()
ShouldFlushTelemetry = true;
// Including logs as we typically log a lot at startup
ShouldFlushRedactedErrorLogs = true;
// Send first extended heartbeat on startup
ShouldSendExtendedHeartbeat = true;
// replace the tracerInitializedTask with a task that never completes
_tasks[InitializationTaskIndex] = Task.Delay(Timeout.Infinite);
}
Expand All @@ -556,10 +611,20 @@ public async Task WaitForNextInterval()
ShouldFlushRedactedErrorLogs = true;
}

if (_initializationFlushExecuted && (nextExtendedHeartbeat <= now))
{
ShouldSendExtendedHeartbeat = true;
}

if (ShouldFlushTelemetry)
{
_lastFlush = now;
}

if (ShouldSendExtendedHeartbeat)
{
_lastExtendedHeartbeat = now;
}
}

private sealed class Clock : IClock
Expand Down
3 changes: 2 additions & 1 deletion tracer/src/Datadog.Trace/Telemetry/TelemetryFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@ private ITelemetryController CreateController(
Metrics,
_logs.IsValueCreated ? _logs.Value : null, // if we haven't created it by now, we don't need it
transportManager,
settings.HeartbeatInterval);
settings.HeartbeatInterval,
settings.ExtendedHeartbeatInterval);
}
}

Expand Down
10 changes: 10 additions & 0 deletions tracer/src/Datadog.Trace/Telemetry/TelemetrySettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public TelemetrySettings(
AgentlessSettings? agentlessSettings,
bool agentProxyEnabled,
TimeSpan heartbeatInterval,
TimeSpan extendedHeartbeatInterval,
bool dependencyCollectionEnabled,
bool metricsEnabled,
bool debugEnabled,
Expand All @@ -31,6 +32,7 @@ public TelemetrySettings(
Agentless = agentlessSettings;
AgentProxyEnabled = agentProxyEnabled;
HeartbeatInterval = heartbeatInterval;
ExtendedHeartbeatInterval = extendedHeartbeatInterval;
DependencyCollectionEnabled = dependencyCollectionEnabled;
MetricsEnabled = metricsEnabled;
DebugEnabled = debugEnabled;
Expand All @@ -49,6 +51,8 @@ public TelemetrySettings(

public TimeSpan HeartbeatInterval { get; }

public TimeSpan ExtendedHeartbeatInterval { get; }

public bool AgentProxyEnabled { get; }

public bool DependencyCollectionEnabled { get; }
Expand Down Expand Up @@ -141,6 +145,11 @@ public static TelemetrySettings FromSource(IConfigurationSource source, IConfigu
.AsDouble(defaultValue: 60, rawInterval => rawInterval is > 0 and <= 3600)
.Value;

var extendedHeartbeatInterval = config
.WithKeys(ConfigurationKeys.Telemetry.ExtendedHeartbeatIntervalSeconds)
.AsDouble(defaultValue: 86400, rawInterval => rawInterval > 0)
.Value;

var dependencyCollectionEnabled = config.WithKeys(ConfigurationKeys.Telemetry.DependencyCollectionEnabled).AsBool(true);

var telemetryCompressionMethod = config.WithKeys(ConfigurationKeys.Telemetry.TelemetryCompressionMethod).AsString("gzip");
Expand Down Expand Up @@ -168,6 +177,7 @@ public static TelemetrySettings FromSource(IConfigurationSource source, IConfigu
agentless,
agentProxyEnabled,
TimeSpan.FromSeconds(heartbeatInterval),
TimeSpan.FromSeconds(extendedHeartbeatInterval),
dependencyCollectionEnabled,
metricsEnabled,
debugEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ namespace Datadog.Trace.Tests.Telemetry;
public class TelemetryControllerSchedulerTests
{
private static readonly TimeSpan FlushInterval = TimeSpan.FromSeconds(60);
private static readonly TimeSpan ExtendedHeartbeatInterval = TimeSpan.FromSeconds(86400);
private static readonly TimeSpan FiveSeconds = TimeSpan.FromSeconds(5);
private static readonly Task NeverComplete = Task.Delay(Timeout.Infinite);
private readonly TaskCompletionSource<bool> _processExit = new();
Expand All @@ -24,7 +25,7 @@ public class TelemetryControllerSchedulerTests

public TelemetryControllerSchedulerTests()
{
_scheduler = new TelemetryController.Scheduler(FlushInterval, () => NeverComplete, _processExit, _clock, _delayFactory);
_scheduler = new TelemetryController.Scheduler(FlushInterval, ExtendedHeartbeatInterval, () => NeverComplete, _processExit, _clock, _delayFactory);
}

[Fact]
Expand Down Expand Up @@ -288,7 +289,7 @@ public async Task CanChangeFlushInterval()
}

private TelemetryController.Scheduler GetScheduler(QueueTaskGenerator queueTaskGenerator = null)
=> new(FlushInterval, (queueTaskGenerator ?? new()).GetTask, _processExit, _clock, _delayFactory);
=> new(FlushInterval, ExtendedHeartbeatInterval, (queueTaskGenerator ?? new()).GetTask, _processExit, _clock, _delayFactory);

private class DelayFactory : TelemetryController.Scheduler.IDelayFactory
{
Expand Down
Loading
Loading