Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions datadog-sidecar/src/service/agent_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use futures::FutureExt;
use http::uri::PathAndQuery;
use libdd_common::{Endpoint, MutexExt};
use libdd_data_pipeline::agent_info::schema::AgentInfoStruct;
use libdd_data_pipeline::agent_info::{fetch_info_with_state, FetchInfoStatus};
use libdd_data_pipeline::agent_info::{fetch_info_with_state_tokio, FetchInfoStatus};
use manual_future::ManualFuture;
use std::ffi::CString;
use std::hash::{Hash, Hasher};
Expand Down Expand Up @@ -101,7 +101,7 @@ impl AgentInfoFetcher {
parts.path_and_query = Some(PathAndQuery::from_static("/info"));
fetch_endpoint.url = hyper::Uri::from_parts(parts).unwrap();
loop {
let fetched = fetch_info_with_state(&fetch_endpoint, state.as_deref()).await;
let fetched = fetch_info_with_state_tokio(&fetch_endpoint, state.as_deref()).await;
let mut complete_fut = None;
{
let mut infos_guard = agent_infos.0.lock_or_panic();
Expand Down
3 changes: 2 additions & 1 deletion libdd-common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,15 @@ hyper-util = { workspace = true }
http = "1.0"
http-body = "1.0"
http-body-util = "0.1"
bytes = "1.6"
tower-service = "0.3"
cc = "1.1.31"
pin-project = "1"
regex = "1.5"
rustls = { version = "0.23", default-features = false, optional = true }
rustls-native-certs = { version = "0.8.1", optional = true }
thiserror = "1.0"
tokio = { version = "1.23", features = ["rt", "macros"] }
tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] }
tokio-rustls = { version = "0.26", default-features = false, optional = true }
serde = { version = "1.0", features = ["derive"] }
static_assertions = "1.1.0"
Expand Down
14 changes: 7 additions & 7 deletions libdd-common/src/hyper_migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use http_body_util::BodyExt;
use hyper::body::Incoming;
use pin_project::pin_project;
// Need aliases because cbindgen is not smart enough to figure type aliases
use hyper::Request as HyperRequest;
use http::Request as HyperRequest;

/// Create a new default configuration hyper client for fixed interval sending.
///
Expand All @@ -31,12 +31,12 @@ pub fn new_default_client() -> GenericHttpClient<Connector> {
.build(Connector::default())
}

pub type HttpResponse = hyper::Response<Body>;
pub type HttpResponse = http::Response<Body>;
pub type HttpRequest = HyperRequest<Body>;
pub type ClientError = hyper_util::client::legacy::Error;
pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;

pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
pub fn into_response(response: http::Response<Incoming>) -> HttpResponse {
response.map(Body::Incoming)
}

Expand Down Expand Up @@ -81,7 +81,7 @@ impl std::error::Error for Error {}

pub fn mock_response(
builder: http::response::Builder,
body: hyper::body::Bytes,
body: bytes::Bytes,
) -> anyhow::Result<HttpResponse> {
Ok(builder.body(Body::from_bytes(body))?)
}
Expand Down Expand Up @@ -163,8 +163,8 @@ impl From<String> for Body {
}
}

impl hyper::body::Body for Body {
type Data = hyper::body::Bytes;
impl http_body::Body for Body {
type Data = bytes::Bytes;

type Error = Error;

Expand All @@ -182,7 +182,7 @@ impl hyper::body::Body for Body {
Poll::Ready(None) => return Poll::Ready(None),
Poll::Pending => return Poll::Pending,
};
Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
Poll::Ready(Some(Ok(http_body::Frame::data(data))))
}
BodyProj::Incoming(pin) => pin.poll_frame(cx).map_err(Error::Hyper),
}
Expand Down
1 change: 1 addition & 0 deletions libdd-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod config;
pub mod error;
pub mod hyper_migration;
pub mod rate_limiter;
pub mod runtime;
pub mod tag;
pub mod timeout;
pub mod unix_utils;
Expand Down
78 changes: 78 additions & 0 deletions libdd-common/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::{fmt::Debug, future::Future, io, time::Duration};

use hyper_util::client::legacy::connect::Connect;

pub trait Runtime: Debug + Send + Sync + 'static {
type JoinError;
type JoinHandle<R: Send + 'static>: FutureHandle<R, Self::JoinError> + Unpin;

fn new() -> io::Result<Self>
where
Self: Sized;

fn spawn_ref<Fut: Future<Output = R> + Send + 'static, R: Send + 'static>(
&self,
f: Fut,
) -> Self::JoinHandle<R>;

fn sleep(time: Duration) -> impl Future<Output = ()> + Send;

type HttpClient: HttpClient;

fn http_client() -> Self::HttpClient;
}

pub trait HttpClient: Send + Sync + Clone + Debug {
fn request(
&self,
req: http::Request<crate::hyper_migration::Body>,
) -> impl Future<Output = io::Result<http::Response<crate::hyper_migration::Body>>> + Send + 'static;
}

impl<C: Connect + Send + Sync + Clone + 'static> HttpClient for crate::GenericHttpClient<C> {
fn request(
&self,
req: http::Request<crate::hyper_migration::Body>,
) -> impl Future<Output = io::Result<http::Response<crate::hyper_migration::Body>>> + Send + 'static
{
let res = self.request(req);
async {
res.await
.map_err(io::Error::other)
.map(|b| b.map(crate::hyper_migration::Body::incoming))
}
}
}

pub trait FutureHandle<Ok, Err>: Future<Output = Result<Ok, Err>> {}

impl<Ok> FutureHandle<Ok, tokio::task::JoinError> for tokio::task::JoinHandle<Ok> {}

impl Runtime for tokio::runtime::Runtime {
type JoinError = tokio::task::JoinError;
type JoinHandle<R: Send + 'static> = tokio::task::JoinHandle<R>;

fn new() -> io::Result<Self> {
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
}

fn spawn_ref<Fut: Future<Output = R> + Send + 'static, R: Send + 'static>(
&self,
f: Fut,
) -> Self::JoinHandle<R> {
self.spawn(f)
}

fn sleep(time: Duration) -> impl Future<Output = ()> + Send {
tokio::time::sleep(time)
}

type HttpClient = crate::HttpClient;

fn http_client() -> Self::HttpClient {
crate::hyper_migration::new_default_client()
}
}
2 changes: 1 addition & 1 deletion libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
builder.enable_health_metrics();
}

match builder.build() {
match builder.build_tokio() {
Ok(exporter) => {
out_handle.as_ptr().write(Box::new(exporter));
None
Expand Down
4 changes: 3 additions & 1 deletion libdd-data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ fn main() {
.set_output_format(TraceExporterOutputFormat::V04)
.enable_telemetry(telemetry_cfg)
.enable_stats(Duration::from_secs(10));
let exporter = builder.build().expect("Failed to build TraceExporter");
let exporter = builder
.build_tokio()
.expect("Failed to build TraceExporter");
let now = UNIX_EPOCH
.elapsed()
.expect("Failed to get time since UNIX_EPOCH")
Expand Down
Loading
Loading