Skip to content

Commit

Permalink
Weaken or remove etags when downstream compression applies
Browse files Browse the repository at this point in the history
Allow this as an adjustable setting to preserve etag when
(de)compressing. The default is to weaken etags whenever a compression
module applies.
  • Loading branch information
drcaramelsyrup authored and eaufavor committed Aug 30, 2024
1 parent 1b9e8ee commit 4f45792
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
aadd07a5b3064b0fbdf57c8c02a5ef7b65b5fc03
fdc26b8f9eefd902a1e27d4ef1aafc480a6db305
2 changes: 1 addition & 1 deletion pingora-core/src/modules/http/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl ResponseCompressionBuilder {
impl HttpModuleBuilder for ResponseCompressionBuilder {
fn init(&self) -> Module {
Box::new(ResponseCompression(ResponseCompressionCtx::new(
self.level, false,
self.level, false, false,
)))
}

Expand Down
129 changes: 108 additions & 21 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ enum CtxInner {
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
decompress_enable: [bool; Algorithm::COUNT],
preserve_etag: [bool; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}
Expand All @@ -78,11 +79,14 @@ impl ResponseCompressionCtx {
/// Create a new [`ResponseCompressionCtx`] with the expected compression level. `0` will disable
/// the compression. The compression level is applied across all algorithms.
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
/// The `preserve_etag` flag indicates whether the ctx should avoid modifying the etag,
/// which will otherwise be weakened if the flag is false and (de)compression is applied.
pub fn new(compression_level: u32, decompress_enable: bool, preserve_etag: bool) -> Self {
Self(CtxInner::HeaderPhase {
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
decompress_enable: [decompress_enable; Algorithm::COUNT],
preserve_etag: [preserve_etag; Algorithm::COUNT],
})
}

Expand Down Expand Up @@ -166,16 +170,38 @@ impl ResponseCompressionCtx {
}
}

/// Adjust preserve etag setting.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_preserve_etag(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase { preserve_etag, .. } => {
*preserve_etag = [enabled; Algorithm::COUNT];
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Adjust preserve etag setting for a specific algorithm.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_algorithm_preserve_etag(&mut self, algorithm: Algorithm, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase { preserve_etag, .. } => {
preserve_etag[algorithm.index()] = enabled;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Feed the request header into this ctx.
pub fn request_filter(&mut self, req: &RequestHeader) {
if !self.is_enabled() {
return;
}
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding,
encoding_levels: _,
accept_encoding, ..
} => parse_accept_encoding(
req.headers.get(http::header::ACCEPT_ENCODING),
accept_encoding,
Expand All @@ -192,6 +218,7 @@ impl ResponseCompressionCtx {
match &self.0 {
CtxInner::HeaderPhase {
decompress_enable,
preserve_etag,
accept_encoding,
encoding_levels: levels,
} => {
Expand Down Expand Up @@ -221,15 +248,22 @@ impl ResponseCompressionCtx {
}

let action = decide_action(resp, accept_encoding);
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
let (encoder, preserve_etag) = match action {
Action::Noop => (None, false),
Action::Compress(algorithm) => {
let idx = algorithm.index();
(algorithm.compressor(levels[idx]), preserve_etag[idx])
}
Action::Decompress(algorithm) => {
algorithm.decompressor(decompress_enable[algorithm.index()])
let idx = algorithm.index();
(
algorithm.decompressor(decompress_enable[idx]),
preserve_etag[idx],
)
}
};
if encoder.is_some() {
adjust_response_header(resp, &action);
adjust_response_header(resp, &action, preserve_etag);
}
self.0 = CtxInner::BodyPhase(encoder);
}
Expand All @@ -242,11 +276,7 @@ impl ResponseCompressionCtx {
/// Return None if the compressed is not enabled
pub fn response_body_filter(&mut self, data: Option<&Bytes>, end: bool) -> Option<Bytes> {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => panic!("Wrong phase: HeaderPhase"),
CtxInner::HeaderPhase { .. } => panic!("Wrong phase: HeaderPhase"),
CtxInner::BodyPhase(compressor) => {
let result = compressor
.as_mut()
Expand Down Expand Up @@ -718,9 +748,9 @@ fn test_add_vary_header() {
);
}

fn adjust_response_header(resp: &mut ResponseHeader, action: &Action) {
fn adjust_response_header(resp: &mut ResponseHeader, action: &Action, preserve_etag: bool) {
use http::header::{
HeaderValue, ACCEPT_RANGES, CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING,
HeaderValue, ACCEPT_RANGES, CONTENT_ENCODING, CONTENT_LENGTH, ETAG, TRANSFER_ENCODING,
};

fn set_stream_headers(resp: &mut ResponseHeader) {
Expand All @@ -734,16 +764,49 @@ fn adjust_response_header(resp: &mut ResponseHeader, action: &Action) {
.unwrap();
}

fn weaken_or_clear_etag(resp: &mut ResponseHeader) {
// RFC9110: https://datatracker.ietf.org/doc/html/rfc9110#section-8.8.1
// "a validator is weak if it is shared by two or more representations
// of a given resource at the same time, unless those representations
// have identical representation data"
// Follow nginx gzip filter's example when changing content encoding:
// - if the ETag is not a valid strong ETag, clear it (i.e. does not start with `"`)
// - else, weaken it
if let Some(etag) = resp.headers.get(&ETAG) {
let etag_bytes = etag.as_bytes();
if etag_bytes.starts_with(b"W/") {
// this is already a weak ETag, noop
} else if etag_bytes.starts_with(b"\"") {
// strong ETag, weaken since we are changing the byte representation
let weakened_etag = HeaderValue::from_bytes(&[b"W/", etag_bytes].concat())
.expect("valid header value prefixed with \"W/\" should remain valid");
resp.insert_header(&ETAG, weakened_etag)
.expect("can insert weakened etag when etag was already valid");
} else {
// invalid strong ETag, just clear it
// https://datatracker.ietf.org/doc/html/rfc9110#section-8.8.3
// says the opaque-tag section needs to be a quoted string
resp.remove_header(&ETAG);
}
}
}

match action {
Action::Noop => { /* do nothing */ }
Action::Decompress(_) => {
resp.remove_header(&CONTENT_ENCODING);
set_stream_headers(resp)
set_stream_headers(resp);
if !preserve_etag {
weaken_or_clear_etag(resp);
}
}
Action::Compress(a) => {
resp.insert_header(&CONTENT_ENCODING, HeaderValue::from_static(a.as_str()))
.unwrap();
set_stream_headers(resp)
set_stream_headers(resp);
if !preserve_etag {
weaken_or_clear_etag(resp);
}
}
}
}
Expand All @@ -758,7 +821,8 @@ fn test_adjust_response_header() {
header.insert_header("content-length", "20").unwrap();
header.insert_header("content-encoding", "gzip").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Noop);
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Noop, false);
assert_eq!(
header.headers.get("content-encoding").unwrap().as_bytes(),
b"gzip"
Expand All @@ -767,27 +831,45 @@ fn test_adjust_response_header() {
header.headers.get("content-length").unwrap().as_bytes(),
b"20"
);
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"\"abc123\""
);
assert!(header.headers.get("transfer-encoding").is_none());

// decompress gzip
let mut header = ResponseHeader::build(200, None).unwrap();
header.insert_header("content-length", "20").unwrap();
header.insert_header("content-encoding", "gzip").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip));
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip), false);
assert!(header.headers.get("content-encoding").is_none());
assert!(header.headers.get("content-length").is_none());
assert_eq!(
header.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"
);
assert!(header.headers.get("accept-ranges").is_none());
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"W/\"abc123\""
);
// when preserve_etag on, strong etag is kept
header.insert_header("etag", "\"abc123\"").unwrap();
adjust_response_header(&mut header, &Decompress(Gzip), true);
assert_eq!(
header.headers.get("etag").unwrap().as_bytes(),
b"\"abc123\""
);

// compress
let mut header = ResponseHeader::build(200, None).unwrap();
header.insert_header("content-length", "20").unwrap();
header.insert_header("accept-ranges", "bytes").unwrap();
adjust_response_header(&mut header, &Compress(Gzip));
// try invalid etag, should be cleared
header.insert_header("etag", "abc123").unwrap();
adjust_response_header(&mut header, &Compress(Gzip), false);
assert_eq!(
header.headers.get("content-encoding").unwrap().as_bytes(),
b"gzip"
Expand All @@ -798,4 +880,9 @@ fn test_adjust_response_header() {
header.headers.get("transfer-encoding").unwrap().as_bytes(),
b"chunked"
);
assert!(header.headers.get("etag").is_none());
// when preserve_etag on, etag is kept
header.insert_header("etag", "abc123").unwrap();
adjust_response_header(&mut header, &Compress(Gzip), true);
assert_eq!(header.headers.get("etag").unwrap().as_bytes(), b"abc123");
}
3 changes: 2 additions & 1 deletion pingora-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ impl Session {
Session {
downstream_session: downstream_session.into(),
cache: HttpCache::new(),
upstream_compression: ResponseCompressionCtx::new(0, false), // disable both
// disable both upstream and downstream compression
upstream_compression: ResponseCompressionCtx::new(0, false, false),
ignore_downstream_range: false,
subrequest_ctx: None,
downstream_modules_ctx: downstream_modules.build_ctx(),
Expand Down

0 comments on commit 4f45792

Please sign in to comment.