Skip to content

Commit f8854d6

Browse files
authored
Merge pull request #438 from answerbook/mdeltito/LOG-19565
feat(throttle): reimplement throttle transform with basic rate limiting + state persistence
2 parents a033400 + ad882f2 commit f8854d6

File tree

4 files changed

+739
-0
lines changed

4 files changed

+739
-0
lines changed

Cargo.toml

+2
Original file line number | Diff line number | Diff line change
@@ -661,6 +661,7 @@ transforms-metrics-mezmo = [
661661
"transforms-mezmo_aggregate",
662662
"transforms-mezmo_tag_cardinality_limit",
663663
"transforms-mezmo_log_to_metric",
664+
"transforms-mezmo_throttle",
664665
"transforms-protobuf_to_metric",
665666
]
666667

@@ -680,6 +681,7 @@ transforms-mezmo_log_to_metric = []
680681
transforms-mezmo_log_clustering = ["dep:lru", "dep:blake2", "dep:base64", "dep:tokio-postgres"]
681682
transforms-mezmo_log_classification = ["dep:grok"]
682683
transforms-mezmo_tag_cardinality_limit = ["dep:bloomy", "dep:hashbrown"]
684+
transforms-mezmo_throttle = []
683685
transforms-remap = []
684686
transforms-route = []
685687
transforms-sample = []
+94
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,94 @@
1+
use super::*;
2+
3+
use crate::{
4+
config::{DataType, Input, TransformConfig},
5+
schema,
6+
template::Template,
7+
transforms::Transform,
8+
};
9+
use serde_with::serde_as;
10+
use vector_lib::config::{clone_input_definitions, LogNamespace, OutputId, TransformOutput};
11+
use vector_lib::configurable::configurable_component;
12+
13+
/// Configuration for the `mezmo_throttle` transform, which rate limits log events.
14+
#[serde_as]
15+
#[configurable_component(transform(
16+
"mezmo_throttle",
17+
"Rate limit logs passing through a topology."
18+
))]
19+
#[derive(Clone, Debug, Default)]
20+
#[serde(deny_unknown_fields)]
21+
pub struct MezmoThrottleConfig {
22+
/// The number of events allowed for a given bucket per configured `window_ms`.
23+
///
24+
/// Each unique key (see `key_field`) has its own `threshold`.
25+
pub(super) threshold: u32,
26+
27+
/// The time window in which the configured `threshold` is applied, in milliseconds.
28+
#[configurable(metadata(docs::human_name = "Time Window"))]
29+
pub(super) window_ms: u64,
30+
31+
/// The value to group events into separate buckets to be rate limited independently.
32+
///
33+
/// If left unspecified, or if the event doesn't have `key_field`, then the event is not rate
34+
/// limited separately.
35+
#[configurable(metadata(docs::examples = "{{ message }}", docs::examples = "{{ hostname }}",))]
36+
pub(super) key_field: Option<Template>,
37+
38+
/// A logical condition used to exclude events from rate limiting.
39+
pub(super) exclude: Option<AnyCondition>,
40+
41+
/// Sets the base path for the persistence connection. Defaults to unset.
42+
/// NOTE: Leaving this value empty will disable state persistence.
43+
#[serde(default = "default_state_persistence_base_path")]
44+
pub(super) state_persistence_base_path: Option<String>,
45+
46+
/// Set how often, in milliseconds, the state of this transform will be persisted to the
47+
/// [PersistenceConnection] storage backend. Defaults to 30000 ms.
48+
#[serde(default = "default_state_persistence_tick_ms")]
49+
pub(super) state_persistence_tick_ms: u64,
50+
51+
/// The maximum amount of jitter (ms) to add to the `state_persistence_tick_ms`
52+
/// flush interval. Defaults to 750 ms.
53+
#[serde(default = "default_state_persistence_max_jitter_ms")]
54+
pub(super) state_persistence_max_jitter_ms: u64,
55+
}
56+
57+
/// Serde default for `state_persistence_base_path`: `None`, i.e. no base path is configured,
/// which the field documentation describes as disabling state persistence.
const fn default_state_persistence_base_path() -> Option<String> {
58+
None
59+
}
60+
61+
/// Serde default for `state_persistence_tick_ms`: 30000 milliseconds between state flushes.
const fn default_state_persistence_tick_ms() -> u64 {
62+
30000
63+
}
64+
65+
/// Serde default for `state_persistence_max_jitter_ms`: at most 750 milliseconds of jitter.
const fn default_state_persistence_max_jitter_ms() -> u64 {
66+
750
67+
}
68+
69+
// NOTE(review): macro is defined outside this file; presumably it implements config generation
// from `MezmoThrottleConfig::default()`, which yields `threshold = 0` and `window_ms = 0` — confirm.
impl_generate_config_from_default!(MezmoThrottleConfig);
70+
71+
// Registers `MezmoThrottleConfig` as the `mezmo_throttle` transform configuration.
#[async_trait::async_trait]
72+
#[typetag::serde(name = "mezmo_throttle")]
73+
impl TransformConfig for MezmoThrottleConfig {
74+
/// Builds the transform: constructs a `Throttle` from this config, the transform context and
/// a new `ThrottleClock`, then wraps it with `Transform::event_task`. An error from
/// `Throttle::new` is returned unchanged.
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
75+
Throttle::new(self, context, ThrottleClock::new()).map(Transform::event_task)
76+
}
77+
78+
/// Declares the accepted input as `Input::log()`.
fn input(&self) -> Input {
79+
Input::log()
80+
}
81+
82+
/// Declares a single `DataType::Log` output whose schema definitions are cloned from the
/// input definitions; the enrichment table registry and log namespace are unused.
fn outputs(
83+
&self,
84+
_: vector_lib::enrichment::TableRegistry,
85+
input_definitions: &[(OutputId, schema::Definition)],
86+
_: LogNamespace,
87+
) -> Vec<TransformOutput> {
88+
// The event is not modified, so the definition is passed through as-is
89+
vec![TransformOutput::new(
90+
DataType::Log,
91+
clone_input_definitions(input_definitions),
92+
)]
93+
}
94+
}

0 commit comments

Comments
 (0)