Skip to content

Commit

Permalink
feat(vlog): Implement otlp guard with force flush on drop (#2536)
Browse files Browse the repository at this point in the history
## What ❔

Improves the `ObservabilityGuard` so that it flushes both sentry & otlp
events on drop.

## Why ❔

Without it, some events (at least for otlp) may be missed if the
application exits right after events were produced.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.
  • Loading branch information
popzxc authored Jul 30, 2024
1 parent 63c92b6 commit c9f76e5
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 7 deletions.
63 changes: 59 additions & 4 deletions core/lib/vlog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! This crate contains the observability subsystem.
//! It is responsible for providing a centralized interface for consistent observability configuration.

use std::time::Duration;

use ::sentry::ClientInitGuard;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

Expand All @@ -23,7 +25,53 @@ pub struct ObservabilityBuilder {
/// Guard for the observability subsystem.
/// Releases configured integrations upon being dropped.
pub struct ObservabilityGuard {
_sentry_guard: Option<ClientInitGuard>,
/// Opentelemetry provider. Can be used to force flush spans.
otlp_provider: Option<opentelemetry_sdk::trace::TracerProvider>,
sentry_guard: Option<ClientInitGuard>,
}

impl ObservabilityGuard {
/// Forces flushing of pending events.
/// This method is blocking.
pub fn force_flush(&self) {
// We don't want to wait for too long.
const FLUSH_TIMEOUT: Duration = Duration::from_secs(1);

if let Some(sentry_guard) = &self.sentry_guard {
sentry_guard.flush(Some(FLUSH_TIMEOUT));
}

if let Some(provider) = &self.otlp_provider {
for result in provider.force_flush() {
if let Err(err) = result {
tracing::warn!("Flushing the spans failed: {err:?}");
}
}
}
}

/// Shutdown the observability subsystem.
/// It will stop the background tasks like collec
pub fn shutdown(&self) {
// We don't want to wait for too long.
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(1);

if let Some(sentry_guard) = &self.sentry_guard {
sentry_guard.close(Some(SHUTDOWN_TIMEOUT));
}
if let Some(provider) = &self.otlp_provider {
if let Err(err) = provider.shutdown() {
tracing::warn!("Shutting down the provider failed: {err:?}");
}
}
}
}

impl Drop for ObservabilityGuard {
fn drop(&mut self) {
self.force_flush();
self.shutdown();
}
}

impl std::fmt::Debug for ObservabilityGuard {
Expand Down Expand Up @@ -62,16 +110,23 @@ impl ObservabilityBuilder {
// Later we may want to enforce each layer to have its own filter.
let global_filter = logs.build_filter();

let logs_layer = logs.into_layer();
let (otlp_provider, otlp_layer) = self
.opentelemetry_layer
.map(|layer| layer.into_layer())
.unzip();

tracing_subscriber::registry()
.with(global_filter)
.with(logs.into_layer())
.with(self.opentelemetry_layer.map(|layer| layer.into_layer()))
.with(logs_layer)
.with(otlp_layer)
.init();

let sentry_guard = self.sentry.map(|sentry| sentry.install());

ObservabilityGuard {
_sentry_guard: sentry_guard,
otlp_provider,
sentry_guard,
}
}
}
8 changes: 5 additions & 3 deletions core/lib/vlog/src/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl OpenTelemetry {
self
}

pub(super) fn into_layer<S>(self) -> impl Layer<S>
pub(super) fn into_layer<S>(self) -> (opentelemetry_sdk::trace::TracerProvider, impl Layer<S>)
where
S: tracing::Subscriber + for<'span> LookupSpan<'span> + Send + Sync,
{
Expand Down Expand Up @@ -151,9 +151,11 @@ impl OpenTelemetry {
let tracer = provider.tracer_builder(service_name).build();

opentelemetry::global::set_text_map_propagator(TraceContextPropagator::new());
tracing_opentelemetry::layer()
let layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(filter)
.with_filter(filter);

(provider, layer)
}
}

Expand Down

0 comments on commit c9f76e5

Please sign in to comment.