Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 73 additions & 36 deletions lib/tracing-limit/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
//! # Rate limit grouping
//!
//! Events are rate limited independently based on a combination of:
//! - **Message content**: The actual log message text
//! - **Contextual fields**: The `component_id` field attached to the event or its parent spans
//!
//! The callsite (the code location where the log statement appears) is not part of the grouping key.
//!
//! ## How fields contribute to grouping
//!
//! **Only these fields create distinct rate limit groups:**
//! **These fields create distinct rate limit groups:**
//! - `message` - Different messages are rate limited independently
//! - `component_id` - Different components are rate limited independently
//!
//! **All other fields are ignored for grouping**, including:
//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid resource/cost implications from high-cardinality tags
//! - `message` - The log message itself doesn't differentiate groups
//! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting
//! - `internal_log_rate_secs` - Control field for customizing the rate limit window
//! - Any custom fields you add
Expand All @@ -39,42 +39,46 @@
//! info!(component_id = "transform_2", "Processing event"); // Group B
//! // Even though the message is identical, these are rate limited independently
//!
//! // Example 2: Only component_id matters for grouping
//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C
//! info!(component_id = "router", fanout_id = "output_2", "Routing event"); // Group C (same group!)
//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group C (same group!)
//! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event"); // Group C (same!)
//! // All of these share the same group because they have the same component_id
//! // Example 2: Same message with same component_id = same group
//! info!(message = "Hello", component_id = "router"); // Group C
//! info!(component_id = "router", "Hello"); // Group C
//! // Both forms of logging with the same message are now treated identically
//!
//! // Example 3: Only component_id and message matter for grouping
//! info!(component_id = "router", fanout_id = "output_1", "Routing event"); // Group D
//! info!(component_id = "router", fanout_id = "output_2", "Routing event"); // Group D
//! info!(component_id = "router", input_id = "kafka", "Routing event"); // Group D
//! // All of these share the same group because they have the same component_id and message
//! // The fanout_id and input_id fields are ignored to avoid resource/cost implications
//!
//! // Example 3: Span fields contribute to grouping
//! // Example 4: Different messages create different groups
//! info!(component_id = "router", "Message A"); // Group E
//! info!(component_id = "router", "Message B"); // Group F
//! // Different messages are rate limited independently
//!
//! // Example 5: Span fields contribute to grouping
//! let span = info_span!("process", component_id = "transform_1");
//! let _enter = span.enter();
//! info!("Processing event"); // Group E: callsite + component_id from span
//! info!("Processing event"); // Group G: message + component_id from span
//! drop(_enter);
//!
//! let span = info_span!("process", component_id = "transform_2");
//! let _enter = span.enter();
//! info!("Processing event"); // Group F: same callsite but different component_id
//! info!("Processing event"); // Group H: same message but different component_id
//!
//! // Example 4: Nested spans - child span fields take precedence
//! // Example 6: Nested spans - child span fields take precedence
//! let outer = info_span!("outer", component_id = "parent");
//! let _outer_guard = outer.enter();
//! let inner = info_span!("inner", component_id = "child");
//! let _inner_guard = inner.enter();
//! info!("Nested event"); // Grouped by component_id = "child"
//!
//! // Example 5: Same callsite with no fields = single rate limit group
//! info!("Simple message"); // Group G
//! info!("Simple message"); // Group G
//! info!("Simple message"); // Group G
//! info!("Nested event"); // Grouped by component_id = "child" + message = "Nested event"
//!
//! // Example 6: Custom fields are ignored for grouping
//! info!(component_id = "source", input_id = "in_1", "Received data"); // Group H
//! info!(component_id = "source", input_id = "in_2", "Received data"); // Group H (same group!)
//! // The input_id field is ignored - only component_id matters
//! // Example 7: Same message with no component_id = single rate limit group
//! info!("Simple message"); // Group I
//! info!("Simple message"); // Group I
//! info!("Simple message"); // Group I
//!
//! // Example 7: Disabling rate limiting for specific logs
//! // Example 8: Disabling rate limiting for specific logs
//! // Rate limiting is ON by default - explicitly disable for important logs
//! warn!(
//! component_id = "critical_component",
Expand All @@ -83,7 +87,7 @@
//! );
//! // This event will NEVER be rate limited, regardless of how often it fires
//!
//! // Example 8: Custom rate limit window for specific events
//! // Example 9: Custom rate limit window for specific events
//! info!(
//! component_id = "noisy_component",
//! message = "Frequent status update",
Expand All @@ -92,15 +96,14 @@
//! // Override the default window for this specific log
//! ```
//!
//! This ensures logs from different components are rate limited independently,
//! This ensures logs from different components and with different messages are rate limited independently,
//! while avoiding resource/cost implications from high-cardinality tags.

