Skip to content

Commit

Permalink
feat(foundations): improve http
Browse files Browse the repository at this point in the history
- Allow custom tls alpn
- Expose socket type
- Fix formatting
- bump deps
  • Loading branch information
TroyKomodo committed Aug 21, 2024
1 parent 5b18cf7 commit 1b96151
Show file tree
Hide file tree
Showing 14 changed files with 449 additions and 553 deletions.
828 changes: 391 additions & 437 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ rand = { version = "0.8", optional = true }
tracing = { version = "0.1", optional = true }
tracing-subscriber = { version = "0.3", optional = true }

opentelemetry = { version = "0.23", optional = true }
opentelemetry_sdk = { version = "0.23", optional = true }
opentelemetry-otlp = { version = "0.16", optional = true, features = ["http-proto"]}
opentelemetry = { version = "0.24", optional = true }
opentelemetry_sdk = { version = "0.24", optional = true }
opentelemetry-otlp = { version = "0.17", optional = true, features = ["http-proto"]}

anyhow = { version = "1" }

Expand Down Expand Up @@ -50,7 +50,7 @@ humantime-serde = { version = "1", optional = true }
axum = { version = "0.7", optional = true }
humantime = { version = "2", optional = true }

tower = { version = "0.4", optional = true }
tower = { version = "0.5", optional = true }

