feat(o11y): Configure opentelemetry filter at runtime #7701

Merged 47 commits on Sep 28, 2022
Changes from 46 commits
Commits
effb47b
Optimize two metrics which are incremented most often: near_chunk_cac…
nikurt Sep 20, 2022
03d8423
Dynamic configuration of opentelemetry level
nikurt Sep 21, 2022
e9cbe34
integration-tests: fix hashing of a list of elements (#7658)
mina86 Sep 21, 2022
9fe23fa
refactor: add EpochManagerAdapter infra (#7652)
matklad Sep 21, 2022
919ff26
refactor: move validator accessor to EpochManagerHandle (#7655)
matklad Sep 21, 2022
043cbb6
refactor: move signature verification functions to EpochManagerAdapte…
matklad Sep 21, 2022
af94c06
fix: disable life support for nearlib test (#7667)
matklad Sep 22, 2022
c9a919d
Optimize prometheus metrics which are incremented most often (#7646)
nikurt Sep 22, 2022
60a7e06
refactor: prettify chunk management test (#7659)
matklad Sep 22, 2022
3287c48
refactor: move verify_approvals_and_threshold_orphan to EMA (#7657)
matklad Sep 22, 2022
0a01cb1
refactor: move shard layout API to EpochManagerAdapter (#7660)
matklad Sep 22, 2022
beada8d
move block->epoch APIs to EpochManagerHandle (#7665)
matklad Sep 22, 2022
a38d4ba
refactor: expose get_epoch_id as a method of EpochManagerAdapter (#7…
matklad Sep 22, 2022
c7b809f
chore: upgrade Rust to 1.64.0 (#7669)
matklad Sep 22, 2022
b515676
Allowing configure more peer-related things in config (#7647)
mm-near Sep 22, 2022
7fe60db
fix: actually test dropping chunk forwarding (#7664)
matklad Sep 22, 2022
e8872ac
feat: enable receipt prefetching by default (#7661)
jakmeier Sep 23, 2022
bf59f61
feat: scaffold for dev guide (#7668)
matklad Sep 23, 2022
ae4bb2c
doc: book skeloton (#7675)
matklad Sep 23, 2022
ee801d7
doc: improve wording (#7678)
matklad Sep 23, 2022
88b8021
doc: move the content from docs to book (#7677)
matklad Sep 23, 2022
8f8579b
docs: move book to the docs folder (#7682)
matklad Sep 23, 2022
00a3f43
chore: bump crates version to 0.15.0 (#7680)
frol Sep 23, 2022
ac95593
refactor: more useful error message (#7684)
matklad Sep 23, 2022
201a24f
Move chunk completion out of ShardsManager. (#7622)
robin-near Sep 24, 2022
a2171e4
chore: Update a couple of more crates to 1.64 (#7688)
akhi3030 Sep 26, 2022
d5cd375
Upgrade tracing and opentelemetry crates.
nikurt Sep 26, 2022
d9e8127
Dynamic configuration of opentelemetry level
nikurt Sep 21, 2022
08ae5ef
Prototyping separate reloadable layers for filters of logs and traces
nikurt Sep 26, 2022
ce38f2d
Log and opentelemetry layers can reload their filters independently a…
nikurt Sep 26, 2022
3831ff6
Optimize prometheus metrics which are incremented most often (#7646)
nikurt Sep 22, 2022
724d8ea
feat: always display uncertain estimation warnings (#7690)
jakmeier Sep 26, 2022
c12b29a
chore: Use inheritance to reduce copy-paste in `Cargo.toml`s (#7687)
akhi3030 Sep 26, 2022
9dd83c7
Fix upgradable.py db migration flags too; like #7607 (#7694)
robin-near Sep 26, 2022
ca00ea1
Implement FlatStorageState (#7663)
mzhangmzz Sep 26, 2022
f14956a
Remove DBCol::ChunkPerHeightShard (#7671)
robin-near Sep 26, 2022
a17dfc8
fix: logo URL in README.md (#7696)
jakmeier Sep 27, 2022
6e92c62
Upgrade tracing and opentelemetry crates (#7691)
nikurt Sep 27, 2022
bcba369
chore: remove unused dependency (#7699)
matklad Sep 27, 2022
677f7a1
Upgrade tracing and opentelemetry crates.
nikurt Sep 26, 2022
1e20b6c
Merge
nikurt Sep 27, 2022
7cd1bbd
serde
nikurt Sep 27, 2022
3d72c51
Merge branch 'master' into nikurt-nodekey
nikurt Sep 27, 2022
4d227ef
Merge branch 'master' into nikurt-nodekey
nikurt Sep 27, 2022
dab9da9
Review comments
nikurt Sep 28, 2022
235366e
Merge branch 'master' into nikurt-nodekey
nikurt Sep 28, 2022
c8a97d7
Generic type alias
nikurt Sep 28, 2022
9 changes: 5 additions & 4 deletions Cargo.lock

1 change: 1 addition & 0 deletions core/o11y/Cargo.toml
@@ -19,6 +19,7 @@ opentelemetry.workspace = true
opentelemetry-otlp.workspace = true
opentelemetry-semantic-conventions.workspace = true
prometheus.workspace = true
serde.workspace = true
strum.workspace = true
thiserror.workspace = true
tokio.workspace = true
184 changes: 126 additions & 58 deletions core/o11y/src/lib.rs
@@ -8,17 +8,17 @@ use opentelemetry::sdk::trace::{self, IdGenerator, Sampler, Tracer};
use opentelemetry::sdk::Resource;
use opentelemetry::KeyValue;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::path::PathBuf;
use tracing::level_filters::LevelFilter;
use tracing::subscriber::DefaultGuard;
use tracing_appender::non_blocking::NonBlocking;
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::filter::{Filtered, ParseError};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::layer::{Layered, SubscriberExt};
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::reload::{Error, Handle};
use tracing_subscriber::{EnvFilter, Layer, Registry};
use tracing_subscriber::{fmt, reload, EnvFilter, Layer, Registry};

/// Custom tracing subscriber implementation that produces IO traces.
mod io_tracer;
@@ -41,16 +41,21 @@ macro_rules! io_trace {
($($fields:tt)*) => {};
}

static LOG_LAYER_RELOAD_HANDLE: OnceCell<
Handle<
Filtered<
tracing_subscriber::fmt::Layer<Registry, DefaultFields, Format, NonBlocking>,
EnvFilter,
Registry,
>,
type SubscriberWithLogLayer = Layered<
Filtered<
fmt::Layer<Registry, fmt::format::DefaultFields, fmt::format::Format, NonBlocking>,
reload::Layer<EnvFilter, Registry>,
Registry,
>,
> = OnceCell::new();
Registry,
>;

static LOG_LAYER_RELOAD_HANDLE: OnceCell<reload::Handle<EnvFilter, Registry>> = OnceCell::new();
static OTLP_LAYER_RELOAD_HANDLE: OnceCell<reload::Handle<LevelFilter, SubscriberWithLogLayer>> =
OnceCell::new();

// Records the level of opentelemetry tracing verbosity configured via command-line flags at the startup.
static DEFAULT_OTLP_LEVEL: OnceCell<OpenTelemetryLevel> = OnceCell::new();
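
As a rough illustration of the fallback that the recorded default level makes possible, here is a minimal standard-library sketch. It assumes `std::sync::OnceLock` as a stand-in for the `OnceCell` used in this diff. `Level`, `DEFAULT_LEVEL`, `record_default` and `effective_level` are hypothetical names chosen for the example and are not part of `near_o11y`.

```rust
use std::sync::OnceLock;

/// Hypothetical stand-in for `OpenTelemetryLevel`.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum Level {
    Off,
    Info,
    Debug,
    Trace,
}

// Hypothetical stand-in for `DEFAULT_OTLP_LEVEL`: written once at startup, read afterwards.
static DEFAULT_LEVEL: OnceLock<Level> = OnceLock::new();

/// Records the startup level. Returns `Err` with the rejected value if a level
/// was already recorded, since a `OnceLock` can only be set once.
pub fn record_default(level: Level) -> Result<(), Level> {
    DEFAULT_LEVEL.set(level)
}

/// Resolves the level to apply on reload: an explicit request wins, then the
/// recorded startup default, and finally `Off` if nothing was recorded.
pub fn effective_level(requested: Option<Level>) -> Level {
    requested.unwrap_or_else(|| DEFAULT_LEVEL.get().copied().unwrap_or(Level::Off))
}
```

Under these assumptions, calling `effective_level(None)` before anything is recorded falls back to `Level::Off`, which mirrors the `unwrap_or(&OpenTelemetryLevel::OFF)` fallback in `reload`.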

/// The default value for the `RUST_LOG` environment variable if one isn't specified otherwise.
pub const DEFAULT_RUST_LOG: &'static str = "tokio_reactor=info,\
@@ -78,15 +83,15 @@ pub struct DefaultSubscriberGuard<S> {
// other way around, the events/spans generated while the subscriber drop guard runs would be
// lost.
subscriber: Option<S>,
local_subscriber_guard: Option<tracing::subscriber::DefaultGuard>,
local_subscriber_guard: Option<DefaultGuard>,
#[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard.
writer_guard: tracing_appender::non_blocking::WorkerGuard,
#[allow(dead_code)] // This field is never read, but has semantic purpose as a drop guard.
io_trace_guard: Option<tracing_appender::non_blocking::WorkerGuard>,
}

// Doesn't define WARN and ERROR, because the highest verbosity of spans is INFO.
#[derive(Copy, Clone, Debug, clap::ArgEnum)]
#[derive(Copy, Clone, Debug, clap::ArgEnum, Serialize, Deserialize)]
pub enum OpenTelemetryLevel {
OFF,
INFO,
@@ -165,36 +170,54 @@ fn is_terminal() -> bool {
atty::is(atty::Stream::Stderr)
}

fn make_log_layer<S>(
fn add_log_layer<S>(
filter: EnvFilter,
writer: NonBlocking,
ansi: bool,
) -> Filtered<tracing_subscriber::fmt::Layer<S, DefaultFields, Format, NonBlocking>, EnvFilter, S>
subscriber: S,
) -> (
Layered<
Filtered<
fmt::Layer<S, fmt::format::DefaultFields, fmt::format::Format, NonBlocking>,
reload::Layer<EnvFilter, S>,
S,
>,
S,
>,
reload::Handle<EnvFilter, S>,
)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span>,
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let layer = tracing_subscriber::fmt::layer()
let (filter, handle) = reload::Layer::<EnvFilter, S>::new(filter);

let layer = fmt::layer()
.with_ansi(ansi)
// Synthesizing ENTER and CLOSE events lets us log durations of spans to the log.
.with_span_events(
tracing_subscriber::fmt::format::FmtSpan::ENTER
| tracing_subscriber::fmt::format::FmtSpan::CLOSE,
)
.with_span_events(fmt::format::FmtSpan::ENTER | fmt::format::FmtSpan::CLOSE)
.with_writer(writer)
.with_filter(filter);
layer
let subscriber = subscriber.with(layer);
(subscriber, handle)
}
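
The idea behind wrapping a filter in a reloadable layer and keeping a handle to it can be sketched without any tracing crates. The following is a hand-rolled analogue built on `Arc<RwLock<_>>`, not the `tracing_subscriber::reload` API. `Verbosity`, `ReloadableFilter`, `FilterHandle` and `reloadable` are hypothetical names, and unlike the real handle this `modify` does not return a `Result`.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical verbosity scale, ordered from least to most verbose.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
pub enum Verbosity {
    Off,
    Info,
    Debug,
    Trace,
}

/// Filter that reads the shared maximum verbosity on every check.
pub struct ReloadableFilter {
    inner: Arc<RwLock<Verbosity>>,
}

/// Handle that can change the filter after it has been installed.
#[derive(Clone)]
pub struct FilterHandle {
    inner: Arc<RwLock<Verbosity>>,
}

/// Builds a filter together with the handle that controls it.
pub fn reloadable(initial: Verbosity) -> (ReloadableFilter, FilterHandle) {
    let inner = Arc::new(RwLock::new(initial));
    (ReloadableFilter { inner: Arc::clone(&inner) }, FilterHandle { inner })
}

impl ReloadableFilter {
    /// Returns true if an event at `level` passes the current maximum.
    pub fn enabled(&self, level: Verbosity) -> bool {
        let max = *self.inner.read().expect("filter lock poisoned");
        level != Verbosity::Off && level <= max
    }
}

impl FilterHandle {
    /// Applies `f` to the shared verbosity; later checks see the new value.
    pub fn modify(&self, f: impl FnOnce(&mut Verbosity)) {
        let mut guard = self.inner.write().expect("filter lock poisoned");
        f(&mut *guard);
    }
}
```

In this sketch the filter and the handle share one piece of state, so a handle stored in a global can retarget the filter long after the subscriber was built. That is the role the returned `reload::Handle` plays in `add_log_layer`.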

/// Constructs an OpenTelemetryConfig which sends span data to an external collector.
//
// NB: this function is `async` because `install_batch(Tokio)` requires a tokio context to
// register timers and channels and whatnot.
async fn make_opentelemetry_layer<S>(
config: &Options,
) -> Filtered<OpenTelemetryLayer<S, Tracer>, LevelFilter, S>
async fn add_opentelemetry_layer<S>(
opentelemetry_level: OpenTelemetryLevel,
subscriber: S,
) -> (
Layered<Filtered<OpenTelemetryLayer<S, Tracer>, reload::Layer<LevelFilter, S>, S>, S>,
reload::Handle<LevelFilter, S>,
)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span>,
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
let filter = get_opentelemetry_filter(opentelemetry_level);
let (filter, handle) = reload::Layer::<LevelFilter, S>::new(filter);

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().tonic())
@@ -206,13 +229,13 @@ where
)
.install_batch(opentelemetry::runtime::Tokio)
.unwrap();
let filter = get_opentelemetry_filter(config);
let layer = tracing_opentelemetry::layer().with_tracer(tracer).with_filter(filter);
layer
let subscriber = subscriber.with(layer);
(subscriber, handle)
}

fn get_opentelemetry_filter(config: &Options) -> LevelFilter {
match config.opentelemetry {
pub fn get_opentelemetry_filter(opentelemetry_level: OpenTelemetryLevel) -> LevelFilter {
match opentelemetry_level {
OpenTelemetryLevel::OFF => LevelFilter::OFF,
OpenTelemetryLevel::INFO => LevelFilter::INFO,
OpenTelemetryLevel::DEBUG => LevelFilter::DEBUG,
@@ -270,13 +293,20 @@ pub async fn default_subscriber(
ColorOutput::Auto => std::env::var_os("NO_COLOR").is_none() && is_terminal(),
};

let log_layer = make_log_layer(env_filter, writer, ansi);
let (log_layer, handle) = tracing_subscriber::reload::Layer::new(log_layer);
LOG_LAYER_RELOAD_HANDLE.set(handle).unwrap();

let subscriber = tracing_subscriber::registry();
let subscriber = subscriber.with(log_layer);
let subscriber = subscriber.with(make_opentelemetry_layer(options).await);
// Record the initial OTLP level specified as a command-line flag. Use this recorded value to
// reset opentelemetry filter when the LogConfig file gets deleted.
DEFAULT_OTLP_LEVEL.set(options.opentelemetry).unwrap();

let (subscriber, handle) = add_log_layer(env_filter, writer, ansi, subscriber);
LOG_LAYER_RELOAD_HANDLE
.set(handle)
.unwrap_or_else(|_| panic!("Failed to set Log Layer Filter"));

let (subscriber, handle) = add_opentelemetry_layer(options.opentelemetry, subscriber).await;
OTLP_LAYER_RELOAD_HANDLE
.set(handle)
.unwrap_or_else(|_| panic!("Failed to set OTLP Layer Filter"));

#[allow(unused_mut)]
let mut io_trace_guard = None;
@@ -301,42 +331,80 @@ pub async fn default_subscriber(
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum ReloadError {
#[error("env_filter reload handle is not available")]
NoLogReloadHandle,
#[error("opentelemetry reload handle is not available")]
NoOpentelemetryReloadHandle,
#[error("could not set the new log filter")]
Reload(#[source] Error),
ReloadLogLayer(#[source] reload::Error),
#[error("could not set the new opentelemetry filter")]
ReloadOpentelemetryLayer(#[source] reload::Error),
#[error("could not create the log filter")]
Parse(#[source] BuildEnvFilterError),
#[error("env_filter reload handle is not available")]
NoReloadHandle,
}

/// Constructs an `EnvFilter` and sets it as the active filter in the default tracing subscriber.
/// Constructs new filters for the logging and opentelemetry layers.
///
/// Attempts to reload all available layers. Returns errors for each layer that failed to reload.
///
/// The newly constructed `EnvFilter` provides behavior equivalent to what can be obtained via
/// setting `RUST_LOG` environment variable and the `--verbose` command-line flag.
/// `rust_log` is equivalent to setting `RUST_LOG` environment variable.
/// `verbose` indicates whether `--verbose` command-line flag is present.
/// `verbose_module` is equivalent to the value of the `--verbose` command-line flag.
pub fn reload_log_layer(
pub fn reload(
rust_log: Option<&str>,
verbose_module: Option<&str>,
) -> Result<(), ReloadError> {
LOG_LAYER_RELOAD_HANDLE.get().map_or(Err(ReloadError::NoReloadHandle), |reload_handle| {
let mut builder = rust_log.map_or_else(
|| EnvFilterBuilder::from_env(),
|rust_log| EnvFilterBuilder::new(rust_log),
);
if let Some(module) = verbose_module {
builder = builder.verbose(Some(module));
}
let env_filter = builder.finish().map_err(ReloadError::Parse)?;
opentelemetry_level: Option<OpenTelemetryLevel>,
) -> Result<(), Vec<ReloadError>> {
let log_reload_result = LOG_LAYER_RELOAD_HANDLE.get().map_or(
Err(ReloadError::NoLogReloadHandle),
|reload_handle| {
let mut builder = rust_log.map_or_else(
|| EnvFilterBuilder::from_env(),
|rust_log| EnvFilterBuilder::new(rust_log),
);
if let Some(module) = verbose_module {
builder = builder.verbose(Some(module));
}
let env_filter = builder.finish().map_err(ReloadError::Parse)?;

reload_handle
.modify(|log_filter| {
*log_filter = env_filter;
})
.map_err(ReloadError::ReloadLogLayer)?;
Ok(())
},
);

let opentelemetry_level = opentelemetry_level
.unwrap_or(*DEFAULT_OTLP_LEVEL.get().unwrap_or(&OpenTelemetryLevel::OFF));
let opentelemetry_reload_result = OTLP_LAYER_RELOAD_HANDLE.get().map_or(
Err(ReloadError::NoOpentelemetryReloadHandle),
|reload_handle| {
reload_handle
.modify(|otlp_filter| {
*otlp_filter = get_opentelemetry_filter(opentelemetry_level);
})
.map_err(ReloadError::ReloadOpentelemetryLayer)?;
Ok(())
},
);

let mut errors: Vec<ReloadError> = vec![];
if let Err(err) = log_reload_result {
errors.push(err);
}
if let Err(err) = opentelemetry_reload_result {
errors.push(err);
}

reload_handle
.modify(|log_layer| {
*log_layer.filter_mut() = env_filter;
})
.map_err(ReloadError::Reload)?;
if errors.is_empty() {
Ok(())
})
} else {
Err(errors)
}
}
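
The error handling in `reload` runs each layer's reload independently and reports every failure rather than stopping at the first. A minimal standard-library sketch of that pattern follows. `StepError` and `reload_all` are hypothetical names used only for illustration, and the two `Result` arguments stand in for the outcomes of the log and opentelemetry reload steps.

```rust
/// Hypothetical per-layer error, standing in for `ReloadError`.
#[derive(Debug, PartialEq, Eq)]
pub enum StepError {
    Log(String),
    Otlp(String),
}

/// Combines the outcomes of two independent reload steps.
/// Returns `Ok(())` only if both succeeded; otherwise returns every error,
/// in the order the steps were given.
pub fn reload_all(
    log: Result<(), StepError>,
    otlp: Result<(), StepError>,
) -> Result<(), Vec<StepError>> {
    // Keep only the failures; successful steps contribute nothing.
    let errors: Vec<StepError> = vec![log, otlp].into_iter().filter_map(Result::err).collect();
    if errors.is_empty() {
        Ok(())
    } else {
        Err(errors)
    }
}
```

One consequence of this design, visible in the diff, is that a failure in one layer does not prevent the other layer from being reloaded. The caller receives a `Vec` and decides how to report it.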

#[non_exhaustive]
11 changes: 7 additions & 4 deletions neard/src/log_config_watcher.rs
@@ -1,4 +1,4 @@
use near_o11y::{reload_log_layer, ReloadError};
use near_o11y::{reload, OpenTelemetryLevel, ReloadError};
use serde::{Deserialize, Serialize};
use std::io;
use std::io::ErrorKind;
@@ -13,6 +13,8 @@ struct LogConfig {
/// Some("") enables global debug logging.
/// Some("module") enables debug logging for "module".
pub verbose_module: Option<String>,
/// Verbosity level of collected traces.
pub opentelemetry_level: Option<OpenTelemetryLevel>,
}

pub(crate) struct LogConfigWatcher {
@@ -28,7 +30,7 @@ pub(crate) enum UpdateBehavior {
#[non_exhaustive]
enum LogConfigError {
#[error("Failed to reload the logging config")]
Reload(#[source] ReloadError),
Reload(Vec<ReloadError>),
#[error("Failed to reload the logging config")]
Parse(#[source] serde_json::Error),
#[error("Can't open or read the logging config file")]
@@ -42,17 +44,18 @@ impl LogConfigWatcher {
let log_config = serde_json::from_str::<LogConfig>(&log_config_str)
.map_err(LogConfigError::Parse)?;
info!(target: "neard", log_config=?log_config, "Changing the logging config.");
return reload_log_layer(
return reload(
log_config.rust_log.as_deref(),
log_config.verbose_module.as_deref(),
log_config.opentelemetry_level,
)
.map_err(LogConfigError::Reload);
}
Err(err) => match err.kind() {
ErrorKind::NotFound => {
if let UpdateBehavior::UpdateOrReset = update_behavior {
info!(target: "neard", logging_config_path=%self.watched_path.display(), ?err, "Reset the logging config because the logging config file doesn't exist.");
return reload_log_layer(None, None).map_err(LogConfigError::Reload);
return reload(None, None, None).map_err(LogConfigError::Reload);
}
Ok(())
}