diff --git a/Cargo.lock b/Cargo.lock
index 78dda3bb1..0b1341c7d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -329,6 +329,7 @@ dependencies = [
  "hdrhistogram",
  "humantime",
  "parking_lot",
+ "prost",
  "prost-types",
  "serde",
  "serde_json",
diff --git a/console-subscriber/Cargo.toml b/console-subscriber/Cargo.toml
index 113a4ceb3..7bdf8c8ef 100644
--- a/console-subscriber/Cargo.toml
+++ b/console-subscriber/Cargo.toml
@@ -46,6 +46,7 @@ hdrhistogram = { version = "7.3.0", default-features = false, features = ["seria
 # feature to also enable `tracing-subscriber`'s parking_lot feature flag.
 parking_lot_crate = { package = "parking_lot", version = "0.12", optional = true }
 humantime = "2.1.0"
+prost = "0.12"
 prost-types = "0.12.0"
 
 # Required for recording:
diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs
index e3a1eb62a..b6846d62e 100644
--- a/console-subscriber/src/aggregator/mod.rs
+++ b/console-subscriber/src/aggregator/mod.rs
@@ -7,6 +7,7 @@ use std::{
 };
 
 use console_api as proto;
+use prost::Message;
 use proto::resources::resource;
 use tokio::sync::{mpsc, Notify};
 use tracing_core::{span::Id, Metadata};
@@ -22,6 +23,9 @@ mod shrink;
 use self::id_data::{IdData, Include};
 use self::shrink::{ShrinkMap, ShrinkVec};
 
+/// Message size limit in bytes (4 MiB); should match tonic's (private) codec::DEFAULT_MAX_RECV_MESSAGE_SIZE
+const MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024;
+
 /// Aggregates instrumentation traces and prepares state for the instrument
 /// server.
/// @@ -278,26 +282,57 @@ impl Aggregator { /// Add the task subscription to the watchers after sending the first update fn add_instrument_subscription(&mut self, subscription: Watch) { tracing::debug!("new instrument subscription"); - - let task_update = Some(self.task_update(Include::All)); - let resource_update = Some(self.resource_update(Include::All)); - let async_op_update = Some(self.async_op_update(Include::All)); let now = Instant::now(); - let update = &proto::instrument::Update { - task_update, - resource_update, - async_op_update, - now: Some(self.base_time.to_timestamp(now)), - new_metadata: Some(proto::RegisterMetadata { - metadata: (*self.all_metadata).clone(), - }), + let update = loop { + let update = proto::instrument::Update { + task_update: Some(self.task_update(Include::All)), + resource_update: Some(self.resource_update(Include::All)), + async_op_update: Some(self.async_op_update(Include::All)), + now: Some(self.base_time.to_timestamp(now)), + new_metadata: Some(proto::RegisterMetadata { + metadata: (*self.all_metadata).clone(), + }), + }; + let message_size = update.encoded_len(); + if message_size < MAX_MESSAGE_SIZE { + // normal case + break Some(update); + } + // If the grpc message is bigger than tokio-console will accept, throw away the oldest + // inactive data and try again + self.retention /= 2; + self.cleanup_closed(); + tracing::debug!( + retention = ?self.retention, + message_size, + max_message_size = MAX_MESSAGE_SIZE, + "Message too big, reduced retention", + ); + + if self.retention <= self.publish_interval { + self.retention = self.publish_interval; + break None; + } }; - // Send the initial state --- if this fails, the subscription is already dead - if subscription.update(update) { - self.watchers.push(subscription) + match update { + // Send the initial state + Some(update) => { + if !subscription.update(&update) { + // If sending the initial update fails, the subscription is already dead, + // so don't add it to `watchers`. 
+ return; + } + } + // User will only get updates. + None => tracing::error!( + min_retention = ?self.publish_interval, + "Message too big. Start with smaller retention.", + ), } + + self.watchers.push(subscription); } fn task_update(&mut self, include: Include) -> proto::tasks::TaskUpdate {