hyper = { version = "1", optional = true }
hyper-util = { version = "0.1", optional = true }
Expand Down
4 changes: 2 additions & 2 deletions foundations/examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1", features = ["tokio", "http2"] }
socket2 = "0.5"
http-body-util = "0.1"
opentelemetry = { version = "0.23" }
opentelemetry = { version = "0.24" }
rand = "0.8"
rustls-pemfile = { version = "2" }
rustls = "0.23"
futures = "0.3.21"
tower = "0.4"
tower = "0.5"
quinn = "0.11"
axum = "0.7"
h3 = "0.0.6"
Expand Down
13 changes: 9 additions & 4 deletions foundations/src/http/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,15 @@ impl ServerBuilder {
insecure_bind: self.insecure_bind,
#[cfg(feature = "http-tls")]
tls: self.tls.map(|mut tls| {
tls.alpn_protocols.clear();
tls.alpn_protocols = vec![b"http/1.1".to_vec()];
if cfg!(feature = "http2") {
tls.alpn_protocols.push(b"h2".to_vec());
if tls.alpn_protocols.is_empty() {
tls.alpn_protocols = vec![b"http/1.1".to_vec()];
if cfg!(feature = "http2") {
tls.alpn_protocols.push(b"h2".to_vec());
}
#[cfg(feature = "http3")]
if self.quic.is_some() {
tls.alpn_protocols.push(b"h3".to_vec());
}
}

Arc::new(tls)
Expand Down
2 changes: 2 additions & 0 deletions foundations/src/http/server/stream/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ impl<S: ServiceHandler> Connection<S> {
stream: stream.clone(),
});

request.extensions_mut().insert(SocketKind::Quic);

request.extensions_mut().insert(ctx.clone());

let connection_context = connection_handle.context();
Expand Down
1 change: 1 addition & 0 deletions foundations/src/http/server/stream/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl<S: ServiceHandler> Connection<S> {
async move {
let ctx = make_ctx();
req.extensions_mut().insert(ctx.clone());
req.extensions_mut().insert(SocketKind::Tcp);
let resp = service.on_request(req.map(Body::new)).await.into_response();
drop(ctx);
Ok::<_, Infallible>(resp)
Expand Down
1 change: 1 addition & 0 deletions foundations/src/http/server/stream/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ impl<S: ServiceHandler> Connection<S> {
async move {
let ctx = make_ctx();
req.extensions_mut().insert(ctx.clone());
req.extensions_mut().insert(SocketKind::TlsTcp);
let resp = service.on_request(req.map(Body::new)).await.into_response();
drop(ctx);
Ok::<_, Infallible>(resp)
Expand Down
2 changes: 1 addition & 1 deletion foundations/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub use env_filter::{EnvFilter, EnvFilterBuilder};
#[cfg(feature = "opentelemetry")]
pub mod opentelemetry;

#[cfg(any(feature = "pprof-cpu", feature = "pprof-heap",))]
#[cfg(any(feature = "pprof-cpu"))]
pub mod pprof;

#[cfg(feature = "metrics")]
Expand Down
23 changes: 1 addition & 22 deletions foundations/src/telemetry/opentelemetry/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::sync::Arc;
use ::opentelemetry::trace::TraceError;
use itertools::Itertools;
use opentelemetry_otlp::SpanExporter;
use opentelemetry_sdk::Resource;
use thread_local::ThreadLocal;
use tokio::sync::{Mutex, OwnedSemaphorePermit};
#[cfg(not(feature = "runtime"))]
Expand Down Expand Up @@ -34,7 +33,6 @@ mod opentelemetry {

pub struct BatchExporter {
pub interval: tokio::time::Duration,
pub resource: Resource,
pub batch_size: usize,
pub max_concurrent_exports: usize,
pub max_pending_exports: usize,
Expand Down Expand Up @@ -75,11 +73,6 @@ impl BatchExporter {
self
}

pub fn with_resource(&mut self, resource: Resource) -> &mut Self {
self.resource = resource;
self
}

pub fn with_batch_size(&mut self, batch_size: usize) -> &mut Self {
self.batch_size = batch_size;
self
Expand All @@ -95,15 +88,6 @@ impl BatchExporter {
self
}

pub fn with_service_info(&mut self, info: crate::ServiceInfo) -> &mut Self {
self.resource.merge(&Resource::new(vec![
::opentelemetry::KeyValue::new("service.name", info.metric_name),
::opentelemetry::KeyValue::new("service.version", info.version),
]));

self
}

pub fn build(&mut self) -> Self {
std::mem::take(self)
}
Expand All @@ -113,7 +97,6 @@ impl std::fmt::Debug for BatchExporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExporterConfig")
.field("interval", &self.interval)
.field("resource", &self.resource)
.field("batch_size", &self.batch_size)
.field("max_concurrent_exports", &self.max_concurrent_exports)
.field("max_pending_exports", &self.max_pending_exports)
Expand All @@ -125,7 +108,6 @@ impl Default for BatchExporter {
fn default() -> Self {
Self {
interval: tokio::time::Duration::from_secs(2),
resource: Resource::empty(),
batch_size: 10_000,
max_concurrent_exports: 10,
max_pending_exports: 15,
Expand Down Expand Up @@ -164,10 +146,7 @@ fn export_batch(internal: Arc<ExportInternal>, batch: Vec<SpanNode>, pending_per
let _permit = internal.concurrent_semaphore.acquire().await.unwrap();
drop(pending_permit);

let batch = batch
.into_iter()
.map(|data| data.into_data(internal.config.resource.clone()))
.collect_vec();
let batch = batch.into_iter().map(|data| data.into_data()).collect_vec();

let size = batch.len();

Expand Down
5 changes: 1 addition & 4 deletions foundations/src/telemetry/opentelemetry/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ pub use opentelemetry::trace::{Link, SpanContext, SpanId, SpanKind, Status, Trac
pub use opentelemetry::KeyValue;
pub use opentelemetry_sdk::export::trace::SpanData;
pub use opentelemetry_sdk::trace::SpanEvents;
pub use opentelemetry_sdk::Resource;
use rand::Rng;
use spin::Lazy;
use tracing::{span, Metadata};
Expand Down Expand Up @@ -231,7 +230,7 @@ impl SpanNode {
std::iter::once(self).chain(children)
}

pub fn into_data(mut self, resource: Resource) -> SpanData {
pub fn into_data(mut self) -> SpanData {
static DEFAULT_SPAN: Lazy<SpanData> = Lazy::new(|| SpanData {
start_time: std::time::SystemTime::UNIX_EPOCH,
end_time: std::time::SystemTime::UNIX_EPOCH,
Expand All @@ -242,7 +241,6 @@ impl SpanNode {
events: SpanEvents::default(),
links: Default::default(),
span_kind: SpanKind::Internal,
resource: Cow::Owned(Resource::empty()),
attributes: Vec::new(),
parent_span_id: SpanId::INVALID,
span_context: SpanContext::new(
Expand Down Expand Up @@ -277,7 +275,6 @@ impl SpanNode {
span.dropped_attributes_count = 0;
span.name = self.metadata.name().into();
span.attributes = self.attributes;
span.resource = Cow::Owned(resource.clone());
span.parent_span_id = self.mapped_parent_id.unwrap_or(SpanId::INVALID);
span.span_context = SpanContext::new(self.trace_id, self.mapped_id, TraceFlags::SAMPLED, false, TraceState::NONE);
span.events.events = self.events.into_iter().map(|e| e.into_data()).collect();
Expand Down
35 changes: 0 additions & 35 deletions foundations/src/telemetry/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ pub struct ServerSettings {
pub builder: crate::http::server::ServerBuilder,
#[cfg(feature = "pprof-cpu")]
pub pprof_cpu_path: Option<String>,
#[cfg(feature = "pprof-heap")]
pub pprof_heap_path: Option<String>,
#[cfg(feature = "metrics")]
pub metrics_path: Option<String>,
#[cfg(feature = "health-check")]
Expand All @@ -25,8 +23,6 @@ impl Default for ServerSettings {
builder: SocketAddr::from(([127, 0, 0, 1], 9000)).into(),
#[cfg(feature = "pprof-cpu")]
pprof_cpu_path: Some("/debug/pprof/profile".into()),
#[cfg(feature = "pprof-heap")]
pprof_heap_path: Some("/debug/pprof/heap".into()),
#[cfg(feature = "metrics")]
metrics_path: Some("/metrics".into()),
#[cfg(feature = "health-check")]
Expand Down Expand Up @@ -104,32 +100,6 @@ async fn pprof_cpu(
}
}

#[cfg(feature = "pprof-heap")]
async fn pprof_heap() -> axum::response::Response<axum::body::Body> {
match tokio::task::spawn_blocking(|| crate::telemetry::pprof::Heap::new().capture()).await {
Ok(Ok(contents)) => axum::response::Response::builder()
.status(axum::http::StatusCode::OK)
.header("content-type", "application/octet-stream")
.header("content-disposition", "attachment; filename=\"heap.pb.gz\"")
.body(contents.into())
.unwrap(),
Ok(Err(err)) => {
tracing::error!(%err, "failed to capture pprof heap profile");
axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body("failed to capture pprof heap profile".into())
.unwrap()
}
Err(err) => {
tracing::error!(%err, "failed to spawn blocking task");
axum::response::Response::builder()
.status(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
.body("failed to spawn blocking task".into())
.unwrap()
}
}
}

#[derive(serde::Deserialize, Default)]
#[serde(default)]
struct MetricsQuery {
Expand Down Expand Up @@ -298,11 +268,6 @@ pub async fn init(settings: ServerSettings) -> anyhow::Result<()> {
router = router.route(path, axum::routing::get(pprof_cpu));
}

#[cfg(feature = "pprof-heap")]
if let Some(path) = &settings.pprof_heap_path {
router = router.route(path, axum::routing::get(pprof_heap));
}

#[cfg(feature = "metrics")]
if let Some(path) = &settings.metrics_path {
router = router.route(path, axum::routing::get(metrics));
Expand Down
68 changes: 30 additions & 38 deletions foundations/src/telemetry/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,7 @@ pub enum LoggingSettingsFormat {
Compact,
}

#[cfg(all(
any(feature = "pprof-cpu", feature = "pprof-heap", feature = "metrics",),
feature = "telemetry-server"
))]
#[cfg(all(any(feature = "pprof-cpu", feature = "metrics"), feature = "telemetry-server"))]
#[crate::settings::auto_settings(crate_path = "crate")]
#[serde(default)]
pub struct ServerSettings {
Expand All @@ -252,11 +249,6 @@ pub struct ServerSettings {
/// The address to bind the server to.
#[settings(default = SocketAddr::from(([127, 0, 0, 1], 9000)))]
pub bind: SocketAddr,
/// The path to the pprof heap endpoint. If `None`, the endpoint is
/// disabled.
#[cfg(feature = "pprof-heap")]
#[settings(default = Some("/debug/pprof/heap".into()))]
pub pprof_heap_path: Option<String>,
/// The path to the pprof CPU endpoint. If `None`, the endpoint is disabled.
#[cfg(feature = "pprof-cpu")]
#[settings(default = Some("/debug/pprof/profile".into()))]
Expand Down Expand Up @@ -310,27 +302,6 @@ pub async fn init(info: crate::ServiceInfo, settings: TelemetrySettings) {
interval: settings.opentelemetry.interval,
#[cfg(feature = "metrics")]
metrics: settings.opentelemetry.metrics,
resource: {
let mut kv = vec![];

if !settings.opentelemetry.labels.contains_key("service.name") {
kv.push(opentelemetry::KeyValue::new("service.name", info.metric_name));
}

if !settings.opentelemetry.labels.contains_key("service.version") {
kv.push(opentelemetry::KeyValue::new("service.version", info.version));
}

kv.extend(
settings
.opentelemetry
.labels
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())),
);

Resource::new(kv)
},
drop_handler: {
const DROPPED_SPANS_ERROR: &str = "opentelemetry exporter dropped spans due to backpressure";

Expand Down Expand Up @@ -399,7 +370,7 @@ pub async fn init(info: crate::ServiceInfo, settings: TelemetrySettings) {
},
},
{
match settings.opentelemetry.otlp_method {
let mut exporter = match settings.opentelemetry.otlp_method {
OpentelemetrySettingsExportMethod::Grpc => opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(settings.opentelemetry.otlp_endpoint.clone())
Expand All @@ -411,7 +382,33 @@ pub async fn init(info: crate::ServiceInfo, settings: TelemetrySettings) {
.with_timeout(settings.opentelemetry.otlp_timeout)
.build_span_exporter(),
}
.expect("failed to build otlp exporter")
.expect("failed to build otlp exporter");

use opentelemetry_sdk::export::trace::SpanExporter;

exporter.set_resource(&{
let mut kv = vec![];

if !settings.opentelemetry.labels.contains_key("service.name") {
kv.push(opentelemetry::KeyValue::new("service.name", info.metric_name));
}

if !settings.opentelemetry.labels.contains_key("service.version") {
kv.push(opentelemetry::KeyValue::new("service.version", info.version));
}

kv.extend(
settings
.opentelemetry
.labels
.iter()
.map(|(k, v)| opentelemetry::KeyValue::new(k.clone(), v.clone())),
);

Resource::new(kv)
});

exporter
},
)
.with_filter(super::LevelFilter::new(&settings.opentelemetry.level).filter()),
Expand Down Expand Up @@ -454,10 +451,7 @@ pub async fn init(info: crate::ServiceInfo, settings: TelemetrySettings) {
registry.init();
}

#[cfg(all(
any(feature = "pprof-cpu", feature = "pprof-heap", feature = "metrics",),
feature = "telemetry-server"
))]
#[cfg(all(any(feature = "pprof-cpu", feature = "metrics"), feature = "telemetry-server"))]
if settings.server.enabled {
#[cfg(not(feature = "runtime"))]
use tokio::spawn;
Expand All @@ -472,8 +466,6 @@ pub async fn init(info: crate::ServiceInfo, settings: TelemetrySettings) {
metrics_path: settings.server.metrics_path,
#[cfg(feature = "pprof-cpu")]
pprof_cpu_path: settings.server.pprof_cpu_path,
#[cfg(feature = "pprof-heap")]
pprof_heap_path: settings.server.pprof_heap_path,
#[cfg(feature = "health-check")]
health_path: settings.server.health_path,
#[cfg(feature = "health-check")]
Expand Down
2 changes: 1 addition & 1 deletion image-processor/src/management/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl ManagementServer {
.serve_with_shutdown(addr, scuffle_foundations::context::Context::global().into_done());

tracing::info!("gRPC management server listening on {}", addr);

server.await
}
}
Expand Down
10 changes: 5 additions & 5 deletions image-processor/src/management/utils.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
pub async fn true_bind(addr: std::net::SocketAddr) -> std::io::Result<std::net::SocketAddr> {
if addr.port() == 0 {
let bind = tokio::net::TcpListener::bind(addr).await?;
bind.local_addr()
} else {
Ok(addr)
}
let bind = tokio::net::TcpListener::bind(addr).await?;
bind.local_addr()
} else {
Ok(addr)
}
}

0 comments on commit 1b96151

Please sign in to comment.