Skip to content

Commit

Permalink
Rule based sampling and rate limiting (DataDog#537)
Browse files Browse the repository at this point in the history
* Rule based sampling and rate limiting

* Regex sampling rule from config

* Fix comment and fix whitespace problems

* Use TotalMilliseconds

* Make clear comments for metrics

* add clarity comment back in

* Consolidate constants

* Merge miss

* Rate limiter tests and fixes

* More tests, less flake

* Simplify RateLimiter, solidify tests

* Remove lock

* Sampling rule tests

* Sampler tests

* More safety mechanisms and better string comparison

* Test fallback logic and rate limiter usage, fix them

* Reduce the flake

* Remove constructor log

* even less flake

* CI is fun with stats tests

* Fix bad comment

* Watch for poison regex

* Thanks StyleCop, I don't know what I'd do without you

* Better comment docs
  • Loading branch information
colin-higgins authored Oct 31, 2019
1 parent 13f0706 commit 3983bdd
Show file tree
Hide file tree
Showing 19 changed files with 1,015 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/Datadog.Trace/Agent/Api.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public async Task SendTracesAsync(IList<List<Span>> traces)
var responseContent = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false);
var response = JsonConvert.DeserializeObject<ApiResponse>(responseContent);

Tracer.Instance.Sampler.SetSampleRates(response?.RateByService);
Tracer.Instance.Sampler.SetDefaultSampleRates(response?.RateByService);
}
}
catch (Exception ex)
Expand Down
37 changes: 37 additions & 0 deletions src/Datadog.Trace/Configuration/ConfigurationKeys.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,43 @@ public static class ConfigurationKeys
/// <seealso cref="TracerSettings.LogsInjectionEnabled"/>
public const string LogsInjectionEnabled = "DD_LOGS_INJECTION";

/// <summary>
/// Configuration key for setting the number of traces allowed
/// to be submitted per second.
/// </summary>
/// <seealso cref="TracerSettings.MaxTracesSubmittedPerSecond"/>
public const string MaxTracesSubmittedPerSecond = "DD_MAX_TRACES_PER_SECOND";

/// <summary>
/// Configuration key for setting custom sampling rules based on regular expressions.
/// Semi-colon separated list of sampling rules.
/// The rule is matched in order of specification. The first match in a list is used.
///
/// Per entry:
/// The item 'rate' is required in decimal format.
/// The item 'service' is optional in regular expression format, to match on service name.
/// The item 'operation' is optional in regular expression format, to match on operation name.
///
/// To give a rate of 50% to any traces in a service starting with the text "cart":
/// 'rate=0.5, service=cart.*'
///
/// To give a rate of 20% to any traces which have an operation name of "http.request":
/// 'rate=0.2, operation=http.request'
///
/// To give a rate of 100% to any traces within a service named "background" and with an operation name of "sql.query":
/// 'rate=1.0, service=background, operation=sql.query
///
/// To give a rate of 10% to all traces
/// 'rate=0.1'
///
/// To configure multiple rules, separate by semi-colon and order from most specific to least specific:
/// 'rate=0.5, service=cart.*; rate=0.2, operation=http.request; rate=1.0, service=background, operation=sql.query; rate=0.1'
///
/// If no rules are specified, or none match, default internal sampling logic will be used.
/// </summary>
/// <seealso cref="TracerSettings.CustomSamplingRules"/>
public const string CustomSamplingRules = "DD_CUSTOM_SAMPLING_RULES";

/// <summary>
/// String format patterns used to match integration-specific configuration keys.
/// </summary>
Expand Down
16 changes: 16 additions & 0 deletions src/Datadog.Trace/Configuration/TracerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ public TracerSettings(IConfigurationSource source)
LogsInjectionEnabled = source?.GetBool(ConfigurationKeys.LogsInjectionEnabled) ??
false;

MaxTracesSubmittedPerSecond = source?.GetInt32(ConfigurationKeys.MaxTracesSubmittedPerSecond) ??
100;

Integrations = new IntegrationSettingsCollection(source);

GlobalTags = source?.GetDictionary(ConfigurationKeys.GlobalTags) ??
Expand Down Expand Up @@ -141,6 +144,19 @@ public TracerSettings(IConfigurationSource source)
/// <seealso cref="ConfigurationKeys.LogsInjectionEnabled"/>
public bool LogsInjectionEnabled { get; set; }

/// <summary>
/// Gets or sets a value indicating the maximum number of traces set to AutoKeep (p1) per second.
/// Default is <c>100</c>.
/// </summary>
/// <seealso cref="ConfigurationKeys.MaxTracesSubmittedPerSecond"/>
public int MaxTracesSubmittedPerSecond { get; set; }

/// <summary>
/// Gets or sets a value indicating custom sampling rules.
/// </summary>
/// <seealso cref="ConfigurationKeys.CustomSamplingRules"/>
public string CustomSamplingRules { get; set; }

