Skip to content

Commit

Permalink
feat(core): metrics and tracing middleware for OpenTelemetry(OTEL) (v…
Browse files Browse the repository at this point in the history
…iz-rs#51)

* feat(core): opentelemetry
  • Loading branch information
fundon authored Sep 26, 2022
1 parent 95579e9 commit 8b11a1e
Show file tree
Hide file tree
Showing 21 changed files with 649 additions and 43 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"rustls",
"static-routes",
"routing/todos",
"otel/*",
]
exclude = ["tls", "target"]

Expand Down
3 changes: 3 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ Here you can find a lot of small crabs 🦀.
* [HTTPS/TLS - rustls](rustls)
* [Defined a static router](static-routes)
* [Todos](routing/todos)
* [Integration Opentelemetry(OTEL)](https://github.com/open-telemetry/opentelemetry-rust)
* [Tracing](otel/tracing)
* [Metrics & Prometheus](otel/metrics)

### Usage

Expand Down
14 changes: 14 additions & 0 deletions examples/otel/metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "otel-metrics"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
viz = { path = "../../../viz", features = ["otel-metrics", "otel-prometheus"] }

tokio = { version = "1.0", features = [
"rt-multi-thread",
"macros",
] }
opentelemetry = { version = "0.18.0", default-features = false, features = ["metrics"]}
53 changes: 53 additions & 0 deletions examples/otel/metrics/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#![deny(warnings)]

use std::net::SocketAddr;

use opentelemetry::{
global,
sdk::{
export::metrics::aggregation,
metrics::{controllers, processors, selectors},
},
};

use viz::{
handlers::prometheus::{ExporterBuilder, Prometheus},
middleware::otel,
Request, Result, Router, Server, ServiceMaker,
};

async fn index(_: Request) -> Result<&'static str> {
Ok("Hello, World!")
}

#[tokio::main]
async fn main() -> Result<()> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
println!("listening on {}", addr);

let exporter = {
let controller = controllers::basic(
processors::factory(
selectors::simple::histogram([1.0, 2.0, 5.0, 10.0, 20.0, 50.0]),
aggregation::cumulative_temporality_selector(),
)
.with_memory(true),
)
.build();
ExporterBuilder::new(controller).init()
};

let meter = global::meter("viz");

let app = Router::new()
.get("/", index)
.get("/:username", index)
.get("/metrics", Prometheus::new(exporter))
.with(otel::metrics::Config::new(meter));

if let Err(err) = Server::bind(&addr).serve(ServiceMaker::from(app)).await {
println!("{}", err);
}

Ok(())
}
15 changes: 15 additions & 0 deletions examples/otel/tracing/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "otel-tracing"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
viz = { path = "../../../viz", features = ["otel-tracing"] }

tokio = { version = "1.0", features = [
"rt-multi-thread",
"macros",
] }
opentelemetry = "0.18.0"
opentelemetry-jaeger = { version = "0.17.0", features = ["rt-tokio-current-thread"]}
40 changes: 40 additions & 0 deletions examples/otel/tracing/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#![deny(warnings)]

use opentelemetry::{
global,
runtime::TokioCurrentThread,
sdk::{propagation::TraceContextPropagator, trace::Tracer},
};
use std::net::SocketAddr;
use viz::{middleware::otel, Request, Result, Router, Server, ServiceMaker};

fn init_tracer() -> Tracer {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("viz")
.install_batch(TokioCurrentThread)
.unwrap()
}

async fn index(_: Request) -> Result<&'static str> {
Ok("Hello, World!")
}

