Skip to content

Commit

Permalink
Add un-gzip support and allow decompress by algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
drcaramelsyrup authored and andrewhavck committed Aug 9, 2024
1 parent e5fda7c commit e1c6e57
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .bleep
Original file line number Diff line number Diff line change
@@ -1 +1 @@
e68f6024370efed50aebc8741171956acabf9c35
4c6da000f956f3c13473dee0c6302ac0126418dd
1 change: 0 additions & 1 deletion pingora-core/src/protocols/http/compression/brotli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ impl Decompressor {

impl Encode for Decompressor {
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
// reserve at most 16k
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
// Brotli compress ratio can be 3.5 to 4.5
const ESTIMATED_COMPRESSION_RATIO: usize = 4;
Expand Down
89 changes: 85 additions & 4 deletions pingora-core/src/protocols/http/compression/gzip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,65 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::Encode;
use super::{Encode, COMPRESSION_ERROR};

use bytes::Bytes;
use flate2::write::GzEncoder;
use pingora_error::Result;
use flate2::write::{GzDecoder, GzEncoder};
use pingora_error::{OrErr, Result};
use std::io::Write;
use std::time::{Duration, Instant};

// TODO: unzip
pub struct Decompressor {
decompress: GzDecoder<Vec<u8>>,
total_in: usize,
total_out: usize,
duration: Duration,
}

impl Decompressor {
pub fn new() -> Self {
Decompressor {
decompress: GzDecoder::new(vec![]),
total_in: 0,
total_out: 0,
duration: Duration::new(0, 0),
}
}
}

impl Encode for Decompressor {
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
const ESTIMATED_COMPRESSION_RATIO: usize = 3; // estimated 2.5-3x compression
let start = Instant::now();
self.total_in += input.len();
// cap the buf size amplification, there is a DoS risk of always allocate
// 3x the memory of the input buffer
let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP {
input.len() * ESTIMATED_COMPRESSION_RATIO
} else {
input.len()
};
self.decompress.get_mut().reserve(reserve_size);
self.decompress
.write_all(input)
.or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
// write to vec will never fail, only possible error is that the input data
// was not actually gzip compressed
if end {
self.decompress
.try_finish()
.or_err(COMPRESSION_ERROR, "while decompress Gzip")?;
}
self.total_out += self.decompress.get_ref().len();
self.duration += start.elapsed();
Ok(std::mem::take(self.decompress.get_mut()).into()) // into() Bytes will drop excess capacity
}

fn stat(&self) -> (&'static str, usize, usize, Duration) {
("de-gzip", self.total_in, self.total_out, self.duration)
}
}

pub struct Compressor {
// TODO: enum for other compression algorithms
Expand Down Expand Up @@ -66,6 +116,20 @@ impl Encode for Compressor {
}

use std::ops::{Deref, DerefMut};
impl Deref for Decompressor {
type Target = GzDecoder<Vec<u8>>;

fn deref(&self) -> &Self::Target {
&self.decompress
}
}

impl DerefMut for Decompressor {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.decompress
}
}

impl Deref for Compressor {
type Target = GzEncoder<Vec<u8>>;

Expand Down Expand Up @@ -100,4 +164,21 @@ mod tests_stream {

assert!(compressor.get_ref().is_empty());
}

#[test]
fn gunzip_data() {
let mut decompressor = Decompressor::new();

let compressed_bytes = &[
0x1f, 0x8b, 0x08, 0, 0, 0, 0, 0, 0, 255, 75, 76, 74, 78, 73, 77, 75, 7, 0, 166, 106,
42, 49, 7, 0, 0, 0,
];
let decompressed = decompressor.encode(compressed_bytes, true).unwrap();

assert_eq!(&decompressed[..], b"abcdefg");
assert_eq!(decompressor.total_in, compressed_bytes.len());
assert_eq!(decompressor.total_out, decompressed.len());

assert!(decompressor.get_ref().is_empty());
}
}
47 changes: 28 additions & 19 deletions pingora-core/src/protocols/http/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ pub struct ResponseCompressionCtx(CtxInner);

enum CtxInner {
HeaderPhase {
decompress_enable: bool,
// Store the preferred list to compare with content-encoding
accept_encoding: Vec<Algorithm>,
encoding_levels: [u32; Algorithm::COUNT],
decompress_enable: [bool; Algorithm::COUNT],
},
BodyPhase(Option<Box<dyn Encode + Send + Sync>>),
}
Expand All @@ -81,9 +81,9 @@ impl ResponseCompressionCtx {
/// The `decompress_enable` flag will tell the ctx to decompress if needed.
pub fn new(compression_level: u32, decompress_enable: bool) -> Self {
Self(CtxInner::HeaderPhase {
decompress_enable,
accept_encoding: Vec::new(),
encoding_levels: [compression_level; Algorithm::COUNT],
decompress_enable: [decompress_enable; Algorithm::COUNT],
})
}

Expand All @@ -93,9 +93,9 @@ impl ResponseCompressionCtx {
match &self.0 {
CtxInner::HeaderPhase {
decompress_enable,
accept_encoding: _,
encoding_levels: levels,
} => levels.iter().any(|l| *l != 0) || *decompress_enable,
..
} => levels.iter().any(|l| *l != 0) || decompress_enable.iter().any(|d| *d),
CtxInner::BodyPhase(c) => c.is_some(),
}
}
Expand All @@ -104,11 +104,7 @@ impl ResponseCompressionCtx {
/// algorithm name, in bytes, out bytes, time took for the compression
pub fn get_info(&self) -> Option<(&'static str, usize, usize, Duration)> {
match &self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: _,
} => None,
CtxInner::HeaderPhase { .. } => None,
CtxInner::BodyPhase(c) => c.as_ref().map(|c| c.stat()),
}
}
Expand All @@ -119,9 +115,8 @@ impl ResponseCompressionCtx {
pub fn adjust_level(&mut self, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
..
} => {
*levels = [new_level; Algorithm::COUNT];
}
Expand All @@ -135,27 +130,38 @@ impl ResponseCompressionCtx {
pub fn adjust_algorithm_level(&mut self, algorithm: Algorithm, new_level: u32) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable: _,
accept_encoding: _,
encoding_levels: levels,
..
} => {
levels[algorithm.index()] = new_level;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Adjust the decompression flag.
/// Adjust the decompression flag for all compression algorithms.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_decompression(&mut self, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable,
accept_encoding: _,
encoding_levels: _,
decompress_enable, ..
} => {
*decompress_enable = enabled;
*decompress_enable = [enabled; Algorithm::COUNT];
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
}

/// Adjust the decompression flag for a specific algorithm.
/// # Panic
/// This function will panic if it has already started encoding the response body.
pub fn adjust_algorithm_decompression(&mut self, algorithm: Algorithm, enabled: bool) {
match &mut self.0 {
CtxInner::HeaderPhase {
decompress_enable, ..
} => {
decompress_enable[algorithm.index()] = enabled;
}
CtxInner::BodyPhase(_) => panic!("Wrong phase: BodyPhase"),
}
Expand Down Expand Up @@ -208,7 +214,9 @@ impl ResponseCompressionCtx {
let encoder = match action {
Action::Noop => None,
Action::Compress(algorithm) => algorithm.compressor(levels[algorithm.index()]),
Action::Decompress(algorithm) => algorithm.decompressor(*decompress_enable),
Action::Decompress(algorithm) => {
algorithm.decompressor(decompress_enable[algorithm.index()])
}
};
if encoder.is_some() {
adjust_response_header(resp, &action);
Expand Down Expand Up @@ -317,6 +325,7 @@ impl Algorithm {
None
} else {
match self {
Self::Gzip => Some(Box::new(gzip::Decompressor::new())),
Self::Brotli => Some(Box::new(brotli::Decompressor::new())),
_ => None, // not implemented
}
Expand Down

0 comments on commit e1c6e57

Please sign in to comment.