Skip to content

feat(data-pipeline-ffi): add functions to manipulate span from C #994

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions data-pipeline/src/trace_exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
use arc_swap::{ArcSwap, ArcSwapOption};
use bytes::Bytes;
use datadog_trace_utils::msgpack_decoder::{self, decode::error::DecodeError};
use datadog_trace_utils::msgpack_encoder;
use datadog_trace_utils::send_with_retry::{
send_with_retry, RetryStrategy, SendWithRetryError, SendWithRetryResult,
};
Expand Down Expand Up @@ -787,9 +788,7 @@ impl TraceExporter {

let strategy = RetryStrategy::default();
let mp_payload = match &payload {
tracer_payload::TraceChunks::V04(p) => {
rmp_serde::to_vec_named(p).map_err(TraceExporterError::Serialization)?
}
tracer_payload::TraceChunks::V04(p) => msgpack_encoder::v04::to_vec(p),
tracer_payload::TraceChunks::V05(p) => {
rmp_serde::to_vec(p).map_err(TraceExporterError::Serialization)?
}
Expand Down
6 changes: 5 additions & 1 deletion datadog-sidecar-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@ ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
ddtelemetry-ffi = { path = "../ddtelemetry-ffi", default-features = false }
datadog-remote-config = { path = "../datadog-remote-config" }
datadog-live-debugger = { path = "../datadog-live-debugger" }
dogstatsd-client = { path = "../dogstatsd-client" }
tinybytes = { path = "../tinybytes" }
paste = "1"
libc = "0.2"
dogstatsd-client = { path = "../dogstatsd-client" }
tracing = { version = "0.1", default-features = false }
rmp-serde = "1.1.1"


[target.'cfg(windows)'.dependencies]
datadog-crashtracker-ffi = { path = "../datadog-crashtracker-ffi", features = ["collector", "collector_windows"] }
Expand Down
6 changes: 6 additions & 0 deletions datadog-sidecar-ffi/cbindgen.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ language = "C"
tab_width = 2
header = """// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
typedef struct ddog_SpanBytes ddog_SpanBytes;
typedef struct ddog_SpanLinkBytes ddog_SpanLinkBytes;
typedef struct ddog_SpanEventBytes ddog_SpanEventBytes;
typedef struct ddog_AttributeAnyValueBytes ddog_AttributeAnyValueBytes;
typedef struct ddog_AttributeArrayValueBytes ddog_AttributeArrayValueBytes;
"""
include_guard = "DDOG_SIDECAR_H"
style = "both"
Expand Down
99 changes: 99 additions & 0 deletions datadog-sidecar-ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#![cfg_attr(not(test), deny(clippy::todo))]
#![cfg_attr(not(test), deny(clippy::unimplemented))]

pub mod span;

use crate::span::TracesBytes;
#[cfg(windows)]
use datadog_crashtracker_ffi::Metadata;
use datadog_ipc::platform::{
Expand All @@ -28,6 +31,7 @@ use datadog_sidecar::service::{
InstanceId, QueueId, RuntimeMetadata, SerializedTracerHeaderTags, SessionConfig, SidecarAction,
};
use datadog_sidecar::shm_remote_config::{path_for_remote_config, RemoteConfigReader};
use datadog_trace_utils::msgpack_encoder;
use ddcommon::tag::Tag;
use ddcommon::Endpoint;
use ddcommon_ffi as ffi;
Expand Down Expand Up @@ -995,6 +999,101 @@ pub unsafe extern "C" fn ddog_get_agent_info_env<'a>(
.unwrap_or(ffi::CharSlice::empty())
}

#[macro_export]
macro_rules! check {
($failable:expr, $msg:expr) => {
match $failable {
Ok(o) => o,
Err(e) => {
tracing::error!("{}: {}", $msg, e);
return;
}
}
};
}

#[repr(C)]
#[derive()]
pub struct SenderParameters {
pub tracer_headers_tags: TracerHeaderTags<'static>,
pub transport: Box<SidecarTransport>,
pub instance_id: Box<InstanceId>,
pub limit: usize,
pub n_requests: i64,
pub buffer_size: i64,
pub url: CharSlice<'static>,
}

#[no_mangle]
#[allow(clippy::missing_safety_doc)]
pub unsafe extern "C" fn ddog_send_traces_to_sidecar(
traces: &mut TracesBytes,
parameters: &mut SenderParameters,
) {
let size: usize = traces.iter().map(|trace| trace.len()).sum();

// Create and map shared memory
let shm = check!(
ShmHandle::new(parameters.limit),
"Failed to create shared memory"
);

let mut mapped_shm = check!(shm.clone().map(), "Failed to map shared memory");

// Write traces to the shared memory
let mut shm_slice = mapped_shm.as_slice_mut();
let shm_slice_len = shm_slice.len();
let written = match msgpack_encoder::v04::to_slice(&mut shm_slice, traces) {
Ok(()) => shm_slice_len - shm_slice.len(),
Err(_) => {
tracing::error!("Failed serializing the traces");
return;
}
};

// Send traces to the sidecar via the shared memory handler
let mut size_hint = written;
if parameters.n_requests > 0 {
size_hint = size_hint.max((parameters.buffer_size / parameters.n_requests + 1) as usize);
}

let send_error = blocking::send_trace_v04_shm(
&mut parameters.transport,
&parameters.instance_id,
shm,
size_hint,
check!(
(&parameters.tracer_headers_tags).try_into(),
"Failed to convert tracer headers tags"
),
);

// Retry sending traces via bytes if there was an error
if send_error.is_err() {
match blocking::send_trace_v04_bytes(
&mut parameters.transport,
&parameters.instance_id,
msgpack_encoder::v04::to_vec_with_capacity(traces, written as u32),
check!(
(&parameters.tracer_headers_tags).try_into(),
"Failed to convert tracer headers tags"
),
) {
Ok(_) => {}
Err(_) => tracing::debug!(
"Failed sending traces via shm to sidecar: {}",
send_error.err().unwrap_unchecked().to_string()
),
};
}

tracing::info!(
"Flushing traces of size {} to send-queue for {}",
size,
parameters.url
);
}

/// Drops the agent info reader.
#[no_mangle]
#[allow(clippy::missing_safety_doc)]
Expand Down
Loading
Loading