diff --git a/opentelemetry-otlp/src/exporter/http/mod.rs b/opentelemetry-otlp/src/exporter/http/mod.rs index a5ced848d4..4a45379121 100644 --- a/opentelemetry-otlp/src/exporter/http/mod.rs +++ b/opentelemetry-otlp/src/exporter/http/mod.rs @@ -307,12 +307,18 @@ impl OtlpHttpClient { fn build_trace_export_body( &self, spans: Vec, + resource: &opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, ) -> opentelemetry::trace::TraceResult<(Vec, &'static str)> { - use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; - - let req = ExportTraceServiceRequest { - resource_spans: spans.into_iter().map(Into::into).collect(), + use opentelemetry_proto::tonic::{ + collector::trace::v1::ExportTraceServiceRequest, trace::v1::ResourceSpans, }; + + let resource_spans = spans + .into_iter() + .map(|span| ResourceSpans::new(span, resource)) + .collect::>(); + + let req = ExportTraceServiceRequest { resource_spans }; match self.protocol { #[cfg(feature = "http-json")] Protocol::HttpJson => match serde_json::to_string_pretty(&req) { diff --git a/opentelemetry-otlp/src/exporter/http/trace.rs b/opentelemetry-otlp/src/exporter/http/trace.rs index 8e272c93cf..8d6c3116cd 100644 --- a/opentelemetry-otlp/src/exporter/http/trace.rs +++ b/opentelemetry-otlp/src/exporter/http/trace.rs @@ -21,7 +21,7 @@ impl SpanExporter for OtlpHttpClient { Err(err) => return Box::pin(std::future::ready(Err(err))), }; - let (body, content_type) = match self.build_trace_export_body(batch) { + let (body, content_type) = match self.build_trace_export_body(batch, &self.resource) { Ok(body) => body, Err(e) => return Box::pin(std::future::ready(Err(e))), }; @@ -66,4 +66,8 @@ impl SpanExporter for OtlpHttpClient { fn shutdown(&mut self) { let _ = self.client.lock().map(|mut c| c.take()); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/exporter/tonic/logs.rs b/opentelemetry-otlp/src/exporter/tonic/logs.rs index 8a6637a5b0..6cefd611ff 100644 --- a/opentelemetry-otlp/src/exporter/tonic/logs.rs +++ b/opentelemetry-otlp/src/exporter/tonic/logs.rs @@ -12,7 +12,7 @@ use super::BoxInterceptor; pub(crate) struct TonicLogsClient { inner: Option, #[allow(dead_code)] - // would be removed once we support set_resource for metrics and traces. + // would be removed once we support set_resource for metrics. resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } diff --git a/opentelemetry-otlp/src/exporter/tonic/trace.rs b/opentelemetry-otlp/src/exporter/tonic/trace.rs index b328dfba5f..a0dbe0e76b 100644 --- a/opentelemetry-otlp/src/exporter/tonic/trace.rs +++ b/opentelemetry-otlp/src/exporter/tonic/trace.rs @@ -5,6 +5,7 @@ use opentelemetry::trace::TraceError; use opentelemetry_proto::tonic::collector::trace::v1::{ trace_service_client::TraceServiceClient, ExportTraceServiceRequest, }; +use opentelemetry_proto::tonic::trace::v1::ResourceSpans; use opentelemetry_sdk::export::trace::{ExportResult, SpanData, SpanExporter}; use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request}; @@ -12,6 +13,9 @@ use super::BoxInterceptor; pub(crate) struct TonicTracesClient { inner: Option, + #[allow(dead_code)] + // would be removed once we support set_resource for metrics. + resource: opentelemetry_proto::transform::common::tonic::ResourceAttributesWithSchema, } struct ClientInner { @@ -43,6 +47,7 @@ impl TonicTracesClient { client, interceptor, }), + resource: Default::default(), } } } @@ -66,14 +71,20 @@ impl SpanExporter for TonicTracesClient { } }; + // TODO: Avoid cloning here. + let resource_spans = { + batch + .into_iter() + .map(|log_data| ResourceSpans::new(log_data, &self.resource)) + .collect() + }; + Box::pin(async move { client .export(Request::from_parts( metadata, extensions, - ExportTraceServiceRequest { - resource_spans: batch.into_iter().map(Into::into).collect(), - }, + ExportTraceServiceRequest { resource_spans }, )) .await .map_err(crate::Error::from)?; @@ -85,4 +96,8 @@ impl SpanExporter for TonicTracesClient { fn shutdown(&mut self) { let _ = self.inner.take(); } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.resource = resource.into(); + } } diff --git a/opentelemetry-otlp/src/span.rs b/opentelemetry-otlp/src/span.rs index ba70f0825e..6e61cfd1a2 100644 --- a/opentelemetry-otlp/src/span.rs +++ b/opentelemetry-otlp/src/span.rs @@ -213,4 +213,8 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { self.0.export(batch) } + + fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) { + self.0.set_resource(resource); + } } diff --git a/opentelemetry-proto/src/transform/trace.rs b/opentelemetry-proto/src/transform/trace.rs index 77f4f18de2..3f0003d44e 100644 --- a/opentelemetry-proto/src/transform/trace.rs +++ b/opentelemetry-proto/src/transform/trace.rs @@ -4,7 +4,7 @@ pub mod tonic { use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status}; use crate::transform::common::{ to_nanos, - tonic::{resource_attributes, Attributes}, + tonic::{Attributes, ResourceAttributesWithSchema}, }; use opentelemetry::trace; use opentelemetry::trace::{Link, SpanId, SpanKind}; @@ -45,19 +45,15 @@ pub mod tonic { } } - impl From for ResourceSpans { - fn from(source_span: SpanData) -> Self { + impl ResourceSpans { + pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self { let span_kind: span::SpanKind = source_span.span_kind.into(); ResourceSpans { resource: Some(Resource { - attributes: resource_attributes(&source_span.resource).0, + attributes: resource.attributes.0.clone(), dropped_attributes_count: 0, }), - schema_url: source_span - .resource - .schema_url() - .map(|url| url.to_string()) - .unwrap_or_default(), + schema_url: resource.schema_url.clone().unwrap_or_default(), scope_spans: vec![ScopeSpans { schema_url: source_span .instrumentation_lib diff --git a/opentelemetry-sdk/CHANGELOG.md b/opentelemetry-sdk/CHANGELOG.md index ff0fa8cdb8..ccd7fd1e36 100644 --- a/opentelemetry-sdk/CHANGELOG.md +++ b/opentelemetry-sdk/CHANGELOG.md @@ -28,6 +28,15 @@ asynchronously, it should clone the log data to ensure it can be safely processed without lifetime issues. +- **Breaking** [#1830](https://github.com/open-telemetry/opentelemetry-rust/pull/1830/files) [Traces SDK] Improves + performance by sending Resource information to processors (and exporters) once, instead of sending with every log. If you are an author + of Processor, Exporter, the following are *BREAKING* changes. + - Implement `set_resource` method in your custom SpanProcessor, which invokes exporter's `set_resource`. + - Implement `set_resource` method in your custom SpanExporter. This method should save the resource object + in original or serialized format, to be merged with every span event during export. + - `SpanData` doesn't have the resource attributes. The `SpanExporter::export()` method needs to merge it + with the earlier preserved resource before export. + - **Breaking** [1836](https://github.com/open-telemetry/opentelemetry-rust/pull/1836) `SpanProcessor::shutdown` now takes an immutable reference to self. Any reference can call shutdown on the processor. After the first call to `shutdown` the processor will not process any new spans. ## v0.23.0 diff --git a/opentelemetry-sdk/benches/batch_span_processor.rs b/opentelemetry-sdk/benches/batch_span_processor.rs index 7b6c096f4e..abc7d0df02 100644 --- a/opentelemetry-sdk/benches/batch_span_processor.rs +++ b/opentelemetry-sdk/benches/batch_span_processor.rs @@ -8,8 +8,6 @@ use opentelemetry_sdk::testing::trace::NoopSpanExporter; use opentelemetry_sdk::trace::{ BatchConfigBuilder, BatchSpanProcessor, SpanEvents, SpanLinks, SpanProcessor, }; -use opentelemetry_sdk::Resource; -use std::borrow::Cow; use std::sync::Arc; use std::time::SystemTime; use tokio::runtime::Runtime; @@ -34,7 +32,6 @@ fn get_span_data() -> Vec { events: SpanEvents::default(), links: SpanLinks::default(), status: Status::Unset, - resource: Cow::Owned(Resource::empty()), instrumentation_lib: Default::default(), }) .collect::>() diff --git a/opentelemetry-sdk/src/export/trace.rs b/opentelemetry-sdk/src/export/trace.rs index b3d99c9a13..4b43e00c36 100644 --- a/opentelemetry-sdk/src/export/trace.rs +++ b/opentelemetry-sdk/src/export/trace.rs @@ -63,6 +63,9 @@ pub trait SpanExporter: Send + Sync + Debug { fn force_flush(&mut self) -> BoxFuture<'static, ExportResult> { Box::pin(async { Ok(()) }) } + + /// Set the resource for the exporter. + fn set_resource(&mut self, _resource: &Resource) {} } /// `SpanData` contains all the information collected by a `Span` and can be used @@ -92,8 +95,6 @@ pub struct SpanData { pub links: crate::trace::SpanLinks, /// Span status pub status: Status, - /// Resource contains attributes representing an entity that produced this span. - pub resource: Cow<'static, Resource>, /// Instrumentation library that produced this span pub instrumentation_lib: crate::InstrumentationLibrary, } diff --git a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs index d10a4c1922..5853558436 100644 --- a/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs +++ b/opentelemetry-sdk/src/testing/trace/in_memory_exporter.rs @@ -1,4 +1,5 @@ use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::resource::Resource; use futures_util::future::BoxFuture; use opentelemetry::trace::{TraceError, TraceResult}; use std::sync::{Arc, Mutex}; @@ -51,6 +52,7 @@ use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] pub struct InMemorySpanExporter { spans: Arc>>, + resource: Arc>, } impl Default for InMemorySpanExporter { @@ -85,6 +87,7 @@ impl InMemorySpanExporterBuilder { pub fn build(&self) -> InMemorySpanExporter { InMemorySpanExporter { spans: Arc::new(Mutex::new(Vec::new())), + resource: Arc::new(Mutex::new(Resource::default())), } } } @@ -142,4 +145,11 @@ impl SpanExporter for InMemorySpanExporter { fn shutdown(&mut self) { self.reset() } + + fn set_resource(&mut self, resource: &Resource) { + self.resource + .lock() + .map(|mut res_guard| *res_guard = resource.clone()) + .expect("Resource lock poisoned"); + } } diff --git a/opentelemetry-sdk/src/testing/trace/span_exporters.rs b/opentelemetry-sdk/src/testing/trace/span_exporters.rs index 48ea2c6b43..c92a64f399 100644 --- a/opentelemetry-sdk/src/testing/trace/span_exporters.rs +++ b/opentelemetry-sdk/src/testing/trace/span_exporters.rs @@ -3,7 +3,7 @@ use crate::{ trace::{ExportResult, SpanData, SpanExporter}, ExportError, }, - trace::{Config, SpanEvents, SpanLinks}, + trace::{SpanEvents, SpanLinks}, InstrumentationLibrary, }; use futures_util::future::BoxFuture; @@ -14,7 +14,6 @@ use opentelemetry::trace::{ use std::fmt::{Display, Formatter}; pub fn new_test_export_span_data() -> SpanData { - let config = Config::default(); SpanData { span_context: SpanContext::new( TraceId::from_u128(1), @@ -33,7 +32,6 @@ pub fn new_test_export_span_data() -> SpanData { events: SpanEvents::default(), links: SpanLinks::default(), status: Status::Unset, - resource: config.resource, instrumentation_lib: InstrumentationLibrary::default(), } } diff --git a/opentelemetry-sdk/src/trace/provider.rs b/opentelemetry-sdk/src/trace/provider.rs index 1c6342d3b9..560d37dae9 100644 --- a/opentelemetry-sdk/src/trace/provider.rs +++ b/opentelemetry-sdk/src/trace/provider.rs @@ -218,11 +218,16 @@ impl Builder { } } + // Create a new vector to hold the modified processors + let mut processors = self.processors; + + // Set the resource for each processor + for p in &mut processors { + p.set_resource(config.resource.as_ref()); + } + TracerProvider { - inner: Arc::new(TracerProviderInner { - processors: self.processors, - config, - }), + inner: Arc::new(TracerProviderInner { processors, config }), } } } diff --git a/opentelemetry-sdk/src/trace/span.rs b/opentelemetry-sdk/src/trace/span.rs index 2ff3079cda..d672348885 100644 --- a/opentelemetry-sdk/src/trace/span.rs +++ b/opentelemetry-sdk/src/trace/span.rs @@ -9,7 +9,6 @@ //! is possible to change its name, set its `Attributes`, and add `Links` and `Events`. //! These cannot be changed after the `Span`'s end time has been set. use crate::trace::SpanLimits; -use crate::Resource; use opentelemetry::trace::{Event, Link, SpanContext, SpanId, SpanKind, Status}; use opentelemetry::KeyValue; use std::borrow::Cow; @@ -77,11 +76,10 @@ impl Span { /// overhead. pub fn exported_data(&self) -> Option { let (span_context, tracer) = (self.span_context.clone(), &self.tracer); - let resource = self.tracer.provider()?.config().resource.clone(); self.data .as_ref() - .map(|data| build_export_data(data.clone(), span_context, resource, tracer)) + .map(|data| build_export_data(data.clone(), span_context, tracer)) } } @@ -225,17 +223,14 @@ impl Span { processor.on_end(build_export_data( data, self.span_context.clone(), - provider.config().resource.clone(), &self.tracer, )); } processors => { - let config = provider.config(); for processor in processors { processor.on_end(build_export_data( data.clone(), self.span_context.clone(), - config.resource.clone(), &self.tracer, )); } @@ -254,7 +249,6 @@ impl Drop for Span { fn build_export_data( data: SpanData, span_context: SpanContext, - resource: Cow<'static, Resource>, tracer: &crate::trace::Tracer, ) -> crate::export::trace::SpanData { crate::export::trace::SpanData { @@ -269,7 +263,6 @@ fn build_export_data( events: data.events, links: data.links, status: data.status, - resource, instrumentation_lib: tracer.instrumentation_library().clone(), } } diff --git a/opentelemetry-sdk/src/trace/span_processor.rs b/opentelemetry-sdk/src/trace/span_processor.rs index 86381119b0..30fb543768 100644 --- a/opentelemetry-sdk/src/trace/span_processor.rs +++ b/opentelemetry-sdk/src/trace/span_processor.rs @@ -35,6 +35,7 @@ //! [`TracerProvider`]: opentelemetry::trace::TracerProvider use crate::export::trace::{ExportResult, SpanData, SpanExporter}; +use crate::resource::Resource; use crate::runtime::{RuntimeChannel, TrySend}; use crate::trace::Span; use futures_channel::oneshot; @@ -50,7 +51,7 @@ use opentelemetry::{ Context, }; use std::cmp::min; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; use std::{env, fmt, str::FromStr, time::Duration}; /// Delay interval between two consecutive exports. @@ -92,6 +93,8 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug { /// Shuts down the processor. Called when SDK is shut down. This is an /// opportunity for processors to do any cleanup required. fn shutdown(&self) -> TraceResult<()>; + /// Set the resource for the log processor. + fn set_resource(&mut self, _resource: &Resource) {} } /// A [SpanProcessor] that passes finished spans to the configured @@ -147,6 +150,12 @@ impl SpanProcessor for SimpleSpanProcessor { )) } } + + fn set_resource(&mut self, resource: &Resource) { + if let Ok(mut exporter) = self.exporter.lock() { + exporter.set_resource(resource); + } + } } /// A [`SpanProcessor`] that asynchronously buffers finished spans and reports @@ -259,6 +268,13 @@ impl SpanProcessor for BatchSpanProcessor { .map_err(|err| TraceError::Other(err.into())) .and_then(|identity| identity) } + + fn set_resource(&mut self, resource: &Resource) { + let resource = Arc::new(resource.clone()); + let _ = self + .message_sender + .try_send(BatchMessage::SetResource(resource)); + } } /// Messages sent between application thread and batch span processor's work thread. @@ -275,6 +291,8 @@ enum BatchMessage { Flush(Option>), /// Shut down the worker thread, push all spans in buffer to the backend. Shutdown(oneshot::Sender), + /// Set the resource for the exporter. + SetResource(Arc), } struct BatchSpanProcessorInternal { @@ -375,8 +393,11 @@ impl BatchSpanProcessorInternal { self.exporter.shutdown(); return false; } + // propagate the resource + BatchMessage::SetResource(resource) => { + self.exporter.set_resource(&resource); + } } - true } @@ -710,7 +731,6 @@ mod tests { events: SpanEvents::default(), links: SpanLinks::default(), status: Status::Unset, - resource: Default::default(), instrumentation_lib: Default::default(), }; processor.on_end(unsampled); diff --git a/opentelemetry-stdout/src/trace/exporter.rs b/opentelemetry-stdout/src/trace/exporter.rs index 774920267a..c4d319ff31 100644 --- a/opentelemetry-stdout/src/trace/exporter.rs +++ b/opentelemetry-stdout/src/trace/exporter.rs @@ -5,6 +5,7 @@ use opentelemetry_sdk::export::{self, trace::ExportResult}; use std::io::{stdout, Write}; use crate::trace::transform::SpanData; +use opentelemetry_sdk::resource::Resource; type Encoder = Box TraceResult<()> + Send + Sync>; @@ -12,6 +13,7 @@ type Encoder = Box TraceResult<()> + Send + pub struct SpanExporter { writer: Option>, encoder: Encoder, + resource: Resource, } impl fmt::Debug for SpanExporter { @@ -36,11 +38,13 @@ impl Default for SpanExporter { impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn export(&mut self, batch: Vec) -> BoxFuture<'static, ExportResult> { let res = if let Some(writer) = &mut self.writer { - (self.encoder)(writer, crate::trace::SpanData::from(batch)).and_then(|_| { - writer - .write_all(b"\n") - .map_err(|err| TraceError::Other(Box::new(err))) - }) + (self.encoder)(writer, crate::trace::SpanData::new(batch, &self.resource)).and_then( + |_| { + writer + .write_all(b"\n") + .map_err(|err| TraceError::Other(Box::new(err))) + }, + ) } else { Err("exporter is shut down".into()) }; @@ -51,6 +55,10 @@ impl opentelemetry_sdk::export::trace::SpanExporter for SpanExporter { fn shutdown(&mut self) { self.writer.take(); } + + fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) { + self.resource = res.clone(); + } } /// Configuration for the stdout trace exporter @@ -107,6 +115,7 @@ impl SpanExporterBuilder { pub fn build(self) -> SpanExporter { SpanExporter { writer: Some(self.writer.unwrap_or_else(|| Box::new(stdout()))), + resource: Resource::empty(), encoder: self.encoder.unwrap_or_else(|| { Box::new(|writer, spans| { serde_json::to_writer(writer, &spans) diff --git a/opentelemetry-stdout/src/trace/transform.rs b/opentelemetry-stdout/src/trace/transform.rs index 66a659de07..484a222f42 100644 --- a/opentelemetry-stdout/src/trace/transform.rs +++ b/opentelemetry-stdout/src/trace/transform.rs @@ -9,17 +9,20 @@ pub struct SpanData { resource_spans: Vec, } -impl From> for SpanData { - fn from(sdk_spans: Vec) -> Self { +impl SpanData { + pub(crate) fn new( + sdk_spans: Vec, + sdk_resource: &opentelemetry_sdk::Resource, + ) -> Self { let mut resource_spans = HashMap::::new(); for sdk_span in sdk_spans { - let resource_schema_url = sdk_span.resource.schema_url().map(|s| s.to_string().into()); + let resource_schema_url = sdk_resource.schema_url().map(|s| s.to_string().into()); let schema_url = sdk_span.instrumentation_lib.schema_url.clone(); let scope = sdk_span.instrumentation_lib.clone().into(); - let resource = sdk_span.resource.as_ref().into(); + let resource: Resource = sdk_resource.into(); let rs = resource_spans - .entry(sdk_span.resource.as_ref().into()) + .entry(sdk_resource.into()) .or_insert_with(move || ResourceSpans { resource, scope_spans: Vec::with_capacity(1), diff --git a/opentelemetry-zipkin/src/exporter/model/span.rs b/opentelemetry-zipkin/src/exporter/model/span.rs index 6e21b52588..8c9c7fd5a1 100644 --- a/opentelemetry-zipkin/src/exporter/model/span.rs +++ b/opentelemetry-zipkin/src/exporter/model/span.rs @@ -60,9 +60,8 @@ mod tests { use crate::exporter::model::span::{Kind, Span}; use crate::exporter::model::{into_zipkin_span, OTEL_ERROR_DESCRIPTION, OTEL_STATUS_CODE}; use opentelemetry::trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId}; + use opentelemetry_sdk::export::trace::SpanData; use opentelemetry_sdk::trace::{SpanEvents, SpanLinks}; - use opentelemetry_sdk::{export::trace::SpanData, Resource}; - use std::borrow::Cow; use std::collections::HashMap; use std::net::Ipv4Addr; use std::time::SystemTime; @@ -166,7 +165,6 @@ mod tests { events: SpanEvents::default(), links: SpanLinks::default(), status, - resource: Cow::Owned(Resource::default()), instrumentation_lib: Default::default(), }; let local_endpoint = Endpoint::new("test".into(), None);