Skip to content

Commit

Permalink
subscriber: add ability to record events to a file (console-rs#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar authored Aug 14, 2021
1 parent 778a8f1 commit 4fc72c0
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 2 deletions.
2 changes: 2 additions & 0 deletions console-subscriber/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ tracing = "0.1.26"
tracing-subscriber = { version = "0.2.17", default-features = false, features = ["fmt", "registry", "env-filter"] }
futures = { version = "0.3", default-features = false }
hdrhistogram = { version = "7.3.0", default-features = false, features = ["serialization"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"

[dev-dependencies]

Expand Down
17 changes: 15 additions & 2 deletions console-subscriber/src/aggregator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::WatchRequest;
use crate::{record::Recorder, WatchRequest};

use super::{Event, WakeOp, Watch, WatchKind};
use console_api as proto;
Expand Down Expand Up @@ -67,6 +67,9 @@ pub(crate) struct Aggregator {

/// A table that contains the span ID to pretty task ID mappings.
task_id_mappings: HashMap<span::Id, TaskId>,

/// A sink to record all events to a file.
recorder: Option<Recorder>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -152,6 +155,10 @@ impl Aggregator {
stats: TaskData::default(),
task_id_counter: 0,
task_id_mappings: HashMap::new(),
recorder: builder
.recording_path
.as_ref()
.map(|path| Recorder::new(path).expect("creating recorder")),
}
}

Expand Down Expand Up @@ -205,7 +212,13 @@ impl Aggregator {
// channel is almost full.
while let Some(event) = self.events.recv().now_or_never() {
match event {
Some(event) => self.update_state(event),
Some(event) => {
// always be recording...
if let Some(ref recorder) = self.recorder {
recorder.record(&event);
}
self.update_state(event)
}
// The channel closed, no more events will be emitted...time
// to stop aggregating.
None => {
Expand Down
18 changes: 18 additions & 0 deletions console-subscriber/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{Server, TasksLayer};
use std::{
net::{SocketAddr, ToSocketAddrs},
path::PathBuf,
time::Duration,
};

Expand All @@ -23,6 +24,9 @@ pub struct Builder {

/// The address on which to serve the RPC server.
pub(super) server_addr: SocketAddr,

/// If and where to save a recording of the events.
pub(super) recording_path: Option<PathBuf>,
}

impl Default for Builder {
Expand All @@ -33,6 +37,7 @@ impl Default for Builder {
publish_interval: TasksLayer::DEFAULT_PUBLISH_INTERVAL,
retention: TasksLayer::DEFAULT_RETENTION,
server_addr: SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT),
recording_path: None,
}
}
}
Expand Down Expand Up @@ -100,6 +105,14 @@ impl Builder {
}
}

/// Sets the path to record the events to the file system.
pub fn recording_path(self, path: impl Into<PathBuf>) -> Self {
Self {
recording_path: Some(path.into()),
..self
}
}

/// Completes the builder, returning a [`TasksLayer`] and [`Server`] task.
pub fn build(self) -> (TasksLayer, Server) {
TasksLayer::build(self)
Expand All @@ -112,6 +125,7 @@ impl Builder {
/// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) |
/// | `TOKIO_CONSOLE_BIND` | a HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` |
/// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) |
/// | `TOKIO_CONSOLE_RECORD_PATH` | The file path to save a recording | None |
pub fn with_default_env(mut self) -> Self {
if let Ok(retention) = std::env::var("TOKIO_CONSOLE_RETENTION_SECS") {
self.retention = Duration::from_secs(
Expand All @@ -137,6 +151,10 @@ impl Builder {
);
}

if let Ok(path) = std::env::var("TOKIO_CONSOLE_RECORD_PATH") {
self.recording_path = Some(path.into());
}

self
}
}
1 change: 1 addition & 0 deletions console-subscriber/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type ConsoleSubscriberLayer = Layered<TasksLayer, Layered<EnvFilter, Registry>>;
/// | `TOKIO_CONSOLE_RETENTION_SECS` | The number of seconds to accumulate completed tracing data | 3600s (1h) |
/// | `TOKIO_CONSOLE_BIND` | A HOST:PORT description, such as `localhost:1234` | `127.0.0.1:6669` |
/// | `TOKIO_CONSOLE_PUBLISH_INTERVAL_MS` | The number of milliseconds to wait between sending updates to the console | 1000ms (1s) |
/// | `TOKIO_CONSOLE_RECORD_PATH` | The file path to save a recording | None |
/// | `RUST_LOG` | Configure the tracing filter. See [`EnvFilter`] for further information | `tokio=trace` |
///
/// ## Further customization
Expand Down
6 changes: 6 additions & 0 deletions console-subscriber/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use console_api as proto;
use serde::Serialize;
use tokio::sync::{mpsc, oneshot};

use std::{
Expand All @@ -18,6 +19,7 @@ mod aggregator;
mod builder;
mod callsites;
mod init;
mod record;

use aggregator::Aggregator;
pub use builder::Builder;
Expand Down Expand Up @@ -91,6 +93,7 @@ enum Event {
},
}

#[derive(Clone, Copy, Serialize)]
enum WakeOp {
Wake,
WakeByRef,
Expand Down Expand Up @@ -122,6 +125,7 @@ impl TasksLayer {
?config.publish_interval,
?config.retention,
?config.server_addr,
?config.recording_path,
"configured console subscriber"
);

Expand All @@ -130,6 +134,7 @@ impl TasksLayer {

let aggregator = Aggregator::new(events, rpcs, &config);
let flush = aggregator.flush().clone();

let server = Server {
aggregator: Some(aggregator),
addr: config.server_addr,
Expand Down Expand Up @@ -174,6 +179,7 @@ impl TasksLayer {

fn send(&self, event: Event) {
use mpsc::error::TrySendError;

match self.tx.try_reserve() {
Ok(permit) => permit.send(event),
Err(TrySendError::Closed(_)) => tracing::warn!(
Expand Down
224 changes: 224 additions & 0 deletions console-subscriber/src/record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
use serde::{
ser::{SerializeSeq, SerializeStruct},
Serialize,
};
use std::{
fs::File,
io,
path::Path,
sync::{Arc, Mutex},
time::SystemTime,
};

use console_api as proto;

/// This marks the currently understood version of the recording format. This
/// should be increased whenever the format has a breaking change that we
/// cannot parse. Though, even better, we should probably support parsing
/// older versions.
///
/// But while this is in rapid development, we can move fast and break things.
///
/// Written as the first line of every recording (see `Header`).
const DATA_FORMAT_VERSION: u8 = 1;

/// Records subscriber events to a file as line-delimited JSON.
///
/// Serialization happens on the caller's thread into a shared in-memory
/// buffer; a dedicated IO thread drains that buffer to the file, so
/// recording an event never blocks on file IO.
pub(crate) struct Recorder {
    /// Serialized-but-not-yet-flushed bytes, shared with the IO thread.
    buf: Arc<Mutex<RecordBuf>>,

    /// Handle to the IO thread; it is unparked whenever new bytes are
    /// pushed into `buf`.
    worker: std::thread::JoinHandle<()>,
}

/// State owned by the IO thread: the buffer it drains (shared with the
/// `Recorder`) and the destination file it writes to.
struct Io {
    buf: Arc<Mutex<RecordBuf>>,
    file: File,
}

/// A double buffer: one `Vec` accumulates serialized events while the
/// previously flushed one is held aside so its allocation can be reused.
struct RecordBuf {
    /// The current buffer to serialize events into.
    bytes: Vec<u8>,
    /// The "next" buffer that should be used when the IO thread takes the
    /// current buffer. After flushing, the IO thread will put the buffer
    /// back in this slot, so the allocation can be reused.
    next: Vec<u8>,
}

/// The first record of every recording: identifies the data format version
/// so readers can reject files they do not understand.
#[derive(Serialize)]
struct Header {
    // Format version; always written as `DATA_FORMAT_VERSION`.
    v: u8,
}

/// The on-disk mirror of [`crate::Event`], restricted to the variants and
/// fields that are recorded, with types that implement `Serialize`.
///
/// `id` is the span ID in its `u64` form; `at` is a `SystemTime` so the
/// timestamp stays meaningful across process runs.
#[derive(Serialize)]
enum Event<'a> {
    /// A task was spawned, with its initial set of fields.
    Spawn {
        id: u64,
        at: SystemTime,
        fields: SerializeFields<'a>,
    },
    /// The task's span was entered (presumably the task was polled —
    /// confirm against the layer that emits `crate::Event::Enter`).
    Enter {
        id: u64,
        at: SystemTime,
    },
    /// The task's span was exited.
    Exit {
        id: u64,
        at: SystemTime,
    },
    /// The task's span closed.
    Close {
        id: u64,
        at: SystemTime,
    },
    /// A waker operation (wake, wake-by-ref, clone, drop) on the task.
    Waker {
        id: u64,
        op: super::WakeOp,
        at: SystemTime,
    },
}

struct SerializeFields<'a>(&'a [proto::Field]);

struct SerializeField<'a>(&'a proto::Field);

impl Recorder {
    /// Creates a recorder that writes line-delimited JSON events to the
    /// file at `path`, creating (or truncating) that file.
    ///
    /// Spawns the dedicated IO thread and immediately records a `Header`
    /// carrying [`DATA_FORMAT_VERSION`] as the first line of the file.
    ///
    /// # Errors
    ///
    /// Returns any IO error from creating the file or spawning the worker
    /// thread.
    pub(crate) fn new(path: &Path) -> io::Result<Self> {
        let buf = Arc::new(Mutex::new(RecordBuf::new()));
        let buf2 = buf.clone();
        let file = std::fs::File::create(path)?;

        let worker = std::thread::Builder::new()
            .name("console/subscriber/recorder/io".into())
            .spawn(move || {
                record_io(Io { buf: buf2, file });
            })?;

        let recorder = Recorder { buf, worker };

        // Version header first, so readers can detect incompatible files
        // before parsing any events.
        recorder.write(&Header {
            v: DATA_FORMAT_VERSION,
        });

        Ok(recorder)
    }

    /// Serializes `event` into the shared buffer and wakes the IO thread.
    ///
    /// Event variants without an on-disk representation are silently
    /// skipped.
    pub(crate) fn record(&self, event: &crate::Event) {
        let event = match event {
            crate::Event::Spawn { id, at, fields, .. } => Event::Spawn {
                id: id.into_u64(),
                at: *at,
                fields: SerializeFields(fields),
            },
            crate::Event::Enter { id, at } => Event::Enter {
                id: id.into_u64(),
                at: *at,
            },
            crate::Event::Exit { id, at } => Event::Exit {
                id: id.into_u64(),
                at: *at,
            },
            crate::Event::Close { id, at } => Event::Close {
                id: id.into_u64(),
                at: *at,
            },
            crate::Event::Waker { id, op, at } => Event::Waker {
                id: id.into_u64(),
                at: *at,
                op: *op,
            },
            _ => return,
        };

        self.write(&event);
    }

    /// Appends one JSON value plus a trailing newline to the buffer, then
    /// unparks the IO thread so it can flush.
    fn write<T: Serialize>(&self, val: &T) {
        let mut buf = self.buf.lock().unwrap();
        // Serializing into a `Vec` only fails on a `Serialize` impl that
        // errors; ours don't, so a failure here is a bug.
        serde_json::to_writer(&mut buf.bytes, val).expect("json");
        buf.bytes.push(b'\n');
        // Release the lock before unparking so the IO thread can take the
        // buffer immediately.
        drop(buf);
        self.worker.thread().unpark();
    }
}

impl Drop for Recorder {
    fn drop(&mut self) {
        // Fix: wake the IO thread one last time. Without this, once the
        // recorder is dropped nothing ever unparks the worker again, so it
        // would stay parked forever — leaking the thread and any bytes
        // buffered since its last flush. We deliberately do NOT join the
        // worker here: its loop may not observe the shutdown promptly, and
        // joining could block shutdown indefinitely.
        self.worker.thread().unpark();
    }
}

impl RecordBuf {
fn new() -> Self {
Self {
bytes: Vec::new(),
next: Vec::new(),
}
}

/// Takes the existing bytes to be written, and resets self so that
/// it may continue to buffer events.
fn take(&mut self) -> Vec<u8> {
let next = std::mem::take(&mut self.next);
std::mem::replace(&mut self.bytes, next)
}

fn put(&mut self, mut next: Vec<u8>) {
debug_assert_eq!(self.next.capacity(), 0);
next.clear();
self.next = next;
}
}

/// Body of the recorder's IO thread: sleeps until unparked, then drains the
/// shared buffer into the destination file.
///
/// Exits once the owning `Recorder` has been dropped (detected via the
/// `Arc` strong count), after a final drain and flush.
fn record_io(mut dst: Io) {
    use std::io::Write;

    loop {
        std::thread::park();

        // Only lock the mutex to take the bytes out. The file write could
        // take a relatively long time, and we don't want to be blocking
        // the serialization end holding this lock.
        let bytes = dst.buf.lock().unwrap().take();
        match dst.file.write_all(&bytes) {
            Ok(()) => {
                dst.buf.lock().unwrap().put(bytes);
            }
            Err(_e) => {
                // TODO: what to do if file error?
            }
        }

        // Fix: if this thread holds the last reference to the buffer, the
        // `Recorder` has been dropped. Previously the loop ran forever,
        // leaking this thread as it parked with no one left to unpark it.
        if Arc::strong_count(&dst.buf) == 1 {
            // Drain anything recorded between the `take` above and the
            // recorder being dropped, then flush and exit.
            let bytes = dst.buf.lock().unwrap().take();
            let _ = dst.file.write_all(&bytes);
            let _ = dst.file.flush();
            return;
        }
    }
}

impl serde::Serialize for SerializeFields<'_> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
for element in self.0 {
seq.serialize_element(&SerializeField(element))?;
}
seq.end()
}
}

impl serde::Serialize for SerializeField<'_> {
    /// Serializes one protobuf `Field` as a struct with `name` and `value`
    /// members.
    ///
    /// Panics if the field is missing its name or value, and `NameIdx`
    /// (metadata-indexed names) is not implemented yet.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        use proto::field::{Name, Value};

        let mut state = serializer.serialize_struct("Field", 2)?;

        let name = match self.0.name.as_ref().expect("name") {
            Name::StrName(ref n) => n,
            Name::NameIdx(_idx) => todo!("metadata idx"),
        };
        state.serialize_field("name", name)?;

        // Each protobuf value variant maps to the natural JSON scalar;
        // debug and string values are both emitted as strings.
        match self.0.value.as_ref().expect("field value") {
            Value::DebugVal(v) | Value::StrVal(v) => state.serialize_field("value", v)?,
            Value::U64Val(v) => state.serialize_field("value", v)?,
            Value::I64Val(v) => state.serialize_field("value", v)?,
            Value::BoolVal(v) => state.serialize_field("value", v)?,
        }

        state.end()
    }
}

0 comments on commit 4fc72c0

Please sign in to comment.