Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to hyper/http v1.0 #1674

Merged
merged 33 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e52ba2c
Upgrade to hyper v1
aumetra Apr 22, 2024
6c75e7c
Fix hyper feature
aumetra Apr 22, 2024
e22bb0b
Fix compilation
aumetra Apr 22, 2024
adda11d
Ignore doctest for now
aumetra Apr 22, 2024
0664b64
Enable HTTP 1 and 2
aumetra Apr 22, 2024
75b3259
Fix some lints
aumetra May 3, 2024
5944e90
Update test
aumetra May 3, 2024
45903d8
Remove unused dependencies/features
aumetra May 3, 2024
650e688
Prefer `::from`
aumetra May 3, 2024
832b1cb
Remove some features
aumetra May 3, 2024
40fa576
Use pinned PR version of external types
aumetra May 3, 2024
1a3d711
Add CHANGELOG entries
aumetra May 3, 2024
8e6c5d2
Add `serde_json` dependencies back
aumetra May 3, 2024
8ad764a
Replace `Either` usage with `BoxBody`
aumetra May 5, 2024
6f34ee3
Add allow over generated module
aumetra Jul 9, 2024
70a9bc7
Fix hyper example
aumetra Jul 9, 2024
95608c5
Regenerate protobuf types
aumetra Jul 9, 2024
318bb62
Pin cc version
aumetra Jul 9, 2024
c93e43f
Mention tonic in changelog
aumetra Jul 9, 2024
ab7a3a4
Remove mention from appender-log changelog
aumetra Jul 9, 2024
af97e0e
Add back entry
aumetra Jul 9, 2024
eb2ae32
Remove git patch for CI
aumetra Jul 9, 2024
23c09c7
Remove allow annotation
aumetra Jul 9, 2024
f5c73eb
Update zipkin changelog
aumetra Jul 10, 2024
a87360b
Remove cc pin
aumetra Jul 10, 2024
88ad416
Revert submodule update
aumetra Jul 11, 2024
7d1c3a1
Reorder dependencies
aumetra Jul 11, 2024
1678306
Update changelogs
aumetra Jul 11, 2024
19ddbd8
Merge branch 'main' into http-1.0
aumetra Jul 11, 2024
25f9213
Add opaque wrapper around Full body, use wrapper
aumetra Jul 11, 2024
c840978
Move body impl, use box dyn error type
aumetra Jul 11, 2024
1d1cef9
Fix compile error
aumetra Jul 11, 2024
7d9670f
Merge branch 'main' into http-1.0
lalitb Jul 12, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ criterion = "0.5"
futures-core = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false }
hyper = { version = "0.14", default-features = false }
http = { version = "0.2", default-features = false }
http = { version = "1.1", default-features = false, features = ["std"] }
aumetra marked this conversation as resolved.
Show resolved Hide resolved
http-body-util = "0.1"
hyper = { version = "1.3", default-features = false }
hyper-util = "0.1"
log = "0.4.21"
once_cell = "1.13"
ordered-float = "4.0"
pin-project-lite = "0.2"
prost = "0.12"
prost-build = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-build = "0.13"
prost-types = "0.13"
rand = { version = "0.8", default-features = false }
reqwest = { version = "0.11", default-features = false }
reqwest = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false }
serde_json = "1.0"
temp-env = "0.3.6"
thiserror = { version = "1", default-features = false }
tonic = { version = "0.11", default-features = false }
tonic-build = "0.11"
tonic = { version = "0.12", default-features = false }
tonic-build = "0.12"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.1"
tracing = { version = "0.1", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ path = "src/client.rs"
doc = false

[dependencies]
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
Expand Down
9 changes: 5 additions & 4 deletions examples/tracing-http-propagator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use hyper::{body::Body, Client};
use http_body_util::Full;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::{
global,
trace::{SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderInjector;
use opentelemetry_http::{Bytes, HeaderInjector};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_stdout::SpanExporter;

Expand All @@ -24,7 +25,7 @@ async fn send_request(
body_content: &str,
span_name: &str,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let client = Client::new();
let client = Client::builder(TokioExecutor::new()).build_http();
let tracer = global::tracer("example/client");
let span = tracer
.span_builder(String::from(span_name))
Expand All @@ -37,7 +38,7 @@ async fn send_request(
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
});
let res = client
.request(req.body(Body::from(String::from(body_content)))?)
.request(req.body(Full::new(Bytes::from(body_content.to_string())))?)
.await?;

cx.span().add_event(
Expand Down
56 changes: 37 additions & 19 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,64 @@
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use http_body_util::{combinators::BoxBody, BodyExt, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::{
global,
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_http::{Bytes, HeaderExtractor};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::SpanExporter;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

// Utility function to extract the context from the incoming request headers
fn extract_context_from_request(req: &Request<Body>) -> Context {
fn extract_context_from_request(req: &Request<Incoming>) -> Context {
global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
})
}

// Separate async function for the handle endpoint
async fn handle_health_check(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_health_check(
_req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("health_check")
.with_kind(SpanKind::Internal)
.start(&tracer);
span.add_event("Health check accessed", vec![]);
let res = Response::new(Body::from("Server is up and running!"));

let res = Response::new(
Full::new(Bytes::from_static(b"Server is up and running!"))
.map_err(|err| match err {})
.boxed(),
);

Ok(res)
}

// Separate async function for the echo endpoint
async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_echo(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("echo")
.with_kind(SpanKind::Internal)
.start(&tracer);
span.add_event("Echoing back the request", vec![]);
let res = Response::new(req.into_body());

let res = Response::new(req.into_body().boxed());

Ok(res)
}

async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn router(
req: Request<Incoming>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, Infallible> {
// Extract the context from the incoming request headers
let parent_cx = extract_context_from_request(&req);
let response = {
Expand All @@ -64,12 +78,13 @@ async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
_ => {
cx.span()
.set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404));
let mut not_found = Response::default();
let mut not_found = Response::new(BoxBody::default());
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
};

response
}

Expand All @@ -87,15 +102,18 @@ fn init_tracer() {

#[tokio::main]
async fn main() {
use hyper_util::server::conn::auto::Builder;

init_tracer();
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await.unwrap();

let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) });

let server = Server::bind(&addr).serve(make_svc);

println!("Listening on {addr}");
if let Err(e) = server.await {
eprintln!("server error: {e}");
while let Ok((stream, _addr)) = listener.accept().await {
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(stream), service_fn(router))
.await
{
eprintln!("{err}");
}
}
}
1 change: 0 additions & 1 deletion opentelemetry-appender-tracing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.65"

[dependencies]
log = { workspace = true, optional = true }
once_cell = { workspace = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["logs"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions opentelemetry-http/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- **Breaking** Correct the misspelling of "webkpi" to "webpki" in features [#1842](https://github.com/open-telemetry/opentelemetry-rust/pull/1842)
- **Breaking** Remove support for the `isahc` HTTP client [#1924](https://github.com/open-telemetry/opentelemetry-rust/pull/1924)
- Update to `http` v1 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)

## v0.12.0

Expand Down
5 changes: 4 additions & 1 deletion opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,17 @@ edition = "2021"
rust-version = "1.65"

[features]
hyper = ["dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:tokio"]
reqwest-rustls = ["reqwest", "reqwest/rustls-tls-native-roots"]
reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]

[dependencies]
async-trait = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, optional = true }
hyper-util = { workspace = true, features = ["client-legacy", "http2"], optional = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
44 changes: 37 additions & 7 deletions opentelemetry-http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,24 @@

use super::{async_trait, Bytes, HttpClient, HttpError, Request, Response};
use http::HeaderValue;
use hyper::client::connect::Connect;
use hyper::Client;
use http_body_util::{BodyExt, Full};
use hyper::body::{Body as HttpBody, Frame};
use hyper_util::client::legacy::{connect::Connect, Client};
use std::fmt::Debug;
use std::pin::Pin;
use std::task::{self, Poll};
use std::time::Duration;
use tokio::time;

#[derive(Debug, Clone)]
pub struct HyperClient<C> {
inner: Client<C>,
inner: Client<C, Body>,
timeout: Duration,
authorization: Option<HeaderValue>,
}

impl<C> HyperClient<C> {
pub fn new_with_timeout(inner: Client<C>, timeout: Duration) -> Self {
pub fn new_with_timeout(inner: Client<C, Body>, timeout: Duration) -> Self {

Check warning on line 125 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L125

Added line #L125 was not covered by tests
Self {
inner,
timeout,
Expand All @@ -128,7 +131,7 @@
}

pub fn new_with_timeout_and_authorization_header(
inner: Client<C>,
inner: Client<C, Body>,

Check warning on line 134 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L134

Added line #L134 was not covered by tests
timeout: Duration,
authorization: HeaderValue,
) -> Self {
Expand All @@ -147,22 +150,49 @@
{
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let (parts, body) = request.into_parts();
let mut request = Request::from_parts(parts, body.into());
let mut request = Request::from_parts(parts, Body(Full::from(body)));

Check warning on line 153 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L153

Added line #L153 was not covered by tests
if let Some(ref authorization) = self.authorization {
request
.headers_mut()
.insert(http::header::AUTHORIZATION, authorization.clone());
}
let mut response = time::timeout(self.timeout, self.inner.request(request)).await??;
let headers = std::mem::take(response.headers_mut());

Check warning on line 161 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L161

Added line #L161 was not covered by tests
let mut http_response = Response::builder()
.status(response.status())
.body(hyper::body::to_bytes(response.into_body()).await?)?;
.body(response.into_body().collect().await?.to_bytes())?;

Check warning on line 164 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L164

Added line #L164 was not covered by tests
*http_response.headers_mut() = headers;

Ok(http_response.error_for_status()?)
}
}

pub struct Body(Full<Bytes>);

impl HttpBody for Body {
type Data = Bytes;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;

#[inline]
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let inner_body = unsafe { self.map_unchecked_mut(|b| &mut b.0) };
inner_body.poll_frame(cx).map_err(Into::into)
}

Check warning on line 184 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L178-L184

Added lines #L178 - L184 were not covered by tests

#[inline]
fn is_end_stream(&self) -> bool {
self.0.is_end_stream()
}

Check warning on line 189 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L187-L189

Added lines #L187 - L189 were not covered by tests

#[inline]
fn size_hint(&self) -> hyper::body::SizeHint {
self.0.size_hint()
}

Check warning on line 194 in opentelemetry-http/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-http/src/lib.rs#L192-L194

Added lines #L192 - L194 were not covered by tests
}
}

/// Methods to make working with responses from the [`HttpClient`] trait easier.
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ now use `.with_resource(RESOURCE::default())` to configure Resource when using
previous release.
- **Breaking** [1869](https://github.com/open-telemetry/opentelemetry-rust/pull/1869) The OTLP logs exporter now overrides the [InstrumentationScope::name](https://github.com/open-telemetry/opentelemetry-proto/blob/b3060d2104df364136d75a35779e6bd48bac449a/opentelemetry/proto/common/v1/common.proto#L73) field with the `target` from `LogRecord`, if target is populated.
- Groups batch of `LogRecord` and `Span` by their resource and instrumentation scope before exporting, for better efficiency [#1873](https://github.com/open-telemetry/opentelemetry-rust/pull/1873).

- **Breaking** Update to `http` v1 and `tonic` v0.12 [#1674](https://github.com/open-telemetry/opentelemetry-rust/pull/1674)

## v0.16.0

Expand Down
4 changes: 3 additions & 1 deletion opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false
[features]
default = ["reqwest"]
reqwest = ["opentelemetry-otlp/reqwest-client"]
hyper = ["dep:async-trait", "dep:http", "dep:hyper", "dep:opentelemetry-http", "dep:bytes"]
hyper = ["dep:async-trait", "dep:http", "dep:http-body-util", "dep:hyper", "dep:hyper-util", "dep:opentelemetry-http", "dep:bytes"]


[dependencies]
Expand All @@ -23,7 +23,9 @@ opentelemetry-semantic-conventions = { path = "../../../opentelemetry-semantic-c
async-trait = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
http = { workspace = true, optional = true }
http-body-util = { workspace = true, optional = true }
hyper = { workspace = true, features = ["client"], optional = true }
hyper-util = { workspace = true, features = ["client-legacy"], optional = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true, features = ["std"]}
tracing-core = { workspace = true }
Expand Down
18 changes: 11 additions & 7 deletions opentelemetry-otlp/examples/basic-otlp-http/src/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
use async_trait::async_trait;
use bytes::Bytes;
use http::{Request, Response};
use hyper::{
client::{connect::Connect, HttpConnector},
Body, Client,
use http_body_util::{BodyExt, Full};
use hyper_util::{
client::legacy::{
connect::{Connect, HttpConnector},
Client,
},
rt::TokioExecutor,
};
use opentelemetry_http::{HttpClient, HttpError, ResponseExt};

pub struct HyperClient<C> {
inner: hyper::Client<C>,
inner: hyper_util::client::legacy::Client<C, Full<Bytes>>,
}

impl Default for HyperClient<HttpConnector> {
fn default() -> Self {
Self {
inner: Client::new(),
inner: Client::builder(TokioExecutor::new()).build_http(),
}
}
}
Expand All @@ -30,15 +34,15 @@ impl<C> std::fmt::Debug for HyperClient<C> {
#[async_trait]
impl<C: Connect + Clone + Send + Sync + 'static> HttpClient for HyperClient<C> {
async fn send(&self, request: Request<Vec<u8>>) -> Result<Response<Bytes>, HttpError> {
let request = request.map(Body::from);
let request = request.map(|body| Full::new(Bytes::from(body)));

let (parts, body) = self
.inner
.request(request)
.await?
.error_for_status()?
.into_parts();
let body = hyper::body::to_bytes(body).await?;
let body = body.collect().await?.to_bytes();

Ok(Response::from_parts(parts, body))
}
Expand Down
Loading
Loading