Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f4e5ae2d44c6e580a5a9a7cc5a80b07c69c95840
71f26703aeb326cc03ccf2d200a1784c915ffb49
30 changes: 21 additions & 9 deletions pingora-core/src/listeners/tls/rustls/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use log::debug;
use pingora_error::ErrorType::InternalError;
use pingora_error::{Error, OrErr, Result};
use pingora_rustls::load_certs_and_key_files;
use pingora_rustls::ClientCertVerifier;
use pingora_rustls::ServerConfig;
use pingora_rustls::{version, TlsAcceptor as RusTlsAcceptor};

Expand All @@ -30,6 +31,7 @@ pub struct TlsSettings {
alpn_protocols: Option<Vec<Vec<u8>>>,
cert_path: String,
key_path: String,
client_cert_verifier: Option<Arc<dyn ClientCertVerifier>>,
}

pub struct Acceptor {
Expand All @@ -54,15 +56,19 @@ impl TlsSettings {
)
};

// TODO - Add support for client auth & custom CA support
let mut config =
ServerConfig::builder_with_protocol_versions(&[&version::TLS12, &version::TLS13])
.with_no_client_auth()
.with_single_cert(certs, key)
.explain_err(InternalError, |e| {
format!("Failed to create server listener config: {e}")
})
.unwrap();
let builder =
ServerConfig::builder_with_protocol_versions(&[&version::TLS12, &version::TLS13]);
let builder = if let Some(verifier) = self.client_cert_verifier {
builder.with_client_cert_verifier(verifier)
} else {
builder.with_no_client_auth()
};
let mut config = builder
.with_single_cert(certs, key)
.explain_err(InternalError, |e| {
format!("Failed to create server listener config: {e}")
})
.unwrap();

if let Some(alpn_protocols) = self.alpn_protocols {
config.alpn_protocols = alpn_protocols;
Expand All @@ -84,6 +90,11 @@ impl TlsSettings {
self.alpn_protocols = Some(alpn.to_wire_protocols());
}

/// Configure mTLS by providing a rustls client certificate verifier.
pub fn set_client_cert_verifier(&mut self, verifier: Arc<dyn ClientCertVerifier>) {
self.client_cert_verifier = Some(verifier);
}

pub fn intermediate(cert_path: &str, key_path: &str) -> Result<Self>
where
Self: Sized,
Expand All @@ -92,6 +103,7 @@ impl TlsSettings {
alpn_protocols: None,
cert_path: cert_path.to_string(),
key_path: key_path.to_string(),
client_cert_verifier: None,
})
}

Expand Down
6 changes: 6 additions & 0 deletions pingora-core/src/protocols/http/custom/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ pub trait Session: Send + Sync + Unpin + 'static {

fn response_header(&self) -> Option<&ResponseHeader>;

fn was_upgraded(&self) -> bool;

fn digest(&self) -> Option<&Digest>;

fn digest_mut(&mut self) -> Option<&mut Digest>;
Expand Down Expand Up @@ -118,6 +120,10 @@ impl Session for () {
unreachable!("client session: response_header")
}

fn was_upgraded(&self) -> bool {
unreachable!("client session: was upgraded")
}

fn digest(&self) -> Option<&Digest> {
unreachable!("client session: digest")
}
Expand Down
2 changes: 2 additions & 0 deletions pingora-core/src/protocols/http/custom/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ impl CustomMessageWrite for () {
pub trait BodyWrite: Send + Sync + Unpin + 'static {
async fn write_all_buf(&mut self, data: &mut Bytes) -> Result<()>;
async fn finish(&mut self) -> Result<()>;
async fn cleanup(&mut self) -> Result<()>;
fn upgrade_body_writer(&mut self);
}

