diff --git a/src/Datadog.Trace/Agent/Api.cs b/src/Datadog.Trace/Agent/Api.cs index e7dd010b749a..751c8909c036 100644 --- a/src/Datadog.Trace/Agent/Api.cs +++ b/src/Datadog.Trace/Agent/Api.cs @@ -128,7 +128,7 @@ public async Task SendTracesAsync(IList> traces) var responseContent = await responseMessage.Content.ReadAsStringAsync().ConfigureAwait(false); var response = JsonConvert.DeserializeObject(responseContent); - Tracer.Instance.Sampler.SetSampleRates(response?.RateByService); + Tracer.Instance.Sampler.SetDefaultSampleRates(response?.RateByService); } } catch (Exception ex) diff --git a/src/Datadog.Trace/Configuration/ConfigurationKeys.cs b/src/Datadog.Trace/Configuration/ConfigurationKeys.cs index 57e1e63d1e9e..24f744c71d6a 100644 --- a/src/Datadog.Trace/Configuration/ConfigurationKeys.cs +++ b/src/Datadog.Trace/Configuration/ConfigurationKeys.cs @@ -89,6 +89,43 @@ public static class ConfigurationKeys /// public const string LogsInjectionEnabled = "DD_LOGS_INJECTION"; + /// + /// Configuration key for setting the number of traces allowed + /// to be submitted per second. + /// + /// + public const string MaxTracesSubmittedPerSecond = "DD_MAX_TRACES_PER_SECOND"; + + /// + /// Configuration key for setting custom sampling rules based on regular expressions. + /// Semi-colon separated list of sampling rules. + /// The rule is matched in order of specification. The first match in a list is used. + /// + /// Per entry: + /// The item 'rate' is required in decimal format. + /// The item 'service' is optional in regular expression format, to match on service name. + /// The item 'operation' is optional in regular expression format, to match on operation name. + /// + /// To give a rate of 50% to any traces in a service starting with the text "cart": + /// 'rate=0.5, service=cart.*' + /// + /// To give a rate of 20% to any traces which have an operation name of "http.request": + /// 'rate=0.2, operation=http.request' + /// + /// To give a rate of 100% to any traces within a service named "background" and with an operation name of "sql.query": + /// 'rate=1.0, service=background, operation=sql.query + /// + /// To give a rate of 10% to all traces + /// 'rate=0.1' + /// + /// To configure multiple rules, separate by semi-colon and order from most specific to least specific: + /// 'rate=0.5, service=cart.*; rate=0.2, operation=http.request; rate=1.0, service=background, operation=sql.query; rate=0.1' + /// + /// If no rules are specified, or none match, default internal sampling logic will be used. + /// + /// + public const string CustomSamplingRules = "DD_CUSTOM_SAMPLING_RULES"; + /// /// String format patterns used to match integration-specific configuration keys. /// diff --git a/src/Datadog.Trace/Configuration/TracerSettings.cs b/src/Datadog.Trace/Configuration/TracerSettings.cs index a7e8849e04ae..58913e05b1d6 100644 --- a/src/Datadog.Trace/Configuration/TracerSettings.cs +++ b/src/Datadog.Trace/Configuration/TracerSettings.cs @@ -77,6 +77,9 @@ public TracerSettings(IConfigurationSource source) LogsInjectionEnabled = source?.GetBool(ConfigurationKeys.LogsInjectionEnabled) ?? false; + MaxTracesSubmittedPerSecond = source?.GetInt32(ConfigurationKeys.MaxTracesSubmittedPerSecond) ?? + 100; + Integrations = new IntegrationSettingsCollection(source); GlobalTags = source?.GetDictionary(ConfigurationKeys.GlobalTags) ?? @@ -141,6 +144,19 @@ public TracerSettings(IConfigurationSource source) /// public bool LogsInjectionEnabled { get; set; } + /// + /// Gets or sets a value indicating the maximum number of traces set to AutoKeep (p1) per second. + /// Default is 100. + /// + /// + public int MaxTracesSubmittedPerSecond { get; set; } + + /// + /// Gets or sets a value indicating custom sampling rules. + /// + /// + public string CustomSamplingRules { get; set; } + /// /// Gets a collection of keyed by integration name. /// diff --git a/src/Datadog.Trace/Logging/DatadogLogging.cs b/src/Datadog.Trace/Logging/DatadogLogging.cs index e52182d97c0c..603adb874225 100644 --- a/src/Datadog.Trace/Logging/DatadogLogging.cs +++ b/src/Datadog.Trace/Logging/DatadogLogging.cs @@ -2,7 +2,6 @@ using System.Diagnostics; using System.IO; using System.Runtime.InteropServices; -using Datadog.Trace.Configuration; using Datadog.Trace.Vendors.Serilog; using Datadog.Trace.Vendors.Serilog.Events; using Datadog.Trace.Vendors.Serilog.Sinks.File; diff --git a/src/Datadog.Trace/Metrics.cs b/src/Datadog.Trace/Metrics.cs index b80f94fbdbaa..e18f9f35af19 100644 --- a/src/Datadog.Trace/Metrics.cs +++ b/src/Datadog.Trace/Metrics.cs @@ -3,5 +3,23 @@ namespace Datadog.Trace internal static class Metrics { public const string SamplingPriority = "_sampling_priority_v1"; + + /// + /// To be set when the agent determines the sampling rate for a trace + /// Read: Agent Priority Sample Rate + /// + public const string SamplingAgentDecision = "_dd.agent_psr"; + + /// + /// To be set when a sampling rule is applied to a trace + /// Read: Sampling Rule Priority Sample Rate + /// + public const string SamplingRuleDecision = "_dd.rule_psr"; + + /// + /// To be set when a rate limiter is applied to a trace. + /// Read: Rate Limiter Priority Sample Rate + /// + public const string SamplingLimitDecision = "_dd.limit_psr"; } } diff --git a/src/Datadog.Trace/Sampling/IRateLimiter.cs b/src/Datadog.Trace/Sampling/IRateLimiter.cs new file mode 100644 index 000000000000..adec223be6ba --- /dev/null +++ b/src/Datadog.Trace/Sampling/IRateLimiter.cs @@ -0,0 +1,9 @@ +namespace Datadog.Trace.Sampling +{ + internal interface IRateLimiter + { + bool Allowed(ulong traceId); + + float GetEffectiveRate(); + } +} diff --git a/src/Datadog.Trace/Sampling/ISampler.cs b/src/Datadog.Trace/Sampling/ISampler.cs index a8fbcfce7469..571e3d9ef2f1 100644 --- a/src/Datadog.Trace/Sampling/ISampler.cs +++ b/src/Datadog.Trace/Sampling/ISampler.cs @@ -4,8 +4,10 @@ namespace Datadog.Trace.Sampling { internal interface ISampler { - void SetSampleRates(IEnumerable> sampleRates); + void SetDefaultSampleRates(IEnumerable> sampleRates); - SamplingPriority GetSamplingPriority(string service, string env, ulong traceId); + SamplingPriority GetSamplingPriority(Span span); + + void RegisterRule(ISamplingRule rule); } } diff --git a/src/Datadog.Trace/Sampling/ISamplingRule.cs b/src/Datadog.Trace/Sampling/ISamplingRule.cs new file mode 100644 index 000000000000..d9aa86cba041 --- /dev/null +++ b/src/Datadog.Trace/Sampling/ISamplingRule.cs @@ -0,0 +1,13 @@ +namespace Datadog.Trace.Sampling +{ + internal interface ISamplingRule + { + string Name { get; } + + int Priority { get; } + + bool IsMatch(Span span); + + float GetSamplingRate(); + } +} diff --git a/src/Datadog.Trace/Sampling/RateByServiceSampler.cs b/src/Datadog.Trace/Sampling/RateByServiceSampler.cs deleted file mode 100644 index 7d1b57c86977..000000000000 --- a/src/Datadog.Trace/Sampling/RateByServiceSampler.cs +++ /dev/null @@ -1,48 +0,0 @@ -using System; -using System.Collections.Generic; - -namespace Datadog.Trace.Sampling -{ - internal class RateByServiceSampler : ISampler - { - private const ulong MaxTraceId = 9_223_372_036_854_775_807; // 2^63-1 - private const ulong KnuthFactor = 1_111_111_111_111_111_111; - - // can support multiple readers concurrently, as long as the collection is not modified. - // start with an empty collection by default, so we can skip the null check in GetSamplingPriority() - private Dictionary _sampleRates = new Dictionary(); - - public void SetSampleRates(IEnumerable> sampleRates) - { - // to avoid locking if writers and readers can access the dictionary at the same time, - // build the new dictionary first, then replace the old one - var rates = new Dictionary(StringComparer.OrdinalIgnoreCase); - - if (sampleRates != null) - { - foreach (var pair in sampleRates) - { - rates.Add(pair.Key, pair.Value); - } - } - - _sampleRates = rates; - } - - public SamplingPriority GetSamplingPriority(string service, string env, ulong traceId) - { - string key = $"service:{service},env:{env}"; - - if (_sampleRates.TryGetValue(key, out float sampleRate)) - { - var sample = ((traceId * KnuthFactor) % MaxTraceId) <= (sampleRate * MaxTraceId); - - return sample - ? SamplingPriority.AutoKeep - : SamplingPriority.AutoReject; - } - - return SamplingPriority.AutoKeep; - } - } -} diff --git a/src/Datadog.Trace/Sampling/RateLimiter.cs b/src/Datadog.Trace/Sampling/RateLimiter.cs new file mode 100644 index 000000000000..4f70c1bead53 --- /dev/null +++ b/src/Datadog.Trace/Sampling/RateLimiter.cs @@ -0,0 +1,141 @@ +using System; +using System.Collections.Concurrent; +using System.Threading; +using Datadog.Trace.Logging; + +namespace Datadog.Trace.Sampling +{ + internal class RateLimiter : IRateLimiter + { + private static readonly Vendors.Serilog.ILogger Log = DatadogLogging.For(); + + private readonly ManualResetEventSlim _refreshEvent = new ManualResetEventSlim(initialState: true); + private readonly ConcurrentQueue _intervalQueue = new ConcurrentQueue(); + + private readonly int _maxTracesPerInterval; + private readonly int _intervalMilliseconds; + private readonly TimeSpan _interval; + + private DateTime _windowBegin; + private DateTime _lastRefresh; + + private int _windowChecks = 0; + private int _windowAllowed = 0; + + private int _previousWindowChecks = 0; + private int _previousWindowAllowed = 0; + + public RateLimiter(int? maxTracesPerInterval) + { + _maxTracesPerInterval = maxTracesPerInterval ?? 100; + _intervalMilliseconds = 1_000; + _interval = TimeSpan.FromMilliseconds(_intervalMilliseconds); + _windowBegin = _lastRefresh = DateTime.Now; + } + + public bool Allowed(ulong traceId) + { + if (_maxTracesPerInterval == 0) + { + // Rate limit of 0 blocks everything + return false; + } + + if (_maxTracesPerInterval < 0) + { + // Negative rate limit disables rate limiting + return true; + } + + WaitForRefresh(); + + // This must happen after the wait, because we check for window statistics, modifying this number + Interlocked.Increment(ref _windowChecks); + + var count = _intervalQueue.Count; + + if (count >= _maxTracesPerInterval) + { + Log.Debug("Dropping trace id {0} with count of {1} for last {2}ms.", traceId, count, _intervalMilliseconds); + return false; + } + + _intervalQueue.Enqueue(DateTime.Now); + Interlocked.Increment(ref _windowAllowed); + + return true; + } + + public float GetEffectiveRate() + { + if (_maxTracesPerInterval == 0) + { + // Rate limit of 0 blocks everything + return 0; + } + + if (_maxTracesPerInterval < 0) + { + // Negative rate limit disables rate limiting + return 1; + } + + var totalChecksForLastTwoWindows = _windowChecks + _previousWindowChecks; + + if (totalChecksForLastTwoWindows == 0) + { + // no checks, effectively 100%. don't divide by zero + return 1; + } + + // Current window + previous window to prevent burst-iness and low new window numbers from skewing the rate + return (_windowAllowed + _previousWindowAllowed) / (float)totalChecksForLastTwoWindows; + } + + private void WaitForRefresh() + { + var previousRefresh = _lastRefresh; + + // Block if a refresh event is happening + _refreshEvent.Wait(); + + if (previousRefresh != _lastRefresh) + { + // Some other thread already did this very recently + // Let's save some cycles and prevent contention + return; + } + + try + { + // Block threads + _refreshEvent.Reset(); + + var now = DateTime.Now; + _lastRefresh = now; + + var timeSinceWindowStart = (now - _windowBegin).TotalMilliseconds; + + if (timeSinceWindowStart >= _intervalMilliseconds) + { + // statistical window has passed, shift the counts + _previousWindowAllowed = _windowAllowed; + _previousWindowChecks = _windowChecks; + _windowAllowed = 0; + _windowChecks = 0; + _windowBegin = now; + } + + while (_intervalQueue.TryPeek(out var time) && now.Subtract(time) > _interval) + { + _intervalQueue.TryDequeue(out _); + } + } + finally + { + // Resume threads + _refreshEvent.Set(); + } + } + } +} diff --git a/src/Datadog.Trace/Sampling/RegexSamplingRule.cs b/src/Datadog.Trace/Sampling/RegexSamplingRule.cs new file mode 100644 index 000000000000..db57c446c3a3 --- /dev/null +++ b/src/Datadog.Trace/Sampling/RegexSamplingRule.cs @@ -0,0 +1,178 @@ +using System; +using System.Collections.Generic; +using System.Text.RegularExpressions; +using Datadog.Trace.Logging; + +namespace Datadog.Trace.Sampling +{ + internal class RegexSamplingRule : ISamplingRule + { + private static readonly Vendors.Serilog.ILogger Log = DatadogLogging.For(); + private static readonly TimeSpan RegexTimeout = TimeSpan.FromSeconds(1); + + private readonly float _samplingRate; + private readonly string _serviceNameRegex; + private readonly string _operationNameRegex; + + private bool _hasPoisonedRegex = false; + + public RegexSamplingRule( + float rate, + string name, + string serviceNameRegex, + string operationNameRegex) + { + _samplingRate = rate; + _serviceNameRegex = serviceNameRegex; + _operationNameRegex = operationNameRegex; + + Name = name; + } + + public string Name { get; } + + /// + /// Gets the Priority of the rule. + /// Configuration rules will default to 1 as a priority and rely on order of specification. + /// + public int Priority => 1; + + public static IEnumerable BuildFromConfigurationString(string configuration) + { + if (!string.IsNullOrEmpty(configuration)) + { + var ruleStrings = configuration.Split(new[] { ";", ":" }, StringSplitOptions.RemoveEmptyEntries); + var index = 0; + + foreach (var ruleString in ruleStrings) + { + index++; + + var ruleParts = ruleString.Split(new[] { "," }, StringSplitOptions.RemoveEmptyEntries); + + var rateSet = false; + float rate = 0; + string serviceNameRegex = null, operationNameRegex = null, ruleName = $"config_rule_{index}"; + + foreach (var rulePart in ruleParts) + { + var kvp = rulePart.Split(new[] { "=" }, StringSplitOptions.None); + + if (kvp.Length != 2 || string.IsNullOrWhiteSpace(kvp[0]) || string.IsNullOrWhiteSpace(kvp[1])) + { + Log.Warning("Rule {0} is malformed, skipping.", ruleName); + continue; + } + + var key = kvp[0].Trim(); + var value = kvp[1].Trim(); + + if (key.Equals("rate", StringComparison.OrdinalIgnoreCase) && float.TryParse(value, out rate)) + { + if (rate < 0 || rate > 1) + { + // invalid rate + Log.Warning("Invalid rate {0} specified for sampling rule {1}, skipping.", rate, ruleName); + break; + } + + rateSet = true; + } + else if (key.Equals("service", StringComparison.OrdinalIgnoreCase)) + { + serviceNameRegex = WrapWithLineCharacters(value); + } + else if (key.Equals("operation", StringComparison.OrdinalIgnoreCase)) + { + operationNameRegex = WrapWithLineCharacters(value); + } + else if (key.Equals("name", StringComparison.OrdinalIgnoreCase)) + { + ruleName = value; + } + } + + if (rateSet == false) + { + // Need a valid rate to be set to use a rule + Log.Warning("Rule {0} is missing the required rate, skipping.", ruleName); + continue; + } + + yield return new RegexSamplingRule( + rate: rate, + name: ruleName, + serviceNameRegex: serviceNameRegex, + operationNameRegex: operationNameRegex); + } + } + } + + public bool IsMatch(Span span) + { + if (_hasPoisonedRegex) + { + return false; + } + + if (DoesNotMatch(input: span.ServiceName, pattern: _serviceNameRegex)) + { + return false; + } + + if (DoesNotMatch(input: span.OperationName, pattern: _operationNameRegex)) + { + return false; + } + + return true; + } + + public float GetSamplingRate() + { + return _samplingRate; + } + + private static string WrapWithLineCharacters(string regex) + { + if (!regex.StartsWith("^")) + { + regex = "^" + regex; + } + + if (!regex.EndsWith("$")) + { + regex = regex + "$"; + } + + return regex; + } + + private bool DoesNotMatch(string input, string pattern) + { + try + { + if (pattern != null && + !Regex.IsMatch( + input: input, + pattern: pattern, + options: RegexOptions.None, + matchTimeout: RegexTimeout)) + { + return true; + } + } + catch (RegexMatchTimeoutException timeoutEx) + { + _hasPoisonedRegex = true; + Log.Error( + timeoutEx, + "Timeout when trying to match against {0} on {1}.", + input, + pattern); + } + + return false; + } + } +} diff --git a/src/Datadog.Trace/Sampling/RuleBasedSampler.cs b/src/Datadog.Trace/Sampling/RuleBasedSampler.cs new file mode 100644 index 000000000000..61144e141de4 --- /dev/null +++ b/src/Datadog.Trace/Sampling/RuleBasedSampler.cs @@ -0,0 +1,127 @@ +using System; +using System.Collections.Generic; +using Datadog.Trace.Logging; + +namespace Datadog.Trace.Sampling +{ + internal class RuleBasedSampler : ISampler + { + private const ulong KnuthFactor = 1_111_111_111_111_111_111; + + private static readonly Vendors.Serilog.ILogger Log = DatadogLogging.For(); + + private readonly IRateLimiter _limiter; + private readonly List _rules = new List(); + + private Dictionary _sampleRates = new Dictionary(); + + public RuleBasedSampler(IRateLimiter limiter) + { + _limiter = limiter ?? new RateLimiter(null); + } + + public void SetDefaultSampleRates(IEnumerable> sampleRates) + { + // to avoid locking if writers and readers can access the dictionary at the same time, + // build the new dictionary first, then replace the old one + var rates = new Dictionary(StringComparer.OrdinalIgnoreCase); + + if (sampleRates != null) + { + foreach (var pair in sampleRates) + { + rates.Add(pair.Key, pair.Value); + } + } + + _sampleRates = rates; + } + + public SamplingPriority GetSamplingPriority(Span span) + { + float sampleRate; + var traceId = span.TraceId; + + if (_rules.Count > 0) + { + foreach (var rule in _rules) + { + if (rule.IsMatch(span)) + { + sampleRate = rule.GetSamplingRate(); + Log.Debug( + "Matched on rule {0}. Applying rate of {1} to trace id {2}", + rule.Name, + sampleRate, + traceId); + span.SetMetric(Metrics.SamplingRuleDecision, sampleRate); + return GetSamplingPriority(span, sampleRate, withRateLimiter: true); + } + } + } + + var env = span.GetTag(Tags.Env); + var service = span.ServiceName; + + var key = $"service:{service},env:{env}"; + + if (_sampleRates.TryGetValue(key, out sampleRate)) + { + Log.Debug("Using the default sampling logic for trace {0}", traceId); + return GetSamplingPriority(span, sampleRate, withRateLimiter: false); + } + + Log.Debug("Could not establish sample rate for trace {0}", traceId); + return SamplingPriority.AutoKeep; + } + + /// + /// Will insert a rule according to how high the Priority field is set. + /// If the priority is equal to other rules, the new rule will be the last in that priority group. + /// + /// The new rule being registered. + public void RegisterRule(ISamplingRule rule) + { + for (var i = 0; i < _rules.Count; i++) + { + if (_rules[i].Priority < rule.Priority) + { + _rules.Insert(i, rule); + return; + } + } + + // No items or this is the last priority + _rules.Add(rule); + } + + private SamplingPriority GetSamplingPriority(Span span, float rate, bool withRateLimiter) + { + var sample = ((span.TraceId * KnuthFactor) % TracerConstants.MaxTraceId) <= (rate * TracerConstants.MaxTraceId); + var priority = SamplingPriority.AutoReject; + + if (sample) + { + if (withRateLimiter) + { + // Ensure all allowed traces adhere to the global rate limit + if (_limiter.Allowed(span.TraceId)) + { + priority = SamplingPriority.AutoKeep; + } + + // Always set the sample rate metric whether it was allowed or not + // DEV: Setting this allows us to properly compute metrics and debug the + // various sample rates that are getting applied to this span + span.SetMetric(Metrics.SamplingLimitDecision, _limiter.GetEffectiveRate()); + } + else + { + priority = SamplingPriority.AutoKeep; + } + } + + return priority; + } + } +} diff --git a/src/Datadog.Trace/TraceContext.cs b/src/Datadog.Trace/TraceContext.cs index cc3301a9a377..49e307b3871f 100644 --- a/src/Datadog.Trace/TraceContext.cs +++ b/src/Datadog.Trace/TraceContext.cs @@ -67,11 +67,9 @@ public void AddSpan(Span span) else { // this is a local root span (i.e. not propagated). - string env = RootSpan.GetTag(Tags.Env); - // determine an initial sampling priority for this trace, but don't lock it yet _samplingPriority = - Tracer.Sampler?.GetSamplingPriority(RootSpan.ServiceName, env, RootSpan.Context.TraceId); + Tracer.Sampler?.GetSamplingPriority(RootSpan); } } } diff --git a/src/Datadog.Trace/Tracer.cs b/src/Datadog.Trace/Tracer.cs index 609716248dd1..e024d3ea9d5a 100644 --- a/src/Datadog.Trace/Tracer.cs +++ b/src/Datadog.Trace/Tracer.cs @@ -55,7 +55,15 @@ internal Tracer(TracerSettings settings, IAgentWriter agentWriter, ISampler samp Settings = settings ?? TracerSettings.FromDefaultSources(); _agentWriter = agentWriter ?? new AgentWriter(new Api(Settings.AgentUri)); _scopeManager = scopeManager ?? new AsyncLocalScopeManager(); - Sampler = sampler ?? new RateByServiceSampler(); + Sampler = sampler ?? new RuleBasedSampler(new RateLimiter(Settings.MaxTracesSubmittedPerSecond)); + + if (!string.IsNullOrWhiteSpace(Settings.CustomSamplingRules)) + { + foreach (var rule in RegexSamplingRule.BuildFromConfigurationString(Settings.CustomSamplingRules)) + { + Sampler.RegisterRule(rule); + } + } // if not configured, try to determine an appropriate service name DefaultServiceName = Settings.ServiceName ?? diff --git a/src/Datadog.Trace/TracerConstants.cs b/src/Datadog.Trace/TracerConstants.cs index 6d26de7d9cdc..6ed8daed4bc5 100644 --- a/src/Datadog.Trace/TracerConstants.cs +++ b/src/Datadog.Trace/TracerConstants.cs @@ -3,5 +3,10 @@ namespace Datadog.Trace internal static class TracerConstants { public const string Language = "dotnet"; + + /// + /// 2^63-1 + /// + public const ulong MaxTraceId = 9_223_372_036_854_775_807; } } diff --git a/test/Datadog.Trace.TestHelpers/TestRunners.cs b/test/Datadog.Trace.TestHelpers/TestRunners.cs index b36d1fbc2dc5..793ff184aaab 100644 --- a/test/Datadog.Trace.TestHelpers/TestRunners.cs +++ b/test/Datadog.Trace.TestHelpers/TestRunners.cs @@ -11,7 +11,8 @@ public class TestRunners "vstest.console", "xunit.console.x86", "xunit.console.x64", - "ReSharperTestRunner64" + "ReSharperTestRunner64", + "ReSharperTestRunner64c" }; } } diff --git a/test/Datadog.Trace.Tests/Sampling/RateLimiterTests.cs b/test/Datadog.Trace.Tests/Sampling/RateLimiterTests.cs new file mode 100644 index 000000000000..b5662b295aa0 --- /dev/null +++ b/test/Datadog.Trace.Tests/Sampling/RateLimiterTests.cs @@ -0,0 +1,225 @@ +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using Datadog.Trace.ExtensionMethods; +using Datadog.Trace.Sampling; +using Xunit; + +namespace Datadog.Trace.Tests.Sampling +{ + [Collection(nameof(Datadog.Trace.Tests.Sampling))] + public class RateLimiterTests + { + private const int DefaultLimitPerSecond = 100; + private static readonly ThreadLocal Random = new ThreadLocal(() => new Random()); + + [Fact] + public void One_Is_Allowed() + { + var rateLimiter = new RateLimiter(maxTracesPerInterval: null); + var allowed = rateLimiter.Allowed(1); + Assert.True(allowed); + } + + [Fact] + public void All_Traces_Disabled() + { + var rateLimiter = new RateLimiter(maxTracesPerInterval: 0); + var allowedCount = AskTheRateLimiterABunchOfTimes(rateLimiter, 500); + Assert.Equal(expected: 0, actual: allowedCount); + } + + [Fact] + public void All_Traces_Allowed() + { + var rateLimiter = new RateLimiter(maxTracesPerInterval: -1); + var allowedCount = AskTheRateLimiterABunchOfTimes(rateLimiter, 500); + Assert.Equal(expected: 500, actual: allowedCount); + } + + [Fact] + public void Only_100_Allowed_In_500_Burst_For_Default() + { + var rateLimiter = new RateLimiter(maxTracesPerInterval: null); + var allowedCount = AskTheRateLimiterABunchOfTimes(rateLimiter, 500); + Assert.Equal(expected: DefaultLimitPerSecond, actual: allowedCount); + } + + [Fact] + public void Limits_Approximately_To_Defaults() + { + Run_Limit_Test(intervalLimit: null, numberPerBurst: 200, numberOfBursts: 20, millisecondsBetweenBursts: 247); + } + + [Fact] + public void Limits_To_Custom_Amount_Per_Second() + { + Run_Limit_Test(intervalLimit: 500, numberPerBurst: 200, numberOfBursts: 20, millisecondsBetweenBursts: 247); + } + + private static void Run_Limit_Test(int? intervalLimit, int numberPerBurst, int numberOfBursts, int millisecondsBetweenBursts) + { + var actualIntervalLimit = intervalLimit ?? DefaultLimitPerSecond; + + var test = new RateLimitLoadTest() + { + NumberPerBurst = numberPerBurst, + TimeBetweenBursts = TimeSpan.FromMilliseconds(millisecondsBetweenBursts), + NumberOfBursts = numberOfBursts + }; + + var result = RunTest(intervalLimit, test); + + var totalMilliseconds = result.TimeElapsed.TotalMilliseconds; + + var expectedLimit = totalMilliseconds * actualIntervalLimit / 1_000; + + var upperLimit = expectedLimit + (actualIntervalLimit * 0.8); + var lowerLimit = expectedLimit - (actualIntervalLimit * 0.8); + + Assert.True( + result.TotalAllowed >= lowerLimit && result.TotalAllowed <= upperLimit, + $"Expected between {lowerLimit} and {upperLimit}, received {result.TotalAllowed} out of {result.TotalAttempted} within {totalMilliseconds} milliseconds."); + + // Rate should match for the last two intervals, which is a total of two seconds + var numberOfBurstsWithinTwoIntervals = 2_000 / millisecondsBetweenBursts; + var totalExpectedSent = numberOfBurstsWithinTwoIntervals * numberPerBurst; + var totalExpectedAllowed = 2 * actualIntervalLimit; + var expectedRate = totalExpectedAllowed / (float)totalExpectedSent; + + var maxPercentVariance = 0.20f; + var lowestRate = expectedRate - maxPercentVariance; + var highestRate = expectedRate + maxPercentVariance; + + Assert.True( + result.ReportedRate >= lowestRate && result.ReportedRate <= highestRate, + $"Expected rate between {lowestRate} and {highestRate}, received {result.ReportedRate}."); + } + + private static int AskTheRateLimiterABunchOfTimes(RateLimiter rateLimiter, int howManyTimes) + { + var remaining = howManyTimes; + var allowedCount = 0; + while (remaining-- > 0) + { + var allowed = rateLimiter.Allowed(1); + if (allowed) + { + allowedCount++; + } + } + + return allowedCount; + } + + private static RateLimitResult RunTest(int? intervalLimit, RateLimitLoadTest test) + { + var parallelism = test.NumberPerBurst; + + if (parallelism > 10) + { + parallelism = 10; + } + + var resetEvent = new ManualResetEventSlim(initialState: false); // Start blocked + + var workerReps = Enumerable.Range(1, parallelism).ToArray(); + + var registry = new ConcurrentQueue(); + + var result = new RateLimitResult(); + + var start = DateTime.Now; + var limiter = new RateLimiter(maxTracesPerInterval: intervalLimit); + var end = DateTime.Now; + var endLock = new object(); + + for (var i = 0; i < test.NumberOfBursts; i++) + { + var remaining = test.NumberPerBurst; + + var workers = + workerReps + .Select(t => new Thread( + thread => + { + resetEvent.Wait(); + while (remaining > 0) + { + Interlocked.Decrement(ref remaining); + var id = Random.Value.NextUInt63(); + if (limiter.Allowed(id)) + { + result.Allowed.Add(id); + } + else + { + result.Denied.Add(id); + } + } + + lock (endLock) + { + end = DateTime.Now; + } + })); + + foreach (var worker in workers) + { + registry.Enqueue(worker); + worker.Start(); + } + + resetEvent.Set(); + + Thread.Sleep(test.TimeBetweenBursts); + + resetEvent.Reset(); + } + + while (!registry.IsEmpty) + { + if (registry.TryDequeue(out var item)) + { + if (item.IsAlive) + { + registry.Enqueue(item); + } + } + } + + result.RateLimiter = limiter; + result.ReportedRate = limiter.GetEffectiveRate(); + result.TimeElapsed = end.Subtract(start); + + return result; + } + + private class RateLimitLoadTest + { + public int NumberPerBurst { get; set; } + + public TimeSpan TimeBetweenBursts { get; set; } + + public int NumberOfBursts { get; set; } + } + + private class RateLimitResult + { + public RateLimiter RateLimiter { get; set; } + + public TimeSpan TimeElapsed { get; set; } + + public ConcurrentBag Allowed { get; } = new ConcurrentBag(); + + public ConcurrentBag Denied { get; } = new ConcurrentBag(); + + public float ReportedRate { get; set; } + + public int TotalAttempted => Allowed.Count + Denied.Count; + + public int TotalAllowed => Allowed.Count; + } + } +} diff --git a/test/Datadog.Trace.Tests/Sampling/RegexSamplingRuleTests.cs b/test/Datadog.Trace.Tests/Sampling/RegexSamplingRuleTests.cs new file mode 100644 index 000000000000..7bfe61591ff2 --- /dev/null +++ b/test/Datadog.Trace.Tests/Sampling/RegexSamplingRuleTests.cs @@ -0,0 +1,88 @@ +using System; +using System.Linq; +using Datadog.Trace.Sampling; +using Xunit; + +namespace Datadog.Trace.Tests.Sampling +{ + [Collection(nameof(Datadog.Trace.Tests.Sampling))] + public class RegexSamplingRuleTests + { + private static readonly ulong Id = 1; + private static readonly Span CartCheckoutSpan = new Span(new SpanContext(Id++, Id++, null, serviceName: "shopping-cart-service"), DateTimeOffset.Now) { OperationName = "checkout" }; + private static readonly Span AddToCartSpan = new Span(new SpanContext(Id++, Id++, null, serviceName: "shopping-cart-service"), DateTimeOffset.Now) { OperationName = "cart-add" }; + private static readonly Span ShippingAuthSpan = new Span(new SpanContext(Id++, Id++, null, serviceName: "shipping-auth-service"), DateTimeOffset.Now) { OperationName = "authorize" }; + private static readonly Span ShippingRevertSpan = new Span(new SpanContext(Id++, Id++, null, serviceName: "shipping-auth-service"), DateTimeOffset.Now) { OperationName = "authorize-revert" }; + private static readonly Span RequestShippingSpan = new Span(new SpanContext(Id++, Id++, null, serviceName: "request-shipping"), DateTimeOffset.Now) { OperationName = "submit" }; + + [Fact] + public void Constructs_ZeroRateOnly_From_Config_String() + { + var config = "rate=0"; + var rule = RegexSamplingRule.BuildFromConfigurationString(config).Single(); + Assert.Equal(expected: 0, actual: rule.GetSamplingRate()); + + Assert.True(rule.IsMatch(CartCheckoutSpan)); + Assert.True(rule.IsMatch(AddToCartSpan)); + Assert.True(rule.IsMatch(ShippingAuthSpan)); + Assert.True(rule.IsMatch(ShippingRevertSpan)); + Assert.True(rule.IsMatch(RequestShippingSpan)); + } + + [Fact] + public void Constructs_CartOnlyRule_From_Config_String() + { + var config = "rate=0.5, service=shopping-cart.*"; + var rule = RegexSamplingRule.BuildFromConfigurationString(config).Single(); + + Assert.True(rule.IsMatch(CartCheckoutSpan)); + Assert.True(rule.IsMatch(AddToCartSpan)); + Assert.False(rule.IsMatch(ShippingAuthSpan)); + Assert.False(rule.IsMatch(ShippingRevertSpan)); + Assert.False(rule.IsMatch(RequestShippingSpan)); + } + + [Fact] + public void Constructs_All_Expected_From_Config_String() + { + var config = "rate=0.5, service=.*cart.*; rate=1, service=.*shipping.*, operation=authorize; rate=0.1, service=.*shipping.*; rate=0.05"; + var rules = RegexSamplingRule.BuildFromConfigurationString(config).ToArray(); + + var cartRule = rules[0]; + Assert.Equal(expected: 0.5f, actual: cartRule.GetSamplingRate()); + + Assert.True(cartRule.IsMatch(CartCheckoutSpan)); + Assert.True(cartRule.IsMatch(AddToCartSpan)); + Assert.False(cartRule.IsMatch(ShippingAuthSpan)); + Assert.False(cartRule.IsMatch(ShippingRevertSpan)); + Assert.False(cartRule.IsMatch(RequestShippingSpan)); + + var shippingAuthRule = rules[1]; + Assert.Equal(expected: 1f, actual: shippingAuthRule.GetSamplingRate()); + + Assert.False(shippingAuthRule.IsMatch(CartCheckoutSpan)); + Assert.False(shippingAuthRule.IsMatch(AddToCartSpan)); + Assert.True(shippingAuthRule.IsMatch(ShippingAuthSpan)); + Assert.False(shippingAuthRule.IsMatch(ShippingRevertSpan)); + Assert.False(shippingAuthRule.IsMatch(RequestShippingSpan)); + + var fallbackShippingRule = rules[2]; + Assert.Equal(expected: 0.1f, actual: fallbackShippingRule.GetSamplingRate()); + + Assert.False(fallbackShippingRule.IsMatch(CartCheckoutSpan)); + Assert.False(fallbackShippingRule.IsMatch(AddToCartSpan)); + Assert.True(fallbackShippingRule.IsMatch(ShippingAuthSpan)); + Assert.True(fallbackShippingRule.IsMatch(ShippingRevertSpan)); + Assert.True(fallbackShippingRule.IsMatch(RequestShippingSpan)); + + var fallbackRule = rules[3]; + Assert.Equal(expected: 0.05f, actual: fallbackRule.GetSamplingRate()); + + Assert.True(fallbackRule.IsMatch(CartCheckoutSpan)); + Assert.True(fallbackRule.IsMatch(AddToCartSpan)); + Assert.True(fallbackRule.IsMatch(ShippingAuthSpan)); + Assert.True(fallbackRule.IsMatch(ShippingRevertSpan)); + Assert.True(fallbackRule.IsMatch(RequestShippingSpan)); + } + } +} diff --git a/test/Datadog.Trace.Tests/Sampling/RuleBasedSamplerTests.cs b/test/Datadog.Trace.Tests/Sampling/RuleBasedSamplerTests.cs new file mode 100644 index 000000000000..097e7036048a --- /dev/null +++ b/test/Datadog.Trace.Tests/Sampling/RuleBasedSamplerTests.cs @@ -0,0 +1,141 @@ +using System; +using System.Collections.Generic; +using Datadog.Trace.Sampling; +using Xunit; + +namespace Datadog.Trace.Tests.Sampling +{ + [Collection(nameof(Datadog.Trace.Tests.Sampling))] + public class RuleBasedSamplerTests + { + private static readonly float FallbackRate = 0.25f; + private static readonly string ServiceName = "my-service-name"; + private static readonly string Env = "my-test-env"; + private static readonly string OperationName = "test"; + private static readonly IEnumerable> MockAgentRates = new List>() { new KeyValuePair($"service:{ServiceName},env:{Env}", FallbackRate) }; + + private static ulong _id = 1; + + [Fact] + public void RateLimiter_Denies_All_Traces() + { + var sampler = new RuleBasedSampler(new DenyAll()); + sampler.RegisterRule(new RegexSamplingRule(1, "Allow_all", ".*", ".*")); + RunSamplerTest( + sampler, + 500, + 0, + 0); + } + + [Fact] + public void Keep_Everything_Rule() + { + var sampler = new RuleBasedSampler(new NoLimits()); + sampler.RegisterRule(new RegexSamplingRule(1, "Allow_all", ".*", ".*")); + RunSamplerTest( + sampler, + 500, + 1, + 0); + } + + [Fact] + public void Keep_Nothing_Rule() + { + var sampler = new RuleBasedSampler(new NoLimits()); + sampler.RegisterRule(new RegexSamplingRule(0, "Allow_nothing", ".*", ".*")); + RunSamplerTest( + sampler, + 500, + 0, + 0); + } + + [Fact] + public void Keep_Half_Rule() + { + var sampler = new RuleBasedSampler(new NoLimits()); + sampler.RegisterRule(new RegexSamplingRule(0.5f, "Allow_nothing", ".*", ".*")); + RunSamplerTest( + sampler, + 10_000, // Higher number for lower variance + 0.5f, + 0.05f); + } + + [Fact] + public void No_Registered_Rules_Uses_Legacy_Rates() + { + var sampler = new RuleBasedSampler(new NoLimits()); + sampler.SetDefaultSampleRates(MockAgentRates); + + RunSamplerTest( + sampler, + 10_000, // Higher number for lower variance + FallbackRate, + 0.05f); + } + + private static Span GetMyServiceSpan() + { + var span = new Span(new SpanContext(_id++, _id++, null, serviceName: ServiceName), DateTimeOffset.Now) { OperationName = OperationName }; + span.SetTag(Tags.Env, Env); + return span; + } + + private void RunSamplerTest( + ISampler sampler, + int iterations, + float expectedAutoKeepRate, + float acceptableVariancePercent) + { + var sampleSize = iterations; + var autoKeeps = 0; + while (sampleSize-- > 0) + { + var span = GetMyServiceSpan(); + var priority = sampler.GetSamplingPriority(span); + if (priority == SamplingPriority.AutoKeep) + { + autoKeeps++; + } + } + + var autoKeepRate = autoKeeps / (float)iterations; + + var lowerLimit = expectedAutoKeepRate * (1 - acceptableVariancePercent); + var upperLimit = expectedAutoKeepRate * (1 + acceptableVariancePercent); + + Assert.True( + autoKeepRate >= lowerLimit && autoKeepRate <= upperLimit, + $"Expected between {lowerLimit} and {upperLimit}, actual rate is {autoKeepRate}."); + } + + private class NoLimits : IRateLimiter + { + public bool Allowed(ulong traceId) + { + return true; + } + + public float GetEffectiveRate() + { + return 1; + } + } + + private class DenyAll : IRateLimiter + { + public bool Allowed(ulong traceId) + { + return false; + } + + public float GetEffectiveRate() + { + return 0; + } + } + } +}