Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
b0af10d
enhancement(datadog_logs sink, datadog_traces sink): default to zstd …
dsmith3197 Dec 22, 2023
6afe1fe
revert traces
dsmith3197 Jan 11, 2024
68fa03d
Merge remote-tracking branch 'origin' into dougsmith/datadog-default-…
pront Oct 4, 2024
519e8fd
Merge remote-tracking branch 'origin/master' into dougsmith/datadog-d…
jszwedko Oct 11, 2024
4d2b4f6
Hardcode zstd
jszwedko Oct 11, 2024
3ffee36
Merge remote-tracking branch 'origin/master' into dougsmith/datadog-d…
jszwedko Oct 17, 2024
4153481
Add changelog entry
jszwedko Oct 18, 2024
174e76d
Add missing authors line
jszwedko Oct 18, 2024
e5f0f37
Remove hardcoded compression and override
jszwedko Oct 18, 2024
54b9e5d
Try to fix component validation tests
jszwedko Jan 13, 2025
34bb341
Merge remote-tracking branch 'origin/master' into dougsmith/datadog-d…
pront Apr 8, 2025
673eaff
Merge branch 'master' into dougsmith/datadog-default-zstd
thomasqueirozb Jun 30, 2025
0694476
Merge master into dougsmith/datadog-default-zstd and resolve conflicts
pront Feb 19, 2026
9b53008
Merge master and fix datadog_logs validation tests
pront Feb 19, 2026
be6ee74
chore(dev): cargo fmt
pront Feb 19, 2026
0457ade
Merge branch 'master' into dougsmith/datadog-default-zstd
pront Feb 19, 2026
c3a4537
Fix changelog: correct prior default from gzip to no compression
pront Feb 19, 2026
2d0aba8
fix(datadog_logs): Use consistent default compression fallback
pront Feb 19, 2026
0b95a4d
chore(dev): cargo fmt
pront Feb 19, 2026
fc13485
test(datadog_logs): Add test coverage for default zstd compression
pront Feb 19, 2026
b786b95
test(datadog_logs): Add test coverage for default zstd compression
pront Feb 19, 2026
5fdc702
Merge branch 'master' into dougsmith/datadog-default-zstd
pront Feb 19, 2026
010a844
remove compression for validation tests
pront Feb 20, 2026
e74be90
clippy fix
pront Feb 20, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ opendal = { version = "0.54", default-features = false, features = ["services-we

# Tower
tower = { version = "0.5.2", default-features = false, features = ["buffer", "limit", "retry", "timeout", "util", "balance", "discover"] }
tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-gzip", "trace"] }
tower-http = { version = "0.4.4", default-features = false, features = ["compression-full", "decompression-full", "trace"] }
# Serde
serde.workspace = true
serde-toml-merge = { version = "0.3.11", default-features = false }
Expand Down
6 changes: 6 additions & 0 deletions changelog.d/datadog-logs-zstd.enhancement.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The `datadog_logs` sink now defaults to `zstd` compression instead of no compression. This results in
better network efficiency and higher throughput. You can explicitly set `compression = "none"` to
restore the previous behavior of no compression, or set `compression = "gzip"` if you were previously
using gzip compression explicitly.

authors: jszwedko pront
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ sinks:
inputs: [ "parse_message" ]
endpoint: "http://localhost:8080"
default_api_key: "DEADBEEF"
compression: "gzip"
healthcheck:
enabled: false
buffer:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ sinks:
inputs: [ "parse_message" ]
endpoint: "http://localhost:8080"
default_api_key: "DEADBEEF"
compression: "gzip"
healthcheck:
enabled: false
buffer:
Expand Down
21 changes: 20 additions & 1 deletion src/components/validation/resources/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,29 @@ impl HttpResourceOutputContext<'_> {
let mut decoder = decoder.clone();

async move {
// Extract the Content-Encoding header before consuming the request
let content_encoding = request
.headers()
.get("content-encoding")
.and_then(|v| v.to_str().ok())
.map(|s| s.to_string());

match request.into_body().collect().await.map(Collected::to_bytes) {
Err(_) => StatusCode::INTERNAL_SERVER_ERROR.into_response(),
Ok(body) => {
let byte_size = body.len();

// Validation tests should not use compression - error if we receive compressed data
if let Some(encoding) = &content_encoding
&& encoding != "identity"
{
error!(
"Received compressed data (Content-Encoding: {encoding}). \
Validation tests assert on bytes sizes and compressed size might not be deterministic."
);
return StatusCode::BAD_REQUEST.into_response();
}

let mut body = BytesMut::from(&body[..]);
loop {
match decoder.decode_eof(&mut body) {
Expand Down Expand Up @@ -381,7 +400,7 @@ impl HttpResourceOutputContext<'_> {
}
Err(_) => {
error!(
"HTTP server failed to decode {:?}",
"HTTP server failed to decode body: {:?}",
String::from_utf8_lossy(&body)
);
return StatusCode::INTERNAL_SERVER_ERROR.into_response();
Expand Down
64 changes: 61 additions & 3 deletions src/sinks/datadog/logs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ impl SinkBatchSettings for DatadogLogsDefaultBatchSettings {

/// Configuration for the `datadog_logs` sink.
#[configurable_component(sink("datadog_logs", "Publish log events to Datadog."))]
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct DatadogLogsConfig {
#[serde(flatten)]
pub local_dd_common: LocalDatadogCommonConfig,

#[configurable(derived)]
#[serde(default)]
#[derivative(Default(value = "default_compression()"))]
#[serde(default = "default_compression")]
pub compression: Option<Compression>,

#[configurable(derived)]
Expand All @@ -73,6 +75,10 @@ pub struct DatadogLogsConfig {
pub conforms_as_agent: bool,
}

const fn default_compression() -> Option<Compression> {
Some(Compression::zstd_default())
}

impl GenerateConfig for DatadogLogsConfig {
fn generate_config() -> toml::Value {
toml::from_str(indoc! {r#"
Expand Down Expand Up @@ -155,7 +161,7 @@ impl DatadogLogsConfig {
protocol,
conforms_as_agent,
)
.compression(self.compression.unwrap_or_default())
.compression(self.compression.or_else(default_compression).unwrap())
.build();

Ok(VectorSink::from_event_streamsink(sink))
Expand Down Expand Up @@ -226,6 +232,56 @@ mod test {
crate::test_util::test_generate_config::<DatadogLogsConfig>();
}

#[test]
fn compression_config_field() {
// Verify the default compression function returns zstd
assert_eq!(default_compression(), Some(Compression::zstd_default()));

// Test 1: Config deserialized without compression field gets zstd default
// (due to #[serde(default = "default_compression")])
let config_yaml = indoc! {r#"
default_api_key: "test_key"
"#};

let config: DatadogLogsConfig = serde_yaml::from_str(config_yaml).unwrap();
// The serde default applies immediately during deserialization
assert!(matches!(config.compression, Some(Compression::Zstd(_))));

// Test 2: When explicitly set to "none", it should be Some(Compression::None)
let config_yaml_with_none = indoc! {r#"
default_api_key: "test_key"
compression: "none"
"#};

let config_no_compression: DatadogLogsConfig =
serde_yaml::from_str(config_yaml_with_none).unwrap();
assert_eq!(config_no_compression.compression, Some(Compression::None));

// Test 3: When explicitly set to "zstd", it should be Some(Compression::Zstd)
let config_yaml_with_zstd = indoc! {r#"
default_api_key: "test_key"
compression: "zstd"
"#};

let config_zstd: DatadogLogsConfig = serde_yaml::from_str(config_yaml_with_zstd).unwrap();
assert!(matches!(
config_zstd.compression,
Some(Compression::Zstd(_))
));

// Test 4: When explicitly set to "gzip", it should be Some(Compression::Gzip)
let config_yaml_with_gzip = indoc! {r#"
default_api_key: "test_key"
compression: "gzip"
"#};

let config_gzip: DatadogLogsConfig = serde_yaml::from_str(config_yaml_with_gzip).unwrap();
assert!(matches!(
config_gzip.compression,
Some(Compression::Gzip(_))
));
}

impl ValidatableComponent for DatadogLogsConfig {
fn validation_configuration() -> ValidationConfiguration {
let endpoint = "http://127.0.0.1:9005".to_string();
Expand All @@ -235,6 +291,8 @@ mod test {
default_api_key: Some("unused".to_string().into()),
..Default::default()
},
// Disable compression for validation tests to ensure byte counting is accurate
compression: Some(Compression::None),
..Default::default()
};

Expand Down
1 change: 0 additions & 1 deletion tests/e2e/datadog-logs/data/vector.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ type = "datadog_logs"
default_api_key = "unused"
endpoint = "http://fakeintake-vector:80"
batch.timeout_secs = 1
compression = "gzip"
39 changes: 21 additions & 18 deletions website/cue/reference/components/sinks/generated/datadog_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -69,28 +69,31 @@ generated: components: sinks: datadog_logs: configuration: {
All compression algorithms use the default compression level unless otherwise specified.
"""
required: false
type: string: enum: {
gzip: """
[Gzip][gzip] compression.
type: string: {
default: "zstd"
enum: {
gzip: """
[Gzip][gzip] compression.

[gzip]: https://www.gzip.org/
"""
none: "No compression."
snappy: """
[Snappy][snappy] compression.
[gzip]: https://www.gzip.org/
"""
none: "No compression."
snappy: """
[Snappy][snappy] compression.

[snappy]: https://github.com/google/snappy/blob/main/docs/README.md
"""
zlib: """
[Zlib][zlib] compression.
[snappy]: https://github.com/google/snappy/blob/main/docs/README.md
"""
zlib: """
[Zlib][zlib] compression.

[zlib]: https://zlib.net/
"""
zstd: """
[Zstandard][zstd] compression.
[zlib]: https://zlib.net/
"""
zstd: """
[Zstandard][zstd] compression.

[zstd]: https://facebook.github.io/zstd/
"""
[zstd]: https://facebook.github.io/zstd/
"""
}
}
}
conforms_as_agent: {
Expand Down
Loading