Skip to content

Commit

Permalink
Fixed broken decompression
Browse files Browse the repository at this point in the history
  • Loading branch information
fgardt committed Nov 15, 2024
1 parent 6cac0b2 commit 78a4930
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
3 changes: 3 additions & 0 deletions src/gateway/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub enum Error {
#[cfg(feature = "transport_compression_zlib")]
/// A decompression error from the `flate2` crate.
DecompressZlib(flate2::DecompressError),
/// When decompressed gateway data is not valid UTF-8.
DecompressUtf8(std::string::FromUtf8Error),
}

impl fmt::Display for Error {
Expand All @@ -75,6 +77,7 @@ impl fmt::Display for Error {
},
#[cfg(feature = "transport_compression_zlib")]
Self::DecompressZlib(inner) => fmt::Display::fmt(&inner, f),
Self::DecompressUtf8(inner) => fmt::Display::fmt(&inner, f),
}
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/gateway/sharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ use std::fmt;
use std::sync::Arc;
use std::time::{Duration as StdDuration, Instant};

use aformat::{aformat, aformat_into, ArrayString, CapStr};
#[cfg(feature = "transport_compression_zlib")]
use aformat::aformat_into;
use aformat::{aformat, ArrayString, CapStr};
use tokio_tungstenite::tungstenite::error::Error as TungsteniteError;
use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -985,6 +987,7 @@ pub enum TransportCompression {

impl TransportCompression {
fn query_param(self) -> ArrayString<21> {
#[cfg_attr(not(feature = "transport_compression_zlib"), expect(unused_mut))]
let mut res = ArrayString::new();
match self {
Self::None => {},
Expand Down
23 changes: 16 additions & 7 deletions src/gateway/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,14 @@ enum Compression {
Zlib {
inflater: Decompress,
compressed: Vec<u8>,
decompressed: Vec<u8>,
decompressed: Box<[u8]>,
},
}

impl Compression {
#[cfg(feature = "transport_compression_zlib")]
const ZLIB_DECOMPRESSED_CAPACITY: usize = 64 * 1024;

fn inflate(&mut self, slice: &[u8]) -> Result<Option<&[u8]>> {
match self {
Compression::Payload {
Expand Down Expand Up @@ -127,13 +130,13 @@ impl Compression {
}

let pre_out = inflater.total_out();
decompressed.clear();
inflater
.decompress_vec(compressed, decompressed, flate2::FlushDecompress::Sync)
.decompress(compressed, decompressed, flate2::FlushDecompress::Sync)
.map_err(GatewayError::DecompressZlib)?;
compressed.clear();
let produced = (inflater.total_out() - pre_out) as usize;

let size = inflater.total_out() - pre_out;
Ok(Some(&decompressed[..size as usize]))
Ok(Some(&decompressed[..produced]))
},
}
}
Expand All @@ -150,7 +153,7 @@ impl From<TransportCompression> for Compression {
TransportCompression::Zlib => Compression::Zlib {
inflater: Decompress::new(true),
compressed: Vec::new(),
decompressed: Vec::with_capacity(32 * 1024),
decompressed: vec![0; Self::ZLIB_DECOMPRESSED_CAPACITY].into_boxed_slice(),
},
}
}
Expand Down Expand Up @@ -192,7 +195,13 @@ impl WsClient {
return Ok(None);
};

String::from_utf8_lossy(decompressed).to_string()
String::from_utf8(decompressed.to_vec()).map_err(|why| {
warn!("Err decompressing bytes: Decompression resulted in invalid UTF-8 data: {why}");
debug!("Failing bytes: {bytes:?}");
debug!("Decompressed bytes: {decompressed:?}");

GatewayError::DecompressUtf8(why)
})?
},
Message::Close(Some(frame)) => {
return Err(Error::Gateway(GatewayError::Closed(Some(frame))));
Expand Down

0 comments on commit 78a4930

Please sign in to comment.