Skip to content

Commit 066cac3

Browse files
committed
feat(throttle): reimplement throttle transform with basic rate limiting
The native `throttle` transform is backed by the `governor` library which provides a very flexible GCRA implementation for rate limiting. The library does not provide any way to explicitly initialize the quota state, however, meaning that we cannot rehydrate the initial state of the transform from state persistence (rocksdb). For our purposes, we do not need the flexibity and advanced features (e.g. burst rate capacity) provided by this crate. Instead, this adds our own implementation `mezmo_throttle` which is identical, but instead uses a very simple leaky bucket implementation. The state is represented by a `HashMap` storing types that can natively be represented as a JSON string for storage. Ref: LOG-19565
1 parent a033400 commit 066cac3

File tree

4 files changed

+505
-0
lines changed

4 files changed

+505
-0
lines changed

Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,7 @@ transforms-metrics-mezmo = [
661661
"transforms-mezmo_aggregate",
662662
"transforms-mezmo_tag_cardinality_limit",
663663
"transforms-mezmo_log_to_metric",
664+
"transforms-mezmo_throttle",
664665
"transforms-protobuf_to_metric",
665666
]
666667

@@ -680,6 +681,7 @@ transforms-mezmo_log_to_metric = []
680681
transforms-mezmo_log_clustering = ["dep:lru", "dep:blake2", "dep:base64", "dep:tokio-postgres"]
681682
transforms-mezmo_log_classification = ["dep:grok"]
682683
transforms-mezmo_tag_cardinality_limit = ["dep:bloomy", "dep:hashbrown"]
684+
transforms-mezmo_throttle = []
683685
transforms-remap = []
684686
transforms-route = []
685687
transforms-sample = []
+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use super::*;
2+
3+
/// Configuration for the `mezmo_throttle` transform.
4+
#[serde_as]
5+
#[configurable_component(transform(
6+
"mezmo_throttle",
7+
"Rate limit logs passing through a topology."
8+
))]
9+
#[derive(Clone, Debug, Default)]
10+
#[serde(deny_unknown_fields)]
11+
pub struct MezmoThrottleConfig {
12+
/// The number of events allowed for a given bucket per configured `window_secs`.
13+
///
14+
/// Each unique key has its own `threshold`.
15+
pub(super) threshold: u32,
16+
17+
/// The time window in which the configured `threshold` is applied, in seconds.
18+
#[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
19+
#[configurable(metadata(docs::human_name = "Time Window"))]
20+
pub(super) window_secs: Duration,
21+
22+
/// The value to group events into separate buckets to be rate limited independently.
23+
///
24+
/// If left unspecified, or if the event doesn't have `key_field`, then the event is not rate
25+
/// limited separately.
26+
#[configurable(metadata(docs::examples = "{{ message }}", docs::examples = "{{ hostname }}",))]
27+
pub(super) key_field: Option<Template>,
28+
29+
/// A logical condition used to exclude events from sampling.
30+
pub(super) exclude: Option<AnyCondition>,
31+
}
32+
33+
impl_generate_config_from_default!(MezmoThrottleConfig);
34+
35+
#[async_trait::async_trait]
36+
#[typetag::serde(name = "mezmo_throttle")]
37+
impl TransformConfig for MezmoThrottleConfig {
38+
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
39+
Throttle::new(self, context, ThrottleClock {}).map(Transform::event_task)
40+
}
41+
42+
fn input(&self) -> Input {
43+
Input::log()
44+
}
45+
46+
fn outputs(
47+
&self,
48+
_: vector_lib::enrichment::TableRegistry,
49+
input_definitions: &[(OutputId, schema::Definition)],
50+
_: LogNamespace,
51+
) -> Vec<TransformOutput> {
52+
// The event is not modified, so the definition is passed through as-is
53+
vec![TransformOutput::new(
54+
DataType::Log,
55+
clone_input_definitions(input_definitions),
56+
)]
57+
}
58+
}

0 commit comments

Comments
 (0)