diff --git a/Cargo.lock b/Cargo.lock index 722826e9c10..2becd60323c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5163,9 +5163,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.137" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61ea8d54c77f8315140a05f4c7237403bf38b72704d031543aa1d16abbf517d1" +checksum = "728eb6351430bccb993660dfffc5a72f91ccc1295abaa8ce19b27ebe4f75568b" dependencies = [ "serde_derive", ] @@ -5201,9 +5201,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.137" +version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f26faba0c3959972377d3b2d306ee9f71faee9714294e41bb777f83f88578be" +checksum = "81fa1584d3d1bcacd84c277a0dfe21f5b0f6accf4a23d04d4c6d61f1af522b4c" dependencies = [ "proc-macro2", "quote", diff --git a/core/o11y/src/lib.rs b/core/o11y/src/lib.rs index 201285388b6..bae48dcbe52 100644 --- a/core/o11y/src/lib.rs +++ b/core/o11y/src/lib.rs @@ -3,8 +3,6 @@ pub use {backtrace, tracing, tracing_appender, tracing_subscriber}; use clap::Parser; -use near_crypto::PublicKey; -use near_primitives::types::AccountId; use once_cell::sync::OnceCell; use opentelemetry::sdk::trace::{self, IdGenerator, Sampler, Tracer}; use opentelemetry::sdk::Resource; @@ -18,11 +16,9 @@ use tracing::subscriber::DefaultGuard; use tracing_appender::non_blocking::NonBlocking; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::filter::{Filtered, ParseError}; -use tracing_subscriber::fmt::format::{DefaultFields, Format}; use tracing_subscriber::layer::{Layered, SubscriberExt}; use tracing_subscriber::registry::LookupSpan; -use tracing_subscriber::reload::{Error, Handle}; -use tracing_subscriber::{reload, EnvFilter, Layer, Registry}; +use tracing_subscriber::{fmt, reload, EnvFilter, Layer, Registry}; /// Custom tracing subscriber implementation that produces IO traces. 
mod io_tracer; @@ -45,21 +41,23 @@ macro_rules! io_trace { ($($fields:tt)*) => {}; } -static LOG_LAYER_RELOAD_HANDLE: OnceCell> = OnceCell::new(); - -static OTLP_LAYER_RELOAD_HANDLE: OnceCell< - Handle< - LevelFilter, - Layered< - Filtered< - tracing_subscriber::fmt::Layer, - reload::Layer, - Registry, - >, - Registry, - >, +static LOG_LAYER_RELOAD_HANDLE: OnceCell> = OnceCell::new(); +static OTLP_LAYER_RELOAD_HANDLE: OnceCell>> = + OnceCell::new(); + +type LogLayer = Layered< + Filtered< + fmt::Layer, + reload::Layer, + Inner, >, -> = OnceCell::new(); + Inner, +>; + +type TracingLayer = Layered< + Filtered, reload::Layer, Inner>, + Inner, +>; // Records the level of opentelemetry tracing verbosity configured via command-line flags at the startup. static DEFAULT_OTLP_LEVEL: OnceCell = OnceCell::new(); @@ -92,7 +90,7 @@ pub struct DefaultSubscriberGuard { subscriber: Option, local_subscriber_guard: Option, #[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard. - writer_guard: Option, + writer_guard: tracing_appender::non_blocking::WorkerGuard, #[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard. io_trace_guard: Option, } @@ -177,60 +175,25 @@ fn is_terminal() -> bool { atty::is(atty::Stream::Stderr) } -/// Adds a logging layer which writes to stderr synchronously. -fn add_simple_log_layer( - filter: EnvFilter, - ansi: bool, - subscriber: S, -) -> Layered, EnvFilter, S>, S> -where - S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, -{ - let layer = tracing_subscriber::fmt::layer() - .with_ansi(ansi) - // Synthesizing ENTER and CLOSE events lets us log durations of spans to the log. 
- .with_span_events( - tracing_subscriber::fmt::format::FmtSpan::ENTER - | tracing_subscriber::fmt::format::FmtSpan::CLOSE, - ) - .with_filter(filter); - - let subscriber = subscriber.with(layer); - subscriber -} - -fn add_non_blocking_log_layer( +fn add_log_layer( filter: EnvFilter, writer: NonBlocking, ansi: bool, subscriber: S, -) -> ( - Layered< - Filtered< - tracing_subscriber::fmt::Layer, - reload::Layer, - S, - >, - S, - >, - Handle, -) +) -> (LogLayer, reload::Handle) where S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, { let (filter, handle) = reload::Layer::::new(filter); - let layer = tracing_subscriber::fmt::layer() + let layer = fmt::layer() .with_ansi(ansi) // Synthesizing ENTER and CLOSE events lets us log durations of spans to the log. - .with_span_events( - tracing_subscriber::fmt::format::FmtSpan::ENTER - | tracing_subscriber::fmt::format::FmtSpan::CLOSE, - ) + .with_span_events(fmt::format::FmtSpan::ENTER | fmt::format::FmtSpan::CLOSE) .with_writer(writer) .with_filter(filter); - - (subscriber.with(layer), handle) + let subscriber = subscriber.with(layer); + (subscriber, handle) } /// Constructs an OpenTelemetryConfig which sends span data to an external collector. @@ -239,29 +202,14 @@ where // register timers and channels and whatnot. 
async fn add_opentelemetry_layer( opentelemetry_level: OpenTelemetryLevel, - chain_id: String, - node_public_key: PublicKey, - account_id: Option, subscriber: S, -) -> ( - Layered, reload::Layer, S>, S>, - Handle, -) +) -> (TracingLayer, reload::Handle) where S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync, { let filter = get_opentelemetry_filter(opentelemetry_level); let (filter, handle) = reload::Layer::::new(filter); - let mut resource = vec![ - KeyValue::new(SERVICE_NAME, "neard"), - KeyValue::new("chain_id", chain_id), - KeyValue::new("node_id", node_public_key.to_string()), - ]; - if let Some(account_id) = account_id { - resource.push(KeyValue::new("account_id", account_id.to_string())); - } - let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(opentelemetry_otlp::new_exporter().tonic()) @@ -269,12 +217,13 @@ where trace::config() .with_sampler(Sampler::AlwaysOn) .with_id_generator(IdGenerator::default()) - .with_resource(Resource::new(resource)), + .with_resource(Resource::new(vec![KeyValue::new(SERVICE_NAME, "neard")])), ) .install_batch(opentelemetry::runtime::Tokio) .unwrap(); let layer = tracing_opentelemetry::layer().with_tracer(tracer).with_filter(filter); - (subscriber.with(layer), handle) + let subscriber = subscriber.with(layer); + (subscriber, handle) } pub fn get_opentelemetry_filter(opentelemetry_level: OpenTelemetryLevel) -> LevelFilter { @@ -306,43 +255,10 @@ where (io_layer, guard) } -fn use_color_output(options: &Options) -> bool { - match options.color { - ColorOutput::Always => true, - ColorOutput::Never => false, - ColorOutput::Auto => std::env::var_os("NO_COLOR").is_none() && is_terminal(), - } -} - -/// Constructs a subscriber set to the option appropriate for the NEAR code. +/// Run the code with a default subscriber set to the option appropriate for the NEAR code. /// -/// Subscriber enables only logging. 
-/// -/// # Example -/// -/// ```rust -/// let _subscriber = near_o11y::default_subscriber(filter, &Default::default()).global(); -/// ``` -pub fn default_subscriber( - env_filter: EnvFilter, - options: &Options, -) -> DefaultSubscriberGuard LookupSpan<'span> + Send + Sync> { - let color_output = use_color_output(options); - - let subscriber = tracing_subscriber::registry(); - let subscriber = add_simple_log_layer(env_filter, color_output, subscriber); - - DefaultSubscriberGuard { - subscriber: Some(subscriber), - local_subscriber_guard: None, - writer_guard: None, - io_trace_guard: None, - } -} - -/// Constructs a subscriber set to the option appropriate for the NEAR code. -/// -/// The subscriber enables logging, tracing and io tracing. +/// This will override any subscribers set until now, and will be in effect until the value +/// returned by this function goes out of scope. /// Subscriber creation needs an async runtime. /// /// # Example @@ -354,40 +270,32 @@ pub fn default_subscriber( /// near_o11y::default_subscriber(filter, &Default::default()).await.global() /// }); /// ``` -pub async fn default_subscriber_with_opentelemetry( +pub async fn default_subscriber( env_filter: EnvFilter, options: &Options, - chain_id: String, - node_public_key: PublicKey, - account_id: Option, -) -> DefaultSubscriberGuard LookupSpan<'span> + Send + Sync> { - let color_output = use_color_output(options); - +) -> DefaultSubscriberGuard { // Do not lock the `stderr` here to allow for things like `dbg!()` work during development. 
let stderr = std::io::stderr(); let lined_stderr = std::io::LineWriter::new(stderr); let (writer, writer_guard) = tracing_appender::non_blocking(lined_stderr); - let subscriber = tracing_subscriber::registry(); + let ansi = match options.color { + ColorOutput::Always => true, + ColorOutput::Never => false, + ColorOutput::Auto => std::env::var_os("NO_COLOR").is_none() && is_terminal(), + }; - // Record the initial tracing level specified as a command-line flag. Use this recorded value to + let subscriber = tracing_subscriber::registry(); + // Record the initial OTLP level specified as a command-line flag. Use this recorded value to // reset opentelemetry filter when the LogConfig file gets deleted. DEFAULT_OTLP_LEVEL.set(options.opentelemetry).unwrap(); - let (subscriber, handle) = - add_non_blocking_log_layer(env_filter, writer, color_output, subscriber); + let (subscriber, handle) = add_log_layer(env_filter, writer, ansi, subscriber); LOG_LAYER_RELOAD_HANDLE .set(handle) .unwrap_or_else(|_| panic!("Failed to set Log Layer Filter")); - let (subscriber, handle) = add_opentelemetry_layer( - options.opentelemetry, - chain_id, - node_public_key, - account_id, - subscriber, - ) - .await; + let (subscriber, handle) = add_opentelemetry_layer(options.opentelemetry, subscriber).await; OTLP_LAYER_RELOAD_HANDLE .set(handle) .unwrap_or_else(|_| panic!("Failed to set OTLP Layer Filter")); @@ -407,7 +315,7 @@ pub async fn default_subscriber_with_opentelemetry( DefaultSubscriberGuard { subscriber: Some(subscriber), local_subscriber_guard: None, - writer_guard: Some(writer_guard), + writer_guard, io_trace_guard, } } @@ -415,56 +323,80 @@ pub async fn default_subscriber_with_opentelemetry( #[derive(thiserror::Error, Debug)] #[non_exhaustive] pub enum ReloadError { + #[error("env_filter reload handle is not available")] + NoLogReloadHandle, + #[error("opentelemetry reload handle is not available")] + NoOpentelemetryReloadHandle, #[error("could not set the new log filter")] - 
Reload(#[source] Error), + ReloadLogLayer(#[source] reload::Error), + #[error("could not set the new opentelemetry filter")] + ReloadOpentelemetryLayer(#[source] reload::Error), #[error("could not create the log filter")] Parse(#[source] BuildEnvFilterError), - #[error("env_filter reload handle is not available")] - NoReloadHandle, } /// Constructs new filters for the logging and opentelemetry layers. /// +/// Attempts to reload all available layers. Returns errors for each layer that failed to reload. +/// /// The newly constructed `EnvFilter` provides behavior equivalent to what can be obtained via /// setting `RUST_LOG` environment variable and the `--verbose` command-line flag. /// `rust_log` is equivalent to setting `RUST_LOG` environment variable. /// `verbose` indicates whether `--verbose` command-line flag is present. /// `verbose_module` is equivalent to the value of the `--verbose` command-line flag. -pub fn reload_layers( +pub fn reload( rust_log: Option<&str>, verbose_module: Option<&str>, opentelemetry_level: Option, -) -> Result<(), ReloadError> { - LOG_LAYER_RELOAD_HANDLE.get().map_or(Err(ReloadError::NoReloadHandle), |reload_handle| { - let mut builder = rust_log.map_or_else( - || EnvFilterBuilder::from_env(), - |rust_log| EnvFilterBuilder::new(rust_log), - ); - if let Some(module) = verbose_module { - builder = builder.verbose(Some(module)); - } - let env_filter = builder.finish().map_err(ReloadError::Parse)?; +) -> Result<(), Vec> { + let log_reload_result = LOG_LAYER_RELOAD_HANDLE.get().map_or( + Err(ReloadError::NoLogReloadHandle), + |reload_handle| { + let mut builder = rust_log.map_or_else( + || EnvFilterBuilder::from_env(), + |rust_log| EnvFilterBuilder::new(rust_log), + ); + if let Some(module) = verbose_module { + builder = builder.verbose(Some(module)); + } + let env_filter = builder.finish().map_err(ReloadError::Parse)?; - reload_handle - .modify(|log_filter| { - *log_filter = env_filter; - }) - .map_err(ReloadError::Reload)?; - Ok(()) - 
})?; + reload_handle + .modify(|log_filter| { + *log_filter = env_filter; + }) + .map_err(ReloadError::ReloadLogLayer)?; + Ok(()) + }, + ); let opentelemetry_level = opentelemetry_level .unwrap_or(*DEFAULT_OTLP_LEVEL.get().unwrap_or(&OpenTelemetryLevel::OFF)); - OTLP_LAYER_RELOAD_HANDLE.get().map_or(Err(ReloadError::NoReloadHandle), |reload_handle| { - reload_handle - .modify(|otlp_layer| { - *otlp_layer = get_opentelemetry_filter(opentelemetry_level); - }) - .map_err(ReloadError::Reload)?; - Ok(()) - })?; + let opentelemetry_reload_result = OTLP_LAYER_RELOAD_HANDLE.get().map_or( + Err(ReloadError::NoOpentelemetryReloadHandle), + |reload_handle| { + reload_handle + .modify(|otlp_filter| { + *otlp_filter = get_opentelemetry_filter(opentelemetry_level); + }) + .map_err(ReloadError::ReloadOpentelemetryLayer)?; + Ok(()) + }, + ); + + let mut errors: Vec = vec![]; + if let Err(err) = log_reload_result { + errors.push(err); + } + if let Err(err) = opentelemetry_reload_result { + errors.push(err); + } - Ok(()) + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } } #[non_exhaustive] diff --git a/neard/src/log_config_watcher.rs b/neard/src/log_config_watcher.rs index f90edb7e886..56a607f22f8 100644 --- a/neard/src/log_config_watcher.rs +++ b/neard/src/log_config_watcher.rs @@ -1,4 +1,4 @@ -use near_o11y::{reload_layers, OpenTelemetryLevel, ReloadError}; +use near_o11y::{reload, OpenTelemetryLevel, ReloadError}; use serde::{Deserialize, Serialize}; use std::io; use std::io::ErrorKind; @@ -30,7 +30,7 @@ pub(crate) enum UpdateBehavior { #[non_exhaustive] enum LogConfigError { #[error("Failed to reload the logging config")] - Reload(#[source] ReloadError), + Reload(Vec), #[error("Failed to reload the logging config")] Parse(#[source] serde_json::Error), #[error("Can't open or read the logging config file")] @@ -44,7 +44,7 @@ impl LogConfigWatcher { let log_config = serde_json::from_str::(&log_config_str) .map_err(LogConfigError::Parse)?; info!(target: "neard", 
log_config=?log_config, "Changing the logging config."); - return reload_layers( + return reload( log_config.rust_log.as_deref(), log_config.verbose_module.as_deref(), log_config.opentelemetry_level, @@ -55,7 +55,7 @@ impl LogConfigWatcher { ErrorKind::NotFound => { if let UpdateBehavior::UpdateOrReset = update_behavior { info!(target: "neard", logging_config_path=%self.watched_path.display(), ?err, "Reset the logging config because the logging config file doesn't exist."); - return reload_layers(None, None, None).map_err(LogConfigError::Reload); + return reload(None, None, None).map_err(LogConfigError::Reload); } Ok(()) }