From 4fc72c011ae5552ac4bd97cb69354f4205e1107f Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 13 Aug 2021 17:01:44 -0700 Subject: [PATCH] subscriber: add ability to record events to a file (#86) Closes #84 --- console-subscriber/Cargo.toml | 2 + console-subscriber/src/aggregator.rs | 17 +- console-subscriber/src/builder.rs | 18 +++ console-subscriber/src/init.rs | 1 + console-subscriber/src/lib.rs | 6 + console-subscriber/src/record.rs | 224 +++++++++++++++++++++++++++ 6 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 console-subscriber/src/record.rs diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml index 8b185a37..00faefc9 100644 --- a/console-subscriber/Cargo.toml +++ b/console-subscriber/Cargo.toml @@ -17,6 +17,8 @@ tracing = "0.1.26" tracing-subscriber = { version = "0.2.17", default-features = false, features = ["fmt", "registry", "env-filter"] } futures = { version = "0.3", default-features = false } hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" [dev-dependencies] diff --git a/console-subscriber/src/aggregator.rs b/console-subscriber/src/aggregator.rs index 640963c2..64da6134 100644 --- a/console-subscriber/src/aggregator.rs +++ b/console-subscriber/src/aggregator.rs @@ -1,4 +1,4 @@ -use crate::WatchRequest; +use crate::{record::Recorder, WatchRequest}; use super::{Event, WakeOp, Watch, WatchKind}; use console_api as proto; @@ -67,6 +67,9 @@ pub(crate) struct Aggregator { /// A table that contains the span ID to pretty task ID mappings. task_id_mappings: HashMap, + + /// A sink to record all events to a file. + recorder: Option, } #[derive(Debug)] @@ -152,6 +155,10 @@ impl Aggregator { stats: TaskData::default(), task_id_counter: 0, task_id_mappings: HashMap::new(), + recorder: builder + .recording_path + .as_ref() + .map(|path| Recorder::new(path).expect("creating recorder")), } } @@ -205,7 +212,13 @@ impl Aggregator { // channel is almost full. while let Some(event) = self.events.recv().now_or_never() { match event { - Some(event) => self.update_state(event), + Some(event) => { + // always be recording... + if let Some(ref recorder) = self.recorder { + recorder.record(&event); + } + self.update_state(event) + } // The channel closed, no more events will be emitted...time // to stop aggregating. None => { diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index 4e9bd6d8..9d69978c 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -1,6 +1,7 @@ use super::{Server, TasksLayer}; use std::{ net::{SocketAddr, ToSocketAddrs}, + path::PathBuf, time::Duration, }; @@ -23,6 +24,9 @@ pub struct Builder { /// The address on which to serve the RPC server. pub(super) server_addr: SocketAddr, + + /// If and where to save a recording of the events. + pub(super) recording_path: Option, } impl Default for Builder { @@ -33,6 +37,7 @@ impl Default for Builder { publish_interval: TasksLayer::DEFAULT_PUBLISH_INTERVAL, retention: TasksLayer::DEFAULT_RETENTION, server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT), + recording_path: None, } } } @@ -100,6 +105,14 @@ impl Builder { } } + /// Sets the path to record the events to the file system. + pub fn recording_path(self, path: impl Into) -> Self { + Self { + recording_path: Some(path.into()), + ..self + } + } + /// Completes the builder, returning a [`TasksLayer`] and [`Server`] task. pub fn build(self) -> (TasksLayer, Server) { TasksLayer::build(self) @@ -112,6 +125,7 @@ impl Builder { /// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) | /// | `TOKIO_CONSOLE_BIND` | a HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` | /// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) | + /// | `TOKIO_CONSOLE_RECORD_PATH` | The file path to save a recording | None | pub fn with_default_env(mut self) -> Self { if let Ok(retention) = std::env::var("TOKIO_CONSOLE_RETENTION_SECS") { self.retention = Duration::from_secs( @@ -137,6 +151,10 @@ impl Builder { ); } + if let Ok(path) = std::env::var("TOKIO_CONSOLE_RECORD_PATH") { + self.recording_path = Some(path.into()); + } + self } } diff --git a/console-subscriber/src/init.rs b/console-subscriber/src/init.rs index 601fb71d..41c5dc1d 100644 --- a/console-subscriber/src/init.rs +++ b/console-subscriber/src/init.rs @@ -27,6 +27,7 @@ type ConsoleSubscriberLayer = Layered>; /// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) | /// | `TOKIO_CONSOLE_BIND` | A HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` | /// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) | +/// | `TOKIO_CONSOLE_RECORD_PATH` | The file path to save a recording | None | /// | `RUST_LOG` | Configure the tracing filter. See [`EnvFilter`] for further information | `tokio=trace` | /// /// ## Further customization diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index c12ceaeb..9283b908 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -1,4 +1,5 @@ use console_api as proto; +use serde::Serialize; use tokio::sync::{mpsc, oneshot}; use std::{ @@ -18,6 +19,7 @@ mod aggregator; mod builder; mod callsites; mod init; +mod record; use aggregator::Aggregator; pub use builder::Builder; @@ -91,6 +93,7 @@ enum Event { }, } +#[derive(Clone, Copy, Serialize)] enum WakeOp { Wake, WakeByRef, @@ -122,6 +125,7 @@ impl TasksLayer { ?config.publish_interval, ?config.retention, ?config.server_addr, + ?config.recording_path, "configured console subscriber" ); @@ -130,6 +134,7 @@ impl TasksLayer { let aggregator = Aggregator::new(events, rpcs, &config); let flush = aggregator.flush().clone(); + let server = Server { aggregator: Some(aggregator), addr: config.server_addr, @@ -174,6 +179,7 @@ impl TasksLayer { fn send(&self, event: Event) { use mpsc::error::TrySendError; + match self.tx.try_reserve() { Ok(permit) => permit.send(event), Err(TrySendError::Closed(_)) => tracing::warn!( diff --git a/console-subscriber/src/record.rs b/console-subscriber/src/record.rs new file mode 100644 index 00000000..be8fe4a3 --- /dev/null +++ b/console-subscriber/src/record.rs @@ -0,0 +1,224 @@ +use serde::{ + ser::{SerializeSeq, SerializeStruct}, + Serialize, +}; +use std::{ + fs::File, + io, + path::Path, + sync::{Arc, Mutex}, + time::SystemTime, +}; + +use console_api as proto; + +/// This marks the currently understood version of the recording format. This +/// should be increased whenever the format has a breaking change that we +/// cannot parse. Though, even better, we should probably support parsing +/// older versions. +/// +/// But while this is in rapid development, we can move fast and break things. +const DATA_FORMAT_VERSION: u8 = 1; + +pub(crate) struct Recorder { + buf: Arc>, + + worker: std::thread::JoinHandle<()>, +} + +struct Io { + buf: Arc>, + file: File, +} + +struct RecordBuf { + /// The current buffer to serialize events into. + bytes: Vec, + /// The "next" buffer that should be used when the IO thread takes the + /// current buffer. After flushing, the IO thread will put the buffer + /// back in this slot, so the allocation can be reused. + next: Vec, +} + +#[derive(Serialize)] +struct Header { + v: u8, +} + +#[derive(Serialize)] +enum Event<'a> { + Spawn { + id: u64, + at: SystemTime, + fields: SerializeFields<'a>, + }, + Enter { + id: u64, + at: SystemTime, + }, + Exit { + id: u64, + at: SystemTime, + }, + Close { + id: u64, + at: SystemTime, + }, + Waker { + id: u64, + op: super::WakeOp, + at: SystemTime, + }, +} + +struct SerializeFields<'a>(&'a [proto::Field]); + +struct SerializeField<'a>(&'a proto::Field); + +impl Recorder { + pub(crate) fn new(path: &Path) -> io::Result { + let buf = Arc::new(Mutex::new(RecordBuf::new())); + let buf2 = buf.clone(); + let file = std::fs::File::create(path)?; + + let worker = std::thread::Builder::new() + .name("console/subscriber/recorder/io".into()) + .spawn(move || { + record_io(Io { buf: buf2, file }); + })?; + + let recorder = Recorder { buf, worker }; + + recorder.write(&Header { + v: DATA_FORMAT_VERSION, + }); + + Ok(recorder) + } + + pub(crate) fn record(&self, event: &crate::Event) { + let event = match event { + crate::Event::Spawn { id, at, fields, .. } => Event::Spawn { + id: id.into_u64(), + at: *at, + fields: SerializeFields(fields), + }, + crate::Event::Enter { id, at } => Event::Enter { + id: id.into_u64(), + at: *at, + }, + crate::Event::Exit { id, at } => Event::Exit { + id: id.into_u64(), + at: *at, + }, + crate::Event::Close { id, at } => Event::Close { + id: id.into_u64(), + at: *at, + }, + crate::Event::Waker { id, op, at } => Event::Waker { + id: id.into_u64(), + at: *at, + op: *op, + }, + _ => return, + }; + + self.write(&event); + } + + fn write(&self, val: &T) { + let mut buf = self.buf.lock().unwrap(); + serde_json::to_writer(&mut buf.bytes, val).expect("json"); + buf.bytes.push(b'\n'); + drop(buf); + self.worker.thread().unpark(); + } +} + +impl RecordBuf { + fn new() -> Self { + Self { + bytes: Vec::new(), + next: Vec::new(), + } + } + + /// Takes the existing bytes to be written, and resets self so that + /// it may continue to buffer events. + fn take(&mut self) -> Vec { + let next = std::mem::take(&mut self.next); + std::mem::replace(&mut self.bytes, next) + } + + fn put(&mut self, mut next: Vec) { + debug_assert_eq!(self.next.capacity(), 0); + next.clear(); + self.next = next; + } +} + +fn record_io(mut dst: Io) { + use std::io::Write; + + loop { + std::thread::park(); + + // Only lock the mutex to take the bytes out. The file write could + // take a relatively long time, and we don't want to be blocking + // the serialization end holding this lock. + let bytes = dst.buf.lock().unwrap().take(); + match dst.file.write_all(&bytes) { + Ok(()) => { + dst.buf.lock().unwrap().put(bytes); + } + Err(_e) => { + // TODO: what to do if file error? + } + } + } +} + +impl serde::Serialize for SerializeFields<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.0.len()))?; + for element in self.0 { + seq.serialize_element(&SerializeField(element))?; + } + seq.end() + } +} + +impl serde::Serialize for SerializeField<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + let mut ser = serializer.serialize_struct("Field", 2)?; + ser.serialize_field( + "name", + match self.0.name.as_ref().expect("name") { + proto::field::Name::StrName(ref n) => n, + proto::field::Name::NameIdx(_idx) => todo!("metadata idx"), + }, + )?; + + match self.0.value.as_ref().expect("field value") { + proto::field::Value::DebugVal(v) | proto::field::Value::StrVal(v) => { + ser.serialize_field("value", v)?; + } + proto::field::Value::U64Val(v) => { + ser.serialize_field("value", v)?; + } + proto::field::Value::I64Val(v) => { + ser.serialize_field("value", v)?; + } + proto::field::Value::BoolVal(v) => { + ser.serialize_field("value", v)?; + } + } + ser.end() + } +}