use std::fmt;

use dashmap::DashMap;
use tracing_core::{
Event, Metadata, Subscriber,
callsite::Identifier,
field::{Field, Value, Visit, display},
span,
subscriber::Interest,
Expand All @@ -127,7 +130,6 @@ const COMPONENT_ID_FIELD: &str = "component_id";

/// Key identifying one rate limit group.
///
/// An instance is built for each event from the `RateLimitedSpanKeys` collected
/// from the event's own fields and its span context. The derived `Eq` and `Hash`
/// allow it to be used as a map key.
#[derive(Eq, PartialEq, Hash, Clone)]
struct RateKeyIdentifier {
callsite: Identifier,
rate_limit_key_values: RateLimitedSpanKeys,
}

Expand Down Expand Up @@ -243,16 +245,16 @@ where
// Build a composite key from event fields and span context to determine the rate limit group.
// This multi-step process ensures we capture all relevant contextual information:
//
// 1. Start with event-level fields (e.g., fields directly on the log macro call)
// 1. Start with event-level fields (e.g., message and fields directly on the log macro call)
// 2. Walk up the span hierarchy from root to current span
// 3. Merge in fields from each span, with child spans taking precedence
//
// This means an event's rate limit group is determined by the combination of:
// - Its callsite (handled separately via RateKeyIdentifier)
// - All contextual fields from both the event and its span ancestry
// - Its message content (ensures info!(message = "foo") and info!("foo") are grouped together)
// - All contextual fields from both the event and its span ancestry (e.g., component_id)
//
// Example: The same `info!("msg")` callsite in different component contexts becomes
// distinct rate limit groups, allowing fine-grained control over log flooding.
// Example: The same message in different component contexts becomes distinct rate limit groups,
// allowing fine-grained control over log flooding.
let rate_limit_key_values = {
let mut keys = RateLimitedSpanKeys::default();
// Capture fields directly on this event
Expand All @@ -275,7 +277,6 @@ where
// not, we'll initialize an entry for it.
let metadata = event.metadata();
let id = RateKeyIdentifier {
callsite: metadata.callsite(),
rate_limit_key_values,
};

Expand Down Expand Up @@ -478,25 +479,32 @@ impl From<String> for TraceValue {
///
/// **Tracked fields** (only these create distinct rate limit groups):
/// - `component_id` - Different components are rate limited independently
/// - `message` - Different messages are rate limited independently
///
/// **Ignored fields**: All other fields are ignored for grouping purposes. This avoids resource/cost implications from high-cardinality tags.
/// ```
#[derive(Default, Eq, PartialEq, Hash, Clone)]
struct RateLimitedSpanKeys {
/// Last value recorded for the field named `COMPONENT_ID_FIELD`; `None` until one is seen.
component_id: Option<TraceValue>,
/// Last value recorded for the field named `MESSAGE_FIELD`; `None` until one is seen.
message: Option<TraceValue>,
}

impl RateLimitedSpanKeys {
fn record(&mut self, field: &Field, value: TraceValue) {
if field.name() == COMPONENT_ID_FIELD {
self.component_id = Some(value);
match field.name() {
COMPONENT_ID_FIELD => self.component_id = Some(value),
MESSAGE_FIELD => self.message = Some(value),
_ => {}
}
}

fn merge(&mut self, other: &Self) {
if let Some(component_id) = &other.component_id {
self.component_id = Some(component_id.clone());
}
if let Some(message) = &other.message {
self.message = Some(message.clone());
}
}
}

Expand Down Expand Up @@ -1012,4 +1020,33 @@ mod test {
]
);
}

// Regression check: the explicit `message = "..."` field form and the implicit
// format-string form of the same text must share a single rate limit bucket.
#[test]
#[serial]
fn message_field_explicit_vs_implicit_same_bucket() {
    let (events, sub) = setup_test(1);

    // Emit the same message through both macro forms, 30 rounds in total.
    tracing::subscriber::with_default(sub, || {
        (0..30).for_each(|_| {
            info!(message = "Hello");
            info!("Hello");
        });
    });

    let recorded = events.lock().unwrap();
    let total = recorded.len();

    // If the two forms were bucketed separately, each would emit its own
    // first event and suppression warning, giving more than two entries.
    assert_eq!(
        total, 2,
        "Expected 2 events (1 message + 1 suppression warning), got {}",
        total
    );

    let first = &recorded[0];
    let second = &recorded[1];
    assert_eq!(first.message, "Hello");
    assert!(
        second
            .message
            .contains("is being suppressed to avoid flooding")
    );
}
}
Loading