Skip to content

feat: Support ordering LogEvent's #41

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ default = []
rusoto = ["rusoto_logs/default", "rusoto_core/default"]
rusoto_rustls = ["rusoto_logs/rustls", "rusoto_core/rustls"]
awssdk = ["aws-sdk-cloudwatchlogs"]
ordered_logs = []

[dependencies]
anyhow = "1.0"
Expand All @@ -34,6 +35,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [

[dev-dependencies]
aws-config = "1"
insta = "1.40.0"
tokio = { version = "1.28.0", features = [
"rt",
"rt-multi-thread",
Expand All @@ -42,4 +44,3 @@ tokio = { version = "1.28.0", features = [
] }
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["json"] }
insta = "1.40.0"
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# tracing-cloudwatch

tracing-cloudwatch is a custom tracing-subscriber layer that sends your application's tracing events(logs) to AWS CloudWatch Logs.
tracing-cloudwatch is a custom tracing-subscriber layer that sends your application's tracing events(logs) to AWS CloudWatch Logs.

We have supported [rusoto](https://github.com/rusoto/rusoto) and the [AWS SDK](https://github.com/awslabs/aws-sdk-rust) as AWS clients.

Expand Down Expand Up @@ -35,6 +35,12 @@ async fn main() {
}
```

#### Chronological order

When aggregating logs from multiple places (or integrations such as [tracing-gstreamer](https://crates.io/crates/tracing-gstreamer)), messages can become unordered. This causes a `InvalidParameterException: Log events in a single PutLogEvents request must be in chronological order.` error from the CloudWatch client. To mediate this, you may enable the `ordered_logs` feature. Take into consideration that this can possibly increase processing time significantly depending on the number of events in the batch. Your milage may vary!

There is some additional context in https://github.com/ymgyt/tracing-cloudwatch/issues/40

### With Rusoto

feature `rusoto` required
Expand Down Expand Up @@ -84,7 +90,7 @@ tracing_subscriber::registry::Registry::default()

Currently, following AWS IAM Permissions required

* `logs:PutLogEvents`
- `logs:PutLogEvents`

## CloudWatch Log Groups and Streams

Expand All @@ -95,7 +101,6 @@ This crate does not create a log group and log stream, so if the specified log g
We haven't implemented any custom retry logic or timeout settings within the crate. We assume that these configurations are handled through the SDK Client.
For instance, in the AWS SDK, you can set up these configurations using [`timeout_config`](https://docs.rs/aws-sdk-cloudwatchlogs/0.28.0/aws_sdk_cloudwatchlogs/config/struct.Builder.html#method.timeout_config) and [`retry_config`](https://docs.rs/aws-sdk-cloudwatchlogs/0.28.0/aws_sdk_cloudwatchlogs/config/struct.Builder.html#method.retry_config)


## License

This project is licensed under the [MIT license.](./LICENSE)
97 changes: 96 additions & 1 deletion src/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ where
}
}

let logs = std::mem::take(&mut queue);
let logs: Vec<LogEvent> = Self::take_from_queue(&mut queue);

if let Err(err) = client.put_logs(config.destination.clone(), logs).await {
eprintln!(
Expand All @@ -143,4 +143,99 @@ where
}
}
}

fn take_from_queue(queue: &mut Vec<LogEvent>) -> Vec<LogEvent> {
if cfg!(feature = "ordered_logs") {
let mut logs = std::mem::take(queue);
logs.sort_by_key(|log| log.timestamp);
logs
} else {
std::mem::take(queue)
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};

const ONE_DAY_NS: i64 = 86_400_000_000_000;
const DAY_ONE: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + ONE_DAY_NS);
const DAY_TWO: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + (ONE_DAY_NS * 2));
const DAY_THREE: DateTime<Utc> = DateTime::from_timestamp_nanos(0 + (ONE_DAY_NS * 3));

#[cfg(not(feature = "ordered_logs"))]
#[test]
fn does_not_order_logs_by_default() {
let mut unordered_queue = vec![
LogEvent {
message: "1".to_string(),
timestamp: DAY_ONE,
},
LogEvent {
message: "3".to_string(),
timestamp: DAY_THREE,
},
LogEvent {
message: "2".to_string(),
timestamp: DAY_TWO,
},
];
let still_unordered_queue =
BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);

let mut still_unordered_queue_iter = still_unordered_queue.iter();
assert_eq!(
DAY_ONE,
still_unordered_queue_iter.next().unwrap().timestamp
);
assert_eq!(
DAY_THREE,
still_unordered_queue_iter.next().unwrap().timestamp
);
assert_eq!(
DAY_TWO,
still_unordered_queue_iter.next().unwrap().timestamp
);
}

#[cfg(feature = "ordered_logs")]
mod ordering {
use super::*;

fn assert_is_ordered(logs: Vec<LogEvent>) {
let mut last_timestamp = DateTime::from_timestamp_nanos(0);

for log in logs {
assert!(
log.timestamp > last_timestamp,
"Not true: {} > {}",
log.timestamp,
last_timestamp
);
last_timestamp = log.timestamp;
}
}

#[test]
fn orders_logs_when_enabled() {
let mut unordered_queue = vec![
LogEvent {
message: "1".to_string(),
timestamp: DAY_ONE,
},
LogEvent {
message: "3".to_string(),
timestamp: DAY_THREE,
},
LogEvent {
message: "2".to_string(),
timestamp: DAY_TWO,
},
];
let ordered_queue = BatchExporter::<NoopClient>::take_from_queue(&mut unordered_queue);
assert_is_ordered(ordered_queue);
}
}
}
Loading