
feat(apm): aggregate and flush data accordingly for DSM, Profiling, LLMObs, and Live Debugger #737

Open · wants to merge 9 commits into main

77 changes: 69 additions & 8 deletions bottlecap/src/bin/bottlecap/main.rs
@@ -46,6 +46,8 @@ use bottlecap::{
listener::TelemetryListener,
},
traces::{
proxy_aggregator,
proxy_flusher::Flusher as ProxyFlusher,
stats_aggregator::StatsAggregator,
stats_flusher::{self, StatsFlusher},
stats_processor, trace_agent, trace_aggregator,
@@ -95,6 +97,7 @@ struct PendingFlushHandles {
trace_flush_handles: FuturesOrdered<JoinHandle<Vec<SendData>>>,
log_flush_handles: FuturesOrdered<JoinHandle<Vec<reqwest::RequestBuilder>>>,
metric_flush_handles: FuturesOrdered<JoinHandle<MetricsRetryBatch>>,
proxy_flush_handles: FuturesOrdered<JoinHandle<Vec<reqwest::RequestBuilder>>>,
}

struct MetricsRetryBatch {
@@ -109,6 +112,7 @@ impl PendingFlushHandles {
trace_flush_handles: FuturesOrdered::new(),
log_flush_handles: FuturesOrdered::new(),
metric_flush_handles: FuturesOrdered::new(),
proxy_flush_handles: FuturesOrdered::new(),
}
}

@@ -117,6 +121,7 @@
logs_flusher: &LogsFlusher,
trace_flusher: &ServerlessTraceFlusher,
metrics_flushers: &Arc<TokioMutex<Vec<MetricsFlusher>>>,
proxy_flusher: &Arc<ProxyFlusher>,
) -> bool {
let mut joinset = tokio::task::JoinSet::new();
let mut flush_error = false;
@@ -190,6 +195,24 @@
}
}

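// Await completed proxy flush attempts and redrive any payloads they return.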
while let Some(retries) = self.proxy_flush_handles.next().await {
match retries {
Ok(batch) => {
if !batch.is_empty() {
debug!("Redriving {:?} APM proxy payloads", batch.len());
}

let pf = proxy_flusher.clone();
joinset.spawn(async move {
pf.flush(Some(batch)).await;
});
}
Err(e) => {
error!("Redrive error in APM proxy: {e:?}");
}
}
}

// Wait for all flush join operations to complete
while let Some(result) = joinset.join_next().await {
if let Err(e) = result {
@@ -472,6 +495,7 @@ async fn extension_loop_active(
trace_flusher,
trace_processor,
stats_flusher,
proxy_flusher,
trace_agent_shutdown_token,
) = start_trace_agent(
config,
@@ -548,6 +572,7 @@
&mut locked_metrics,
&*trace_flusher,
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
)
@@ -562,6 +587,7 @@
&mut locked_metrics,
&*trace_flusher,
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
)
@@ -575,18 +601,23 @@
let tf = trace_flusher.clone();
// Await any previous flush handles. This blocks until earlier flushes finish and redrives any failed payloads.
last_continuous_flush_error = pending_flush_handles
.await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers)
.await_flush_handles(
&logs_flusher.clone(),
&tf,
&metrics_flushers,
&proxy_flusher,
)
.await;

let val = logs_flusher.clone();
let lf = logs_flusher.clone();
pending_flush_handles
.log_flush_handles
.push_back(tokio::spawn(async move { val.flush(None).await }));
let traces_val = trace_flusher.clone();
.push_back(tokio::spawn(async move { lf.flush(None).await }));
let tf = trace_flusher.clone();
pending_flush_handles
.trace_flush_handles
.push_back(tokio::spawn(async move {
traces_val.flush(None).await.unwrap_or_default()
tf.flush(None).await.unwrap_or_default()
}));
let (metrics_flushers_copy, series, sketches) = {
let locked_metrics = metrics_flushers.lock().await;
@@ -613,6 +644,14 @@
});
pending_flush_handles.metric_flush_handles.push_back(handle);
}

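// Spawn the proxy flush in the background, tracking the handle so unsent payloads can be redriven on the next pass.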
let pf = proxy_flusher.clone();
pending_flush_handles
.proxy_flush_handles
.push_back(tokio::spawn(async move {
pf.flush(None).await.unwrap_or_default()
}));

race_flush_interval.reset();
} else if current_flush_decision == FlushDecision::Periodic {
let mut locked_metrics = metrics_flushers.lock().await;
@@ -621,6 +660,7 @@
&mut locked_metrics,
&*trace_flusher,
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
)
@@ -660,6 +700,7 @@
&mut locked_metrics,
&*trace_flusher,
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
)
@@ -676,7 +717,12 @@
// Redrive/block on any failed payloads
let tf = trace_flusher.clone();
pending_flush_handles
.await_flush_handles(&logs_flusher.clone(), &tf, &metrics_flushers)
.await_flush_handles(
&logs_flusher.clone(),
&tf,
&metrics_flushers,
&proxy_flusher,
)
.await;
// The Shutdown event we get during a timeout will
// never include a report log
@@ -712,6 +758,7 @@
&mut locked_metrics,
&*trace_flusher,
&*stats_flusher,
&proxy_flusher,
&mut race_flush_interval,
&metrics_aggr,
)
@@ -726,6 +773,7 @@ async fn blocking_flush_all(
metrics_flushers: &mut [MetricsFlusher],
trace_flusher: &impl TraceFlusher,
stats_flusher: &impl StatsFlusher,
proxy_flusher: &ProxyFlusher,
race_flush_interval: &mut tokio::time::Interval,
metrics_aggr: &Arc<Mutex<MetricsAggregator>>,
) {
@@ -745,7 +793,8 @@
logs_flusher.flush(None),
futures::future::join_all(metrics_futures),
trace_flusher.flush(None),
stats_flusher.flush()
stats_flusher.flush(),
proxy_flusher.flush(None),
);
race_flush_interval.reset();
}
@@ -969,6 +1018,7 @@ fn start_metrics_flushers(
flushers
}

#[allow(clippy::type_complexity)]
fn start_trace_agent(
config: &Arc<Config>,
resolved_api_key: String,
@@ -980,6 +1030,7 @@
Arc<trace_flusher::ServerlessTraceFlusher>,
Arc<trace_processor::ServerlessTraceProcessor>,
Arc<stats_flusher::ServerlessStatsFlusher>,
Arc<ProxyFlusher>,
tokio_util::sync::CancellationToken,
) {
// Stats
@@ -1012,15 +1063,24 @@
resolved_api_key: resolved_api_key.clone(),
});

// Proxy
let proxy_aggregator = Arc::new(TokioMutex::new(proxy_aggregator::Aggregator::default()));
let proxy_flusher = Arc::new(ProxyFlusher::new(
resolved_api_key,
Arc::clone(&proxy_aggregator),
Arc::clone(tags_provider),
Arc::clone(config),
));

let trace_agent = trace_agent::TraceAgent::new(
Arc::clone(config),
trace_aggregator,
trace_processor.clone(),
stats_aggregator,
stats_processor,
proxy_aggregator,
invocation_processor,
Arc::clone(tags_provider),
resolved_api_key,
);
let trace_agent_channel = trace_agent.get_sender_copy();
let shutdown_token = trace_agent.shutdown_token();
@@ -1037,6 +1097,7 @@
trace_flusher,
trace_processor,
stats_flusher,
proxy_flusher,
shutdown_token,
)
}
4 changes: 2 additions & 2 deletions bottlecap/src/http.rs
@@ -12,7 +12,7 @@ use std::sync::Arc;
use tracing::error;

#[must_use]
pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
pub fn get_client(config: &Arc<config::Config>) -> reqwest::Client {
build_client(config).unwrap_or_else(|e| {
error!(
"Unable to parse proxy configuration: {}, no proxy will be used",
@@ -23,7 +23,7 @@ pub fn get_client(config: Arc<config::Config>) -> reqwest::Client {
})
}

fn build_client(config: Arc<config::Config>) -> Result<reqwest::Client, Box<dyn Error>> {
fn build_client(config: &Arc<config::Config>) -> Result<reqwest::Client, Box<dyn Error>> {
let mut client = create_reqwest_client_builder()?
.timeout(Duration::from_secs(config.flush_timeout))
.pool_idle_timeout(Some(Duration::from_secs(270)))
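Design note: get_client and build_client now borrow the Arc<Config> instead of taking it by value, so call sites no longer have to clone the shared config (bumping its reference count) just to build an HTTP client. The logs flusher change below shows the simplified call site.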
2 changes: 1 addition & 1 deletion bottlecap/src/logs/flusher.rs
@@ -38,7 +38,7 @@ impl Flusher {
aggregator: Arc<Mutex<Aggregator>>,
config: Arc<config::Config>,
) -> Self {
let client = get_client(config.clone());
let client = get_client(&config);
let mut headers = HeaderMap::new();
headers.insert(
"DD-API-KEY",
9 changes: 9 additions & 0 deletions bottlecap/src/traces/mod.rs
@@ -3,6 +3,8 @@

pub mod context;
pub mod propagation;
pub mod proxy_aggregator;
pub mod proxy_flusher;
pub mod span_pointers;
pub mod stats_aggregator;
pub mod stats_flusher;
@@ -32,3 +34,10 @@ const AWS_XRAY_DAEMON_ADDRESS_URL_PREFIX: &str = "169.254.79.129";

// Name of the placeholder invocation span set by Java and Go tracers
const INVOCATION_SPAN_RESOURCE: &str = "dd-tracer-serverless-span";

#[allow(clippy::doc_markdown)]
/// Header used for additional tags when sending APM data to the Datadog intake
///
/// Used when we are appending Lambda specific tags to incoming APM data from
/// products like DSM, Profiling, LLMObs, Live Debugger, and more.
const DD_ADDITIONAL_TAGS_HEADER: &str = "X-Datadog-Additional-Tags";
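
As a rough sketch of how this header might be attached when a proxied payload is rebuilt for the intake: the helper below is hypothetical, and the comma-separated key:value tag format is an assumption, not something this diff specifies. Only reqwest and the header name come from the PR itself.

use bytes::Bytes;
use reqwest::header::{HeaderMap, HeaderValue};

// Hypothetical helper: rebuild an incoming proxied request with
// Lambda-specific tags appended via the additional-tags header.
fn build_proxied_request(
    client: &reqwest::Client,
    target_url: &str,
    mut headers: HeaderMap,
    body: Bytes,
    additional_tags: &str, // assumed format, e.g. "functionname:my-fn,region:us-east-1"
) -> reqwest::RequestBuilder {
    if let Ok(value) = HeaderValue::from_str(additional_tags) {
        headers.insert("X-Datadog-Additional-Tags", value);
    }
    client.post(target_url).headers(headers).body(body)
}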
30 changes: 30 additions & 0 deletions bottlecap/src/traces/proxy_aggregator.rs
Comment from the PR author on this file:
I don't like how simple this aggregator is; it's just a wrapper around a queue. But I think for now we should respect the pattern we are following. Maybe we can create a trait and consolidate how we create aggregators; a hypothetical sketch of that idea follows the diff below.

@@ -0,0 +1,30 @@
use bytes::Bytes;
use reqwest::header::HeaderMap;

pub struct ProxyRequest {
pub headers: HeaderMap,
pub body: Bytes,
pub target_url: String,
}

pub struct Aggregator {
queue: Vec<ProxyRequest>,
}

impl Default for Aggregator {
fn default() -> Self {
Aggregator {
queue: Vec::with_capacity(128), // arbitrary capacity for request queue
}
}
}

impl Aggregator {
pub fn add(&mut self, request: ProxyRequest) {
self.queue.push(request);
}

pub fn get_batch(&mut self) -> Vec<ProxyRequest> {
std::mem::take(&mut self.queue)
}
}
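
Picking up the author's comment above, here is a minimal sketch of the kind of trait that could consolidate the queue-like aggregators. The trait name and shape are hypothetical, not part of this PR:

// Hypothetical trait capturing the queue-like behavior the
// aggregators share; not part of this PR.
pub trait BatchAggregator {
    type Item;

    // Queue one item for the next flush.
    fn add(&mut self, item: Self::Item);

    // Drain everything queued so far.
    fn get_batch(&mut self) -> Vec<Self::Item>;
}

impl BatchAggregator for Aggregator {
    type Item = ProxyRequest;

    fn add(&mut self, item: ProxyRequest) {
        self.queue.push(item);
    }

    fn get_batch(&mut self) -> Vec<ProxyRequest> {
        std::mem::take(&mut self.queue)
    }
}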