Skip to content

Commit

Permalink
[Trace SDK] Send resource once to processor and exporter, and not wit…
Browse files Browse the repository at this point in the history
…h every span (#1830)

Co-authored-by: Cijo Thomas <cijo.thomas@gmail.com>
Co-authored-by: Zhongyang Wu <zhongyang.wu@outlook.com>
  • Loading branch information
3 people authored May 30, 2024
1 parent 1beffd2 commit 84c23a3
Show file tree
Hide file tree
Showing 17 changed files with 122 additions and 54 deletions.
14 changes: 10 additions & 4 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,12 +307,18 @@ impl OtlpHttpClient {
fn build_trace_export_body(
&self,
spans: Vec<SpanData>,
resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
) -> opentelemetry::trace::TraceResult<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;

let req = ExportTraceServiceRequest {
resource_spans: spans.into_iter().map(Into::into).collect(),
use opentelemetry_proto::tonic::{
collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans,
};

let resource_spans = spans
.into_iter()
.map(|span| ResourceSpans::new(span, resource))
.collect::<Vec<_>>();

let req = ExportTraceServiceRequest { resource_spans };
match self.protocol {
#[cfg(feature = "http-json")]
Protocol::HttpJson => match serde_json::to_string_pretty(&req) {
Expand Down
6 changes: 5 additions & 1 deletion opentelemetry-otlp/src/exporter/http/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient {
Err(err) => return Box::pin(std::future::ready(Err(err))),
};

let (body, content_type) = match self.build_trace_export_body(batch) {
let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) {
Ok(body) => body,
Err(e) => return Box::pin(std::future::ready(Err(e))),
};
Expand Down Expand Up @@ -66,4 +66,8 @@ impl SpanExporter for OtlpHttpClient {
fn shutdown(&mut self) {
let _ = self.client.lock().map(|mut c| c.take());
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
2 changes: 1 addition & 1 deletion opentelemetry-otlp/src/exporter/tonic/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::BoxInterceptor;
pub(crate) struct TonicLogsClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics and traces.
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

Expand Down
21 changes: 18 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ use opentelemetry::trace::TraceError;
use opentelemetry_proto::tonic::collector::trace::v1::{
trace_service_client::TraceServiceClient, ExportTraceServiceRequest,
};
use opentelemetry_proto::tonic::trace::v1::ResourceSpans;
use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;

pub(crate) struct TonicTracesClient {
inner: Option<ClientInner>,
#[allow(dead_code)]
// <allow dead> would be removed once we support set_resource for metrics.
resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema,
}

struct ClientInner {
Expand Down Expand Up @@ -43,6 +47,7 @@ impl TonicTracesClient {
client,
interceptor,
}),
resource: Default::default(),
}
}
}
Expand All @@ -66,14 +71,20 @@ impl SpanExporter for TonicTracesClient {
}
};

// TODO: Avoid cloning here.
let resource_spans = {
batch
.into_iter()
.map(|log_data| ResourceSpans::new(log_data, &self.resource))
.collect()
};

Box::pin(async move {
client
.export(Request::from_parts(
metadata,
extensions,
ExportTraceServiceRequest {
resource_spans: batch.into_iter().map(Into::into).collect(),
},
ExportTraceServiceRequest { resource_spans },
))
.await
.map_err(crate::Error::from)?;
Expand All @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient {
fn shutdown(&mut self) {
let _ = self.inner.take();
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
4 changes: 4 additions & 0 deletions opentelemetry-otlp/src/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,4 +213,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter {
fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, ExportResult> {
self.0.export(batch)
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
self.0.set_resource(resource);
}
}
14 changes: 5 additions & 9 deletions opentelemetry-proto/src/transform/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ pub mod tonic {
use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
use crate::transform::common::{
to_nanos,
tonic::{resource_attributes, Attributes},
tonic::{Attributes, ResourceAttributesWithSchema},
};
use opentelemetry::trace;
use opentelemetry::trace::{Link, SpanId, SpanKind};
Expand Down Expand Up @@ -45,19 +45,15 @@ pub mod tonic {
}
}

impl From<SpanData> for ResourceSpans {
fn from(source_span: SpanData) -> Self {
impl ResourceSpans {
pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self {
let span_kind: span::SpanKind = source_span.span_kind.into();
ResourceSpans {
resource: Some(Resource {
attributes: resource_attributes(&source_span.resource).0,
attributes: resource.attributes.0.clone(),
dropped_attributes_count: 0,
}),
schema_url: source_span
.resource
.schema_url()
.map(|url| url.to_string())
.unwrap_or_default(),
schema_url: resource.schema_url.clone().unwrap_or_default(),
scope_spans: vec![ScopeSpans {
schema_url: source_span
.instrumentation_lib
Expand Down
9 changes: 9 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@
asynchronously, it should clone the log data to ensure it can be safely processed without
lifetime issues.

- **Breaking** [#1830](https://github.com/open-telemetry/opentelemetry-rust/pull/1830/files) [Traces SDK] Improves
performance by sending Resource information to processors (and exporters) once, instead of sending with every log. If you are an author
of Processor, Exporter, the following are *BREAKING* changes.
- Implement `set_resource` method in your custom SpanProcessor, which invokes exporter's `set_resource`.
- Implement `set_resource` method in your custom SpanExporter. This method should save the resource object
in original or serialized format, to be merged with every span event during export.
- `SpanData` doesn't have the resource attributes. The `SpanExporter::export()` method needs to merge it
with the earlier preserved resource before export.

- **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans.

## v0.23.0
Expand Down
3 changes: 0 additions & 3 deletions opentelemetry-sdk/benches/batch_span_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ use opentelemetry_sdk::testing::trace::NoopSpanExporter;
use opentelemetry_sdk::trace::{
BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor,
};
use opentelemetry_sdk::Resource;
use std::borrow::Cow;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Runtime;
Expand All @@ -34,7 +32,6 @@ fn get_span_data() -> Vec<SpanData> {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: Cow::Owned(Resource::empty()),
instrumentation_lib: Default::default(),
})
.collect::<Vec<SpanData>>()
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-sdk/src/export/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ pub trait SpanExporter: Send + Sync + Debug {
fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> {
Box::pin(async { Ok(()) })
}

/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
}

/// `SpanData` contains all the information collected by a `Span` and can be used
Expand Down Expand Up @@ -92,8 +95,6 @@ pub struct SpanData {
pub links: crate::trace::SpanLinks,
/// Span status
pub status: Status,
/// Resource contains attributes representing an entity that produced this span.
pub resource: Cow<'static, Resource>,
/// Instrumentation library that produced this span
pub instrumentation_lib: crate::InstrumentationLibrary,
}
10 changes: 10 additions & 0 deletions opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::export::trace::{ExportResult, SpanData, SpanExporter};
use crate::resource::Resource;
use futures_util::future::BoxFuture;
use opentelemetry::trace::{TraceError, TraceResult};
use std::sync::{Arc, Mutex};
Expand Down Expand Up @@ -51,6 +52,7 @@ use std::sync::{Arc, Mutex};
#[derive(Clone, Debug)]
pub struct InMemorySpanExporter {
spans: Arc<Mutex<Vec<SpanData>>>,
resource: Arc<Mutex<Resource>>,
}

impl Default for InMemorySpanExporter {
Expand Down Expand Up @@ -85,6 +87,7 @@ impl InMemorySpanExporterBuilder {
pub fn build(&self) -> InMemorySpanExporter {
InMemorySpanExporter {
spans: Arc::new(Mutex::new(Vec::new())),
resource: Arc::new(Mutex::new(Resource::default())),
}
}
}
Expand Down Expand Up @@ -142,4 +145,11 @@ impl SpanExporter for InMemorySpanExporter {
fn shutdown(&mut self) {
self.reset()
}

fn set_resource(&mut self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_guard| *res_guard = resource.clone())
.expect("Resource lock poisoned");
}
}
4 changes: 1 addition & 3 deletions opentelemetry-sdk/src/testing/trace/span_exporters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
trace::{ExportResult, SpanData, SpanExporter},
ExportError,
},
trace::{Config, SpanEvents, SpanLinks},
trace::{SpanEvents, SpanLinks},
InstrumentationLibrary,
};
use futures_util::future::BoxFuture;
Expand All @@ -14,7 +14,6 @@ use opentelemetry::trace::{
use std::fmt::{Display, Formatter};

pub fn new_test_export_span_data() -> SpanData {
let config = Config::default();
SpanData {
span_context: SpanContext::new(
TraceId::from_u128(1),
Expand All @@ -33,7 +32,6 @@ pub fn new_test_export_span_data() -> SpanData {
events: SpanEvents::default(),
links: SpanLinks::default(),
status: Status::Unset,
resource: config.resource,
instrumentation_lib: InstrumentationLibrary::default(),
}
}
Expand Down
13 changes: 9 additions & 4 deletions opentelemetry-sdk/src/trace/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,11 +218,16 @@ impl Builder {
}
}

// Create a new vector to hold the modified processors
let mut processors = self.processors;

// Set the resource for each processor
for p in &mut processors {
p.set_resource(config.resource.as_ref());
}

TracerProvider {
inner: Arc::new(TracerProviderInner {
processors: self.processors,
config,
}),
inner: Arc::new(TracerProviderInner { processors, config }),
}
}
}
Expand Down
9 changes: 1 addition & 8 deletions opentelemetry-sdk/src/trace/span.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
//! is possible to change its name, set its `Attributes`, and add `Links` and `Events`.
//! These cannot be changed after the `Span`'s end time has been set.
use crate::trace::SpanLimits;
use crate::Resource;
use opentelemetry::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status};
use opentelemetry::KeyValue;
use std::borrow::Cow;
Expand Down Expand Up @@ -77,11 +76,10 @@ impl Span {
/// overhead.
pub fn exported_data(&self) -> Option<crate::export::trace::SpanData> {
let (span_context, tracer) = (self.span_context.clone(), &self.tracer);
let resource = self.tracer.provider()?.config().resource.clone();

self.data
.as_ref()
.map(|data| build_export_data(data.clone(), span_context, resource, tracer))
.map(|data| build_export_data(data.clone(), span_context, tracer))
}
}

Expand Down Expand Up @@ -225,17 +223,14 @@ impl Span {
processor.on_end(build_export_data(
data,
self.span_context.clone(),
provider.config().resource.clone(),
&self.tracer,
));
}
processors => {
let config = provider.config();
for processor in processors {
processor.on_end(build_export_data(
data.clone(),
self.span_context.clone(),
config.resource.clone(),
&self.tracer,
));
}
Expand All @@ -254,7 +249,6 @@ impl Drop for Span {
fn build_export_data(
data: SpanData,
span_context: SpanContext,
resource: Cow<'static, Resource>,
tracer: &crate::trace::Tracer,
) -> crate::export::trace::SpanData {
crate::export::trace::SpanData {
Expand All @@ -269,7 +263,6 @@ fn build_export_data(
events: data.events,
links: data.links,
status: data.status,
resource,
instrumentation_lib: tracer.instrumentation_library().clone(),
}
}
Expand Down
Loading

0 comments on commit 84c23a3

Please sign in to comment.