Skip to content

Commit

Permalink
Upgrade to hyper v1
Browse files Browse the repository at this point in the history
  • Loading branch information
aumetra committed Jul 10, 2024
1 parent 3882b22 commit e52ba2c
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 57 deletions.
18 changes: 10 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,25 @@ criterion = "0.5"
futures-core = "0.3"
futures-executor = "0.3"
futures-util = { version = "0.3", default-features = false }
hyper = { version = "0.14", default-features = false }
http = { version = "0.2", default-features = false }
hyper = { version = "1.3", default-features = false }
hyper-util = "0.1"
http = { version = "1.1", default-features = false }
http-body-util = "0.1"
log = "0.4.21"
once_cell = "1.13"
ordered-float = "4.0"
pin-project-lite = "0.2"
prost = "0.12"
prost-build = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-build = "0.13"
prost-types = "0.13"
rand = { version = "0.8", default-features = false }
reqwest = { version = "0.11", default-features = false }
reqwest = { version = "0.12", default-features = false }
serde = { version = "1.0", default-features = false }
serde_json = "1.0"
temp-env = "0.3.6"
thiserror = { version = "1", default-features = false }
tonic = { version = "0.11", default-features = false }
tonic-build = "0.11"
tonic = { version = "0.12", default-features = false }
tonic-build = "0.12"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.1"
tracing = { version = "0.1", default-features = false }
Expand Down
2 changes: 2 additions & 0 deletions examples/tracing-http-propagator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ path = "src/client.rs"
doc = false

[dependencies]
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }
opentelemetry = { path = "../../opentelemetry" }
opentelemetry_sdk = { path = "../../opentelemetry-sdk" }
Expand Down
9 changes: 5 additions & 4 deletions examples/tracing-http-propagator/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use hyper::{body::Body, Client};
use http_body_util::Full;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use opentelemetry::{
global,
trace::{SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderInjector;
use opentelemetry_http::{Bytes, HeaderInjector};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_stdout::SpanExporter;

Expand All @@ -24,7 +25,7 @@ async fn send_request(
body_content: &str,
span_name: &str,
) -> std::result::Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let client = Client::new();
let client = Client::builder(TokioExecutor::new()).build_http();
let tracer = global::tracer("example/client");
let span = tracer
.span_builder(String::from(span_name))
Expand All @@ -37,7 +38,7 @@ async fn send_request(
propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut().unwrap()))
});
let res = client
.request(req.body(Body::from(String::from(body_content)))?)
.request(req.body(Full::new(Bytes::from(body_content.to_string())))?)
.await?;

cx.span().add_event(
Expand Down
52 changes: 32 additions & 20 deletions examples/tracing-http-propagator/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
use hyper::{
service::{make_service_fn, service_fn},
Body, Request, Response, Server, StatusCode,
};
use http_body_util::{Either, Full};
use hyper::{body::Incoming, service::service_fn, Request, Response, StatusCode};
use hyper_util::rt::{TokioExecutor, TokioIo};
use opentelemetry::{
global,
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
Context, KeyValue,
};
use opentelemetry_http::HeaderExtractor;
use opentelemetry_http::{Bytes, HeaderExtractor};
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::TracerProvider};
use opentelemetry_semantic_conventions::trace;
use opentelemetry_stdout::SpanExporter;
use std::{convert::Infallible, net::SocketAddr};
use tokio::net::TcpListener;

// Utility function to extract the context from the incoming request headers
fn extract_context_from_request(req: &Request<Body>) -> Context {
fn extract_context_from_request(req: &Request<Incoming>) -> Context {
global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(req.headers()))
})
}

