Skip to content
148 changes: 141 additions & 7 deletions crashtracker/src/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
use super::*;
use crate::shared::constants::*;
use anyhow::Context;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use std::time::{Duration, Instant};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::net::UnixListener;

pub fn resolve_frames(
Expand Down Expand Up @@ -64,12 +64,23 @@ pub fn receiver_entry_point_unix_socket(socket_path: impl AsRef<str>) -> anyhow:
// Dropping the stream closes it, allowing the collector to exit if it was waiting.
}

/// Returns the timeout handed to the receiver entry points.
///
/// The value is read from the `DD_CRASHTRACKER_RECEIVER_TIMEOUT_MS` environment
/// variable, interpreted as a whole number of milliseconds. When the variable is
/// unset, is not valid Unicode, or does not parse as an unsigned integer, the
/// default of 4000 ms is used.
pub fn receiver_timeout() -> Duration {
    // Background: https://github.com/DataDog/libdatadog/issues/717
    const DEFAULT_TIMEOUT_MS: u64 = 4000;

    let millis = std::env::var("DD_CRASHTRACKER_RECEIVER_TIMEOUT_MS")
        .ok()
        .and_then(|raw| raw.parse::<u64>().ok())
        .unwrap_or(DEFAULT_TIMEOUT_MS);

    Duration::from_millis(millis)
}

/// Runs the crash receiver against the process's standard input.
///
/// Builds a current-thread tokio runtime, wraps stdin in a [`BufReader`], and
/// blocks until `receiver_entry_point` finishes, using the timeout returned by
/// [`receiver_timeout`].
///
/// # Errors
///
/// Returns an error if the runtime cannot be built or if
/// `receiver_entry_point` fails.
pub fn receiver_entry_point_stdin() -> anyhow::Result<()> {
    let stream = BufReader::new(tokio::io::stdin());
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()?;
    // `stream` is moved into the receiver, so it must be invoked exactly once,
    // with the timeout as its first argument.
    rt.block_on(receiver_entry_point(receiver_timeout(), stream))?;
    Ok(())
}