/// <summary>
/// Gets a collection of <see cref="Integrations"/> keyed by integration name.
/// </summary>
Expand Down
1 change: 0 additions & 1 deletion src/Datadog.Trace/Logging/DatadogLogging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using Datadog.Trace.Configuration;
using Datadog.Trace.Vendors.Serilog;
using Datadog.Trace.Vendors.Serilog.Events;
using Datadog.Trace.Vendors.Serilog.Sinks.File;
Expand Down
18 changes: 18 additions & 0 deletions src/Datadog.Trace/Metrics.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,23 @@ namespace Datadog.Trace
internal static class Metrics
{
public const string SamplingPriority = "_sampling_priority_v1";

/// <summary>
/// To be set when the agent determines the sampling rate for a trace
/// Read: Agent Priority Sample Rate
/// </summary>
public const string SamplingAgentDecision = "_dd.agent_psr";

/// <summary>
/// To be set when a sampling rule is applied to a trace
/// Read: Sampling Rule Priority Sample Rate
/// </summary>
public const string SamplingRuleDecision = "_dd.rule_psr";

/// <summary>
/// To be set when a rate limiter is applied to a trace.
/// Read: Rate Limiter Priority Sample Rate
/// </summary>
public const string SamplingLimitDecision = "_dd.limit_psr";
}
}
9 changes: 9 additions & 0 deletions src/Datadog.Trace/Sampling/IRateLimiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Datadog.Trace.Sampling
{
internal interface IRateLimiter
{
bool Allowed(ulong traceId);

float GetEffectiveRate();
}
}
6 changes: 4 additions & 2 deletions src/Datadog.Trace/Sampling/ISampler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ namespace Datadog.Trace.Sampling
{
internal interface ISampler
{
void SetSampleRates(IEnumerable<KeyValuePair<string, float>> sampleRates);
void SetDefaultSampleRates(IEnumerable<KeyValuePair<string, float>> sampleRates);

SamplingPriority GetSamplingPriority(string service, string env, ulong traceId);
SamplingPriority GetSamplingPriority(Span span);

void RegisterRule(ISamplingRule rule);
}
}
13 changes: 13 additions & 0 deletions src/Datadog.Trace/Sampling/ISamplingRule.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
namespace Datadog.Trace.Sampling
{
internal interface ISamplingRule
{
string Name { get; }

int Priority { get; }

bool IsMatch(Span span);

float GetSamplingRate();
}
}
48 changes: 0 additions & 48 deletions src/Datadog.Trace/Sampling/RateByServiceSampler.cs

This file was deleted.

141 changes: 141 additions & 0 deletions src/Datadog.Trace/Sampling/RateLimiter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
using System;
using System.Collections.Concurrent;
using System.Threading;
using Datadog.Trace.Logging;

namespace Datadog.Trace.Sampling
{
internal class RateLimiter : IRateLimiter
{
private static readonly Vendors.Serilog.ILogger Log = DatadogLogging.For<RateLimiter>();

private readonly ManualResetEventSlim _refreshEvent = new ManualResetEventSlim(initialState: true);
private readonly ConcurrentQueue<DateTime> _intervalQueue = new ConcurrentQueue<DateTime>();

private readonly int _maxTracesPerInterval;
private readonly int _intervalMilliseconds;
private readonly TimeSpan _interval;

private DateTime _windowBegin;
private DateTime _lastRefresh;

private int _windowChecks = 0;
private int _windowAllowed = 0;

private int _previousWindowChecks = 0;
private int _previousWindowAllowed = 0;

public RateLimiter(int? maxTracesPerInterval)
{
_maxTracesPerInterval = maxTracesPerInterval ?? 100;
_intervalMilliseconds = 1_000;
_interval = TimeSpan.FromMilliseconds(_intervalMilliseconds);
_windowBegin = _lastRefresh = DateTime.Now;
}

public bool Allowed(ulong traceId)
{
if (_maxTracesPerInterval == 0)
{
// Rate limit of 0 blocks everything
return false;
}

if (_maxTracesPerInterval < 0)
{
// Negative rate limit disables rate limiting
return true;
}

WaitForRefresh();

// This must happen after the wait, because we check for window statistics, modifying this number
Interlocked.Increment(ref _windowChecks);

var count = _intervalQueue.Count;

if (count >= _maxTracesPerInterval)
{
Log.Debug("Dropping trace id {0} with count of {1} for last {2}ms.", traceId, count, _intervalMilliseconds);
return false;
}

_intervalQueue.Enqueue(DateTime.Now);
Interlocked.Increment(ref _windowAllowed);

return true;
}

public float GetEffectiveRate()
{
if (_maxTracesPerInterval == 0)
{
// Rate limit of 0 blocks everything
return 0;
}

if (_maxTracesPerInterval < 0)
{
// Negative rate limit disables rate limiting
return 1;
}

var totalChecksForLastTwoWindows = _windowChecks + _previousWindowChecks;

if (totalChecksForLastTwoWindows == 0)
{
// no checks, effectively 100%. don't divide by zero
return 1;
}

// Current window + previous window to prevent burst-iness and low new window numbers from skewing the rate
return (_windowAllowed + _previousWindowAllowed) / (float)totalChecksForLastTwoWindows;
}

private void WaitForRefresh()
{
var previousRefresh = _lastRefresh;

// Block if a refresh event is happening
_refreshEvent.Wait();

if (previousRefresh != _lastRefresh)
{
// Some other thread already did this very recently
// Let's save some cycles and prevent contention
return;
}

try
{
// Block threads
_refreshEvent.Reset();

var now = DateTime.Now;
_lastRefresh = now;

var timeSinceWindowStart = (now - _windowBegin).TotalMilliseconds;

if (timeSinceWindowStart >= _intervalMilliseconds)
{
// statistical window has passed, shift the counts
_previousWindowAllowed = _windowAllowed;
_previousWindowChecks = _windowChecks;
_windowAllowed = 0;
_windowChecks = 0;
_windowBegin = now;
}

while (_intervalQueue.TryPeek(out var time) && now.Subtract(time) > _interval)
{
_intervalQueue.TryDequeue(out _);
}
}
finally
{
// Resume threads
_refreshEvent.Set();
}
}
}
}
Loading

0 comments on commit 3983bdd

Please sign in to comment.