#[tokio::main]
async fn main() -> Result<()> {
let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
println!("listening on {}", addr);

let tracer = init_tracer();

let app = Router::new()
.get("/", index)
.get("/:username", index)
.with(otel::tracing::Config::new(tracer));

if let Err(err) = Server::bind(&addr).serve(ServiceMaker::from(app)).await {
println!("{}", err);
}

Ok(())
}
6 changes: 3 additions & 3 deletions examples/static-routes/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ async fn main() -> Result<()> {
pub async fn serve(mut req: Request, mut addr: Option<SocketAddr>) -> Result<Response, Infallible> {
let method = req.method().to_owned();
let path = req.path().to_owned();

Ok(
let responed = Ok(
match TREE.find(&method, &path).or_else(|| {
if method == Method::HEAD {
TREE.find(&Method::GET, &path)
Expand All @@ -63,5 +62,6 @@ pub async fn serve(mut req: Request, mut addr: Option<SocketAddr>) -> Result<Res
}
None => StatusCode::NOT_FOUND.into_response(),
},
)
);
responed
}
11 changes: 10 additions & 1 deletion viz-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ default = [
"multipart",
"websocket",
"cookie",
"session",
"session"
]

state = []
Expand All @@ -46,6 +46,10 @@ session = ["cookie", "json", "dep:sessions-core"]
csrf = ["cookie", "dep:base64", "dep:getrandom"]
cors = []

otel = ["dep:opentelemetry", "dep:opentelemetry-semantic-conventions"]
otel-tracing = ["otel", "opentelemetry?/trace"]
otel-metrics = ["otel", "opentelemetry?/metrics"]

[dependencies]
async-trait = "0.1.56"
bytes = "1.1"
Expand All @@ -69,10 +73,15 @@ sessions-core = { version = "0.3.4", optional = true }
getrandom = { version = "0.2.7", optional = true }
base64 = { version = "0.13.0", optional = true }

# OpenTelemetry
opentelemetry = { version = "0.18.0", default-features = false, optional = true }
opentelemetry-semantic-conventions = { version = "0.10.0", optional = true }

tokio = { version = "1.21", optional = true }
tokio-tungstenite = { version = "0.17.2", optional = true }
tokio-stream = { version = "0.1.10", optional = true }
tokio-util = { version = "0.7.4", optional = true }
rfc7239 = "0.1.0"

[dev-dependencies]
tokio = { version = "1.21", features = ["rt-multi-thread", "macros"] }
Expand Down
6 changes: 4 additions & 2 deletions viz-core/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,16 @@ Middleware | Description
[csrf][m:csrf] | CSRF
[limits][m:limits] | Limits
[session][m:session] | Session
[opentelemetry][m:opentelemetry] | OpenTelemetry
[otel::tracing][m:otel::tracing] | OpenTelemetry Tracing
[otel::metrics][m:otel::metrics] | OpenTelemetry Metrics

[m:cookie]: https://docs.rs/viz-core/latest/viz_core/middleware/cookie
[m:cors]: https://docs.rs/viz-core/latest/viz_core/middleware/cors
[m:csrf]: https://docs.rs/viz-core/latest/viz_core/middleware/csrf
[m:limits]: https://docs.rs/viz-core/latest/viz_core/middleware/limits
[m:session]: https://docs.rs/viz-core/latest/viz_core/middleware/session
[m:opentelemetry]: https://docs.rs/viz-core/latest/viz_core/middleware/opentelemetry
[m:otel::tracing]: https://docs.rs/viz-core/latest/viz_core/middleware/otel/tracing
[m:otel::metrics]: https://docs.rs/viz-core/latest/viz_core/middleware/otel/metrics


## License
Expand Down
3 changes: 3 additions & 0 deletions viz-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@ pub mod limits;
#[cfg(feature = "session")]
pub mod session;

#[cfg(feature = "otel")]
pub mod otel;

#[cfg(feature = "cookie")]
pub mod helper;
168 changes: 168 additions & 0 deletions viz-core/src/middleware/otel/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
//! Request metrics middleware with [OpenTelemetry].
//!
//! [OpenTelemetry]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/semantic_conventions/http-metrics.md
use std::time::SystemTime;

use http::uri::Scheme;
use opentelemetry::{
metrics::{Histogram, Meter, Unit, UpDownCounter},
Context, KeyValue,
};
use opentelemetry_semantic_conventions::trace::{
HTTP_CLIENT_IP,
HTTP_FLAVOR,
HTTP_HOST,
HTTP_METHOD,
// HTTP_RESPONSE_CONTENT_LENGTH,
HTTP_ROUTE,
HTTP_SCHEME, // , HTTP_SERVER_NAME
HTTP_STATUS_CODE,
HTTP_TARGET,
HTTP_USER_AGENT,
NET_HOST_PORT,
NET_PEER_IP,
};

use crate::{
async_trait, header::USER_AGENT, types::RealIp, Handler, IntoResponse, Request, RequestExt,
Response, Result, Transform,
};

const HTTP_SERVER_ACTIVE_REQUESTS: &str = "http.server.active_requests";
const HTTP_SERVER_DURATION: &str = "http.server.duration";

/// Request metrics middleware config.
#[derive(Clone, Debug)]
pub struct Config {
active_requests: UpDownCounter<i64>,
duration: Histogram<f64>,
}

impl Config {
/// Creates a new Config
pub fn new(meter: Meter) -> Self {
let active_requests = meter
.i64_up_down_counter(HTTP_SERVER_ACTIVE_REQUESTS)
.with_description("HTTP concurrent in-flight requests per route")
.init();

let duration = meter
.f64_histogram(HTTP_SERVER_DURATION)
.with_description("HTTP inbound request duration per route")
.with_unit(Unit::new("ms"))
.init();

Config {
active_requests,
duration,
}
}
}

impl<H> Transform<H> for Config {
type Output = MetricsMiddleware<H>;

fn transform(&self, h: H) -> Self::Output {
MetricsMiddleware {
h,
active_requests: self.active_requests.clone(),
duration: self.duration.clone(),
}
}
}

/// Request metrics middleware with OpenTelemetry.
#[derive(Debug, Clone)]
pub struct MetricsMiddleware<H> {
h: H,
active_requests: UpDownCounter<i64>,
duration: Histogram<f64>,
}

#[async_trait]
impl<H, O> Handler<Request> for MetricsMiddleware<H>
where
O: IntoResponse,
H: Handler<Request, Output = Result<O>> + Clone,
{
type Output = Result<Response>;

async fn call(&self, req: Request) -> Self::Output {
let timer = SystemTime::now();
let cx = Context::current();
let mut attributes = build_attributes(&req, &req.route().path);

self.active_requests.add(&cx, 1, &attributes);

let res = self
.h
.call(req)
.await
.map(IntoResponse::into_response)
.map(|resp| {
self.active_requests.add(&cx, -1, &attributes);

attributes.push(HTTP_STATUS_CODE.i64(resp.status().as_u16() as i64));

resp
});

self.duration.record(
&cx,
timer
.elapsed()
.map(|t| t.as_secs_f64() * 1000.0)
.unwrap_or_default(),
&attributes,
);

res
}
}

fn build_attributes(req: &Request, http_route: &String) -> Vec<KeyValue> {
let mut attributes = Vec::with_capacity(10);
attributes.push(
HTTP_SCHEME.string(
req.schema()
.or(Some(&Scheme::HTTP))
.map(ToString::to_string)
.unwrap(),
),
);
attributes.push(HTTP_FLAVOR.string(format!("{:?}", req.version())));
attributes.push(HTTP_METHOD.string(req.method().to_string()));
attributes.push(HTTP_ROUTE.string(http_route.to_owned()));
if let Some(path_and_query) = req.uri().path_and_query() {
attributes.push(HTTP_TARGET.string(path_and_query.as_str().to_string()));
}
if let Some(host) = req.uri().host() {
attributes.push(HTTP_HOST.string(host.to_string()));
}
if let Some(user_agent) = req.headers().get(USER_AGENT).and_then(|s| s.to_str().ok()) {
attributes.push(HTTP_USER_AGENT.string(user_agent.to_string()));
}
let realip = RealIp::parse(req);
if let Some(realip) = realip {
attributes.push(HTTP_CLIENT_IP.string(realip.0.to_string()));
}
// if server_name != host {
// attributes.insert(HTTP_SERVER_NAME, server_name.to_string().into());
// }
if let Some(remote_ip) = req.remote_addr().map(|add| add.ip()) {
if realip.map(|realip| realip.0 != remote_ip).unwrap_or(true) {
// Client is going through a proxy
attributes.push(NET_PEER_IP.string(remote_ip.to_string()));
}
}
if let Some(port) = req
.uri()
.port_u16()
.filter(|port| *port != 80 || *port != 443)
{
attributes.push(NET_HOST_PORT.i64(port as i64));
}

attributes
}
Loading

0 comments on commit 8b11a1e

Please sign in to comment.