pub async fn drain_custom_messages(
Expand Down
6 changes: 5 additions & 1 deletion pingora-core/src/protocols/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,10 @@ pub const SERVER_NAME: &[u8; 7] = b"Pingora";
pub enum HttpTask {
/// the response header and the boolean end of response flag
Header(Box<pingora_http::ResponseHeader>, bool),
/// A piece of response body and the end of response boolean flag
/// A piece of request or response body and the end of request/response boolean flag.
Body(Option<bytes::Bytes>, bool),
/// Request or response body bytes that have been upgraded on H1.1, and EOF bool flag.
UpgradedBody(Option<bytes::Bytes>, bool),
/// HTTP response trailer
Trailer(Option<Box<http::HeaderMap>>),
/// Signal that the response is already finished
Expand All @@ -53,6 +55,7 @@ impl HttpTask {
match self {
HttpTask::Header(_, end) => *end,
HttpTask::Body(_, end) => *end,
HttpTask::UpgradedBody(_, end) => *end,
HttpTask::Trailer(_) => true,
HttpTask::Done => true,
HttpTask::Failed(_) => true,
Expand All @@ -64,6 +67,7 @@ impl HttpTask {
match self {
HttpTask::Header(..) => "Header",
HttpTask::Body(..) => "Body",
HttpTask::UpgradedBody(..) => "UpgradedBody",
HttpTask::Trailer(_) => "Trailer",
HttpTask::Done => "Done",
HttpTask::Failed(_) => "Failed",
Expand Down
29 changes: 27 additions & 2 deletions pingora-core/src/protocols/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::protocols::{Digest, SocketAddr, Stream};
use bytes::Bytes;
use http::HeaderValue;
use http::{header::AsHeaderName, HeaderMap};
use pingora_error::Result;
use pingora_error::{Error, Result};
use pingora_http::{RequestHeader, ResponseHeader};
use std::time::Duration;

Expand Down Expand Up @@ -252,6 +252,21 @@ impl Session {
}
}

/// Callback for cleanup logic on downstream specifically when we fail to proxy the session
/// other than cleanup via finish().
///
/// If caching the downstream failure may be independent of (and precede) an upstream error in
/// which case this function may be called more than once.
pub fn on_proxy_failure(&mut self, e: Box<Error>) {
match self {
Self::H1(_) | Self::H2(_) | Self::Custom(_) => {
// all cleanup logic handled in finish(),
// stream and resources dropped when session dropped
}
Self::Subrequest(ref mut s) => s.on_proxy_failure(e),
}
}

pub async fn response_duplex_vec(&mut self, tasks: Vec<HttpTask>) -> Result<bool> {
match self {
Self::H1(s) => s.response_duplex_vec(tasks).await,
Expand Down Expand Up @@ -648,7 +663,7 @@ impl Session {
}
}

/// Whether this request is for upgrade (e.g., websocket)
/// Whether this request is for upgrade (e.g., websocket).
pub fn is_upgrade_req(&self) -> bool {
match self {
Self::H1(s) => s.is_upgrade_req(),
Expand All @@ -658,6 +673,16 @@ impl Session {
}
}

/// Whether this session was fully upgraded (completed Upgrade handshake).
pub fn was_upgraded(&self) -> bool {
match self {
Self::H1(s) => s.was_upgraded(),
Self::H2(_) => false,
Self::Subrequest(s) => s.was_upgraded(),
Self::Custom(_) => false,
}
}

/// Return how many response body bytes (application, not wire) already sent downstream
pub fn body_bytes_sent(&self) -> usize {
match self {
Expand Down
11 changes: 11 additions & 0 deletions pingora-core/src/protocols/http/subrequest/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ impl BodyReader {
self.body_state = PS::UntilClose(0);
}

// Convert how we interpret the remainder of the body as pass through.
pub fn convert_to_until_close(&mut self) {
if matches!(self.body_state, PS::UntilClose(_)) {
// nothing to do
return;
}

// reset body counter
self.body_state = PS::UntilClose(0);
}

pub fn body_done(&self) -> bool {
matches!(self.body_state, PS::Complete(_) | PS::Done(_))
}
Expand Down
104 changes: 82 additions & 22 deletions pingora-core/src/protocols/http/subrequest/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ pub struct HttpSession {
// Currently subrequest session is initialized via a dummy SessionV1 only
// TODO: need to be able to indicate H2 / other HTTP versions here
v1_inner: Box<SessionV1>,
proxy_error: Option<oneshot::Sender<Box<Error>>>, // option to consume the sender
read_req_header: bool,
response_written: Option<ResponseHeader>,
read_timeout: Option<Duration>,
Expand All @@ -84,8 +85,9 @@ pub struct SubrequestHandle {
/// Channel receiver (for subrequest output)
pub rx: mpsc::Receiver<HttpTask>,
/// Indicates when subrequest wants to start reading body input
// TODO: use when piping subrequest input/output
pub subreq_wants_body: oneshot::Receiver<()>,
/// Any final or downstream error that was encountered while proxying
pub subreq_proxy_error: oneshot::Receiver<Box<Error>>,
}

impl SubrequestHandle {
Expand All @@ -111,11 +113,13 @@ impl HttpSession {
let (downstream_tx, downstream_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let (upstream_tx, upstream_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let (wants_body_tx, wants_body_rx) = oneshot::channel();
let (proxy_error_tx, proxy_error_rx) = oneshot::channel();
(
HttpSession {
v1_inner: Box::new(v1_inner),
tx: Some(upstream_tx),
rx: Some(downstream_rx),
proxy_error: Some(proxy_error_tx),
body_reader: BodyReader::new(Some(wants_body_tx)),
body_writer: BodyWriter::new(),
read_req_header: false,
Expand All @@ -134,6 +138,7 @@ impl HttpSession {
tx: downstream_tx,
rx: upstream_rx,
subreq_wants_body: wants_body_rx,
subreq_proxy_error: proxy_error_rx,
},
)
}
Expand Down Expand Up @@ -321,11 +326,25 @@ impl HttpSession {
// a peer discards any further data received.
// https://www.rfc-editor.org/rfc/rfc6455#section-1.4
self.upgraded = true;
// Now that the upgrade was successful, we need to change
// how we interpret the rest of the body as pass-through.
if self.body_reader.need_init() {
self.init_body_reader();
} else {
// already initialized
// immediately start reading the rest of the body as upgraded
// (in theory most upgraded requests shouldn't have any body)
//
// TODO: https://datatracker.ietf.org/doc/html/rfc9110#name-upgrade
// the most spec-compliant behavior is to switch interpretation
// after sending the former body. For now we immediately
// switch interpretation to match nginx behavior.
// TODO: this has no effect resetting the body counter of TE chunked
self.body_reader.convert_to_until_close();
}
} else {
debug!("bad upgrade handshake!");
// reset request body buf and mark as done
// safe to reset an upgrade because it doesn't have body
self.body_reader.init_content_length(0);
// continue to read body as-is, this is now just a regular request
}
}
self.init_body_writer(&header);
Expand Down Expand Up @@ -360,6 +379,16 @@ impl HttpSession {
self.v1_inner.is_upgrade(header)
}

/// Was this request successfully turned into an upgraded connection?
///
/// Both the request had to have been an `Upgrade` request
/// and the response had to have been a `101 Switching Protocols`.
// XXX: this should only be valid if subrequest is standing in for
// a v1 session.
pub fn was_upgraded(&self) -> bool {
self.upgraded
}

fn init_body_writer(&mut self, header: &ResponseHeader) {
use http::StatusCode;
/* the following responses don't have body 204, 304, and HEAD */
Expand Down Expand Up @@ -453,6 +482,21 @@ impl HttpSession {
Ok(res)
}

/// Signal to error listener held by SubrequestHandle that a proxy error was encountered,
/// and pass along what that error was.
///
/// This is helpful to signal what errors were encountered outside of the proxy state machine,
/// e.g. during subrequest request filters.
///
/// Note: in the case of multiple proxy failures e.g. when caching, only the first error will
/// be propagated (i.e. downstream error first if it goes away before upstream).
pub fn on_proxy_failure(&mut self, e: Box<Error>) {
// fine if handle is gone
if let Some(sender) = self.proxy_error.take() {
let _ = sender.send(e);
}
}

/// Return how many response body bytes (application, not wire) already sent downstream
pub fn body_bytes_sent(&self) -> usize {
self.body_bytes_sent
Expand Down Expand Up @@ -659,6 +703,24 @@ impl HttpSession {
Ok(())
}

async fn write_non_empty_body(&mut self, data: Option<Bytes>, upgraded: bool) -> Result<()> {
if upgraded != self.upgraded {
if upgraded {
panic!("Unexpected UpgradedBody task received on un-upgraded downstream session (subrequest)");
} else {
panic!("Unexpected Body task received on upgraded downstream session (subrequest)");
}
}
let Some(d) = data else {
return Ok(());
};
if d.is_empty() {
return Ok(());
}
self.write_body(d).await.map_err(|e| e.into_down())?;
Ok(())
}

async fn response_duplex(&mut self, task: HttpTask) -> Result<bool> {
let end_stream = match task {
HttpTask::Header(header, end_stream) => {
Expand All @@ -667,15 +729,14 @@ impl HttpSession {
.map_err(|e| e.into_down())?;
end_stream
}
HttpTask::Body(data, end_stream) => match data {
Some(d) => {
if !d.is_empty() {
self.write_body(d).await.map_err(|e| e.into_down())?;
}
end_stream
}
None => end_stream,
},
HttpTask::Body(data, end_stream) => {
self.write_non_empty_body(data, false).await?;
end_stream
}
HttpTask::UpgradedBody(data, end_stream) => {
self.write_non_empty_body(data, true).await?;
end_stream
}
HttpTask::Trailer(trailers) => {
self.write_trailers(trailers).await?;
true
Expand Down Expand Up @@ -707,15 +768,14 @@ impl HttpSession {
.map_err(|e| e.into_down())?;
end_stream
}
HttpTask::Body(data, end_stream) => match data {
Some(d) => {
if !d.is_empty() {
self.write_body(d).await.map_err(|e| e.into_down())?;
}
end_stream
}
None => end_stream,
},
HttpTask::Body(data, end_stream) => {
self.write_non_empty_body(data, false).await?;
end_stream
}
HttpTask::UpgradedBody(data, end_stream) => {
self.write_non_empty_body(data, true).await?;
end_stream
}
HttpTask::Done => {
// write done
// we'll send HttpTask::Done at the end of this loop in finish
Expand Down
Loading
Loading