Expand All @@ -81,7 +92,7 @@ pub async fn async_receiver_entry_point_unix_socket(
loop {
let (unix_stream, _) = listener.accept().await?;
let stream = BufReader::new(unix_stream);
let res = receiver_entry_point(stream).await;
let res = receiver_entry_point(receiver_timeout(), stream).await;

if one_shot {
return res;
Expand All @@ -99,9 +110,10 @@ pub async fn async_receiver_entry_point_unix_socket(
/// See comments in [crashtracker/lib.rs] for a full architecture
/// description.
async fn receiver_entry_point(
timeout: Duration,
stream: impl AsyncBufReadExt + std::marker::Unpin,
) -> anyhow::Result<()> {
match receive_report(stream).await? {
match receive_report(timeout, stream).await? {
CrashReportStatus::NoCrash => Ok(()),
CrashReportStatus::CrashReport(config, mut crash_info) => {
resolve_frames(&config, &mut crash_info)?;
Expand Down Expand Up @@ -283,30 +295,64 @@ enum CrashReportStatus {
/// (for instance if it crashed while calculating the crash-report), we return
/// a PartialCrashReport.
async fn receive_report(
timeout: Duration,
stream: impl AsyncBufReadExt + std::marker::Unpin,
) -> anyhow::Result<CrashReportStatus> {
let mut crashinfo = CrashInfo::new();
let mut stdin_state = StdinState::Waiting;
let mut config = None;

let mut lines = stream.lines();
let mut deadline = None;
// The timeout countdown only starts once the first crash message is received;
// until then, wait without a time limit.
let mut remaining_timeout = Duration::MAX;

//TODO: This assumes that the input is valid UTF-8.
while let Some(line) = lines.next_line().await? {
loop {
let next = tokio::time::timeout(remaining_timeout, lines.next_line()).await;
if let Err(elapsed) = next {
eprintln!("Timeout: {elapsed}");
break;
};
let next = next.unwrap();
if let Err(io_err) = next {
eprintln!("IO Error: {io_err}");
break;
}
let next = next.unwrap();
if next.is_none() {
break;
}
let line = next.unwrap();

match process_line(&mut crashinfo, &mut config, line, stdin_state) {
Ok(next_state) => stdin_state = next_state,
Ok(next_state) => {
stdin_state = next_state;
if matches!(stdin_state, StdinState::Done) {
break;
}
}
Err(e) => {
// If the input is corrupted, stop and salvage what we can
stdin_state = StdinState::InternalError(e.to_string());
break;
}
}
if let Some(deadline) = deadline {
// The clock was already ticking, update the remaining time
remaining_timeout = deadline - Instant::now()
} else {
// We've received the first message from the collector, start the clock ticking.
deadline = Some(Instant::now() + timeout);
remaining_timeout = timeout;
}
}

if !crashinfo.crash_seen() {
return Ok(CrashReportStatus::NoCrash);
}

// Without a config, we don't even know the endpoint to transmit to. Not much to do to recover.
let config = config.context("Missing crashtracker configuration")?;
for filename in &config.additional_files {
crashinfo
Expand All @@ -327,3 +373,91 @@ async fn receive_report(
))
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncWriteExt;
    use tokio::net::UnixStream;

    /// Sends `msg` followed by a newline over `target`, then flushes the stream.
    ///
    /// Returns the number of bytes sent (the message plus the trailing newline).
    async fn to_socket(
        target: &mut tokio::net::UnixStream,
        msg: impl AsRef<str>,
    ) -> anyhow::Result<usize> {
        let line = format!("{}\n", msg.as_ref());
        // A plain `write` is allowed to accept only part of the buffer, which would
        // silently truncate the message; `write_all` keeps writing until every byte
        // has been handed to the socket or an error occurs.
        target.write_all(line.as_bytes()).await?;
        target.flush().await?;
        Ok(line.len())
    }

    /// Plays the role of the collector: writes a siginfo section and a config
    /// section to `stream`, sleeps for `delay`, and then writes the "done" marker.
    ///
    /// The sleep sits between the config section and the done marker so tests can
    /// control whether the receiver's timeout expires before the report completes.
    async fn send_report(delay: Duration, mut stream: UnixStream) -> anyhow::Result<()> {
        let sender = &mut stream;

        // Signal information section.
        to_socket(sender, DD_CRASHTRACK_BEGIN_SIGINFO).await?;
        to_socket(
            sender,
            serde_json::to_string(&SigInfo {
                signame: Some("SIGSEGV".to_string()),
                signum: 11,
                faulting_address: None,
            })?,
        )
        .await?;
        to_socket(sender, DD_CRASHTRACK_END_SIGINFO).await?;

        // Configuration section.
        to_socket(sender, DD_CRASHTRACK_BEGIN_CONFIG).await?;
        to_socket(
            sender,
            serde_json::to_string(&CrashtrackerConfiguration::new(
                vec![],
                false,
                false,
                None,
                StacktraceCollection::Disabled,
                3000,
            )?)?,
        )
        .await?;
        to_socket(sender, DD_CRASHTRACK_END_CONFIG).await?;

        // Stall before finishing the report.
        tokio::time::sleep(delay).await;
        to_socket(sender, DD_CRASHTRACK_DONE).await?;
        Ok(())
    }

    /// The sender stalls for 2s while the receiver only waits 1s: the receiver
    /// must give up and return a partial report, and the sender's final write
    /// must fail with a broken pipe.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_receive_report_short_timeout() -> anyhow::Result<()> {
        let (sender, receiver) = tokio::net::UnixStream::pair()?;

        let join_handle1 = tokio::spawn(receive_report(
            Duration::from_secs(1),
            BufReader::new(receiver),
        ));
        let join_handle2 = tokio::spawn(send_report(Duration::from_secs(2), sender));

        let crash_report = join_handle1.await??;
        assert!(matches!(
            crash_report,
            CrashReportStatus::PartialCrashReport(_, _, _)
        ));

        // Check the error kind rather than the OS-formatted message, which embeds
        // a platform-specific description and errno.
        let sender_error = join_handle2.await?.unwrap_err();
        let io_error = sender_error
            .downcast_ref::<std::io::Error>()
            .expect("sender should fail with an I/O error");
        assert_eq!(io_error.kind(), std::io::ErrorKind::BrokenPipe);
        Ok(())
    }

    /// The sender stalls for 1s while the receiver is willing to wait 2s: the
    /// full report must arrive and the sender must finish without error.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn test_receive_report_long_timeout() -> anyhow::Result<()> {
        let (sender, receiver) = tokio::net::UnixStream::pair()?;

        let join_handle1 = tokio::spawn(receive_report(
            Duration::from_secs(2),
            BufReader::new(receiver),
        ));
        let join_handle2 = tokio::spawn(send_report(Duration::from_secs(1), sender));

        let crash_report = join_handle1.await??;
        assert!(matches!(crash_report, CrashReportStatus::CrashReport(_, _)));
        join_handle2.await??;
        Ok(())
    }
}