// Separate async function for the handle endpoint
async fn handle_health_check(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_health_check(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("health_check")
.with_kind(SpanKind::Internal)
.start(&tracer);
span.add_event("Health check accessed", vec![]);
let res = Response::new(Body::from("Server is up and running!"));
let res = Response::new(Full::new(Bytes::from_static(b"Server is up and running!")));
Ok(res)
}

// Separate async function for the echo endpoint
async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn handle_echo(req: Request<Incoming>) -> Result<Response<Incoming>, Infallible> {
let tracer = global::tracer("example/server");
let mut span = tracer
.span_builder("echo")
Expand All @@ -44,7 +44,9 @@ async fn handle_echo(req: Request<Body>) -> Result<Response<Body>, Infallible> {
Ok(res)
}

async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {
async fn router(
req: Request<Incoming>,
) -> Result<Response<Either<Full<Bytes>, Incoming>>, Infallible> {
// Extract the context from the incoming request headers
let parent_cx = extract_context_from_request(&req);
let response = {
Expand All @@ -59,17 +61,24 @@ async fn router(req: Request<Body>) -> Result<Response<Body>, Infallible> {

let cx = Context::default().with_span(span);
match (req.method(), req.uri().path()) {
(&hyper::Method::GET, "/health") => handle_health_check(req).with_context(cx).await,
(&hyper::Method::GET, "/echo") => handle_echo(req).with_context(cx).await,
(&hyper::Method::GET, "/health") => handle_health_check(req)
.with_context(cx)
.await
.map(|response| response.map(Either::Left)),
(&hyper::Method::GET, "/echo") => handle_echo(req)
.with_context(cx)
.await
.map(|response| response.map(Either::Right)),
_ => {
cx.span()
.set_attribute(KeyValue::new(trace::HTTP_RESPONSE_STATUS_CODE, 404));
let mut not_found = Response::default();
let mut not_found = Response::new(Either::Left(Full::default()));
*not_found.status_mut() = StatusCode::NOT_FOUND;
Ok(not_found)
}
}
};

response
}

Expand All @@ -87,15 +96,18 @@ fn init_tracer() {

#[tokio::main]
async fn main() {
use hyper_util::server::conn::auto::Builder;

init_tracer();
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = TcpListener::bind(addr).await.unwrap();

let make_svc = make_service_fn(|_conn| async { Ok::<_, Infallible>(service_fn(router)) });

let server = Server::bind(&addr).serve(make_svc);

println!("Listening on {addr}");
if let Err(e) = server.await {
eprintln!("server error: {e}");
while let Ok((stream, _addr)) = listener.accept().await {
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection(TokioIo::new(stream), service_fn(router))
.await
{
eprintln!("{err}");
}
}
}
2 changes: 1 addition & 1 deletion opentelemetry-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ reqwest-rustls-webpki-roots = ["reqwest", "reqwest/rustls-tls-webpki-roots"]
async-trait = { workspace = true }
bytes = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, features = ["http2", "client", "tcp"], optional = true }
hyper = { workspace = true, features = ["http2", "client"], optional = true }
opentelemetry = { version = "0.23", path = "../opentelemetry", features = ["trace"] }
reqwest = { workspace = true, features = ["blocking"], optional = true }
tokio = { workspace = true, features = ["time"], optional = true }
2 changes: 2 additions & 0 deletions opentelemetry-prometheus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ protobuf = "2.14"

[dev-dependencies]
opentelemetry-semantic-conventions = { version = "0.15" }
http-body-util = { workspace = true }
hyper = { workspace = true, features = ["full"] }
hyper-util = { workspace = true, features = ["full"] }
tokio = { workspace = true, features = ["full"] }

[features]
Expand Down
49 changes: 27 additions & 22 deletions opentelemetry-prometheus/examples/hyper.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
use http_body_util::Full;
use hyper::{
body::{Bytes, Incoming},
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Method, Request, Response, Server,
service::service_fn,
Method, Request, Response,
};
use hyper_util::rt::{TokioExecutor, TokioIo};
use once_cell::sync::Lazy;
use opentelemetry::{
metrics::{Counter, Histogram, MeterProvider as _, Unit},
KeyValue,
};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use prometheus::{Encoder, Registry, TextEncoder};
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::net::TcpListener;

static HANDLER_ALL: Lazy<[KeyValue; 1]> = Lazy::new(|| [KeyValue::new("handler", "all")]);

async fn serve_req(
req: Request<Body>,
req: Request<Incoming>,
state: Arc<AppState>,
) -> Result<Response<Body>, hyper::Error> {
) -> Result<Response<Full<Bytes>>, hyper::Error> {
println!("Receiving request at path {}", req.uri());
let request_start = SystemTime::now();

Expand All @@ -38,16 +42,16 @@ async fn serve_req(
Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.body(Full::new(Bytes::from(buffer)))
.unwrap()
}
(&Method::GET, "/") => Response::builder()
.status(200)
.body(Body::from("Hello World"))
.body(Full::new("Hello World".into()))
.unwrap(),
_ => Response::builder()
.status(404)
.body(Body::from("Missing Page"))
.body(Full::new("Missing Page".into()))
.unwrap(),
};

Expand All @@ -67,6 +71,8 @@ struct AppState {

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use hyper_util::server::conn::auto::Builder;

let registry = Registry::new();
let exporter = opentelemetry_prometheus::exporter()
.with_registry(registry.clone())
Expand All @@ -92,23 +98,22 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.init(),
});

// For every connection, we must make a `Service` to handle all
// incoming HTTP requests on said connection.
let make_svc = make_service_fn(move |_conn| {
let state = state.clone();
// This is the `Service` that will handle the connection.
// `service_fn` is a helper to convert a function that
// returns a Response into a `Service`.
async move { Ok::<_, Infallible>(service_fn(move |req| serve_req(req, state.clone()))) }
});

let addr = ([127, 0, 0, 1], 3000).into();

let server = Server::bind(&addr).serve(make_svc);
let addr: SocketAddr = ([127, 0, 0, 1], 3000).into();
let listener = TcpListener::bind(addr).await.unwrap();

println!("Listening on http://{addr}");

server.await?;
while let Ok((stream, _addr)) = listener.accept().await {
if let Err(err) = Builder::new(TokioExecutor::new())
.serve_connection(
TokioIo::new(stream),
service_fn(|req| serve_req(req, state.clone())),
)
.await
{
eprintln!("{err}");
}
}

Ok(())
}
2 changes: 1 addition & 1 deletion opentelemetry-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@
#![doc(
html_logo_url = "https://raw.githubusercontent.com/open-telemetry/opentelemetry-rust/main/assets/logo.svg"
)]
#![cfg_attr(test, deny(warnings))]
//#![cfg_attr(test, deny(warnings))]

pub mod export;
mod instrumentation;
Expand Down

0 comments on commit e52ba2c

Please sign in to comment.