Skip to content

Commit c8b2764

Browse files
swallezVimCommando
andauthored
Add option to gzip compress request bodies (#239) (#240)
Co-authored-by: Ryan Eno <ryan.eno@elastic.co>
1 parent e6e2f02 commit c8b2764

File tree

3 files changed

+50
-20
lines changed

3 files changed

+50
-20
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

elasticsearch/Cargo.toml

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ bytes = "1"
3131
dyn-clone = "1"
3232
lazy_static = "1"
3333
percent-encoding = "2"
34-
reqwest = { version = "0.12", default-features = false, features = ["gzip", "json"] }
34+
reqwest = { version = "0.12", default-features = false, features = [
35+
"gzip",
36+
"json",
37+
] }
3538
url = "2"
3639
serde = { version = "1", features = ["derive"] }
3740
serde_json = "1"
@@ -40,6 +43,7 @@ serde_with = "3"
4043

4144
#tokio = { version = "1", default-features = false, features = ["macros", "net", "time", "rt-multi-thread"] }
4245
void = "1"
46+
flate2 = "^1.0.34"
4347

4448
[target.'cfg(not(target_arch = "wasm32"))'.dependencies.tokio]
4549
version = "1.0"
@@ -48,14 +52,14 @@ features = ["macros", "net", "time", "rt-multi-thread"]
4852

4953
[dev-dependencies]
5054
chrono = { version = "0.4", features = ["serde"] }
51-
clap = { version = "4", features = ["env"]}
55+
clap = { version = "4", features = ["env"] }
5256
failure = "0.1"
5357
futures = "0.3"
5458
http = "1"
5559
axum = "0.7"
5660
#hyper = { version = "1", features = ["server", "http1"] }
5761
os_type = "2"
58-
regex="1"
62+
regex = "1"
5963
#sysinfo = "0.31"
6064
textwrap = "0.16"
6165
xml-rs = "0.8"

elasticsearch/src/http/transport.rs

Lines changed: 40 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@
1818
*/
1919
//! HTTP transport and connection components
2020
21-
#[cfg(all(target_arch = "wasm32", any(feature = "native-tls", feature = "rustls-tls")))]
21+
#[cfg(all(
22+
target_arch = "wasm32",
23+
any(feature = "native-tls", feature = "rustls-tls")
24+
))]
2225
compile_error!("TLS features are not compatible with the wasm target");
2326

2427
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
@@ -30,8 +33,8 @@ use crate::{
3033
error::Error,
3134
http::{
3235
headers::{
33-
HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_TYPE,
34-
DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT,
36+
HeaderMap, HeaderName, HeaderValue, ACCEPT, AUTHORIZATION, CONTENT_ENCODING,
37+
CONTENT_TYPE, DEFAULT_ACCEPT, DEFAULT_CONTENT_TYPE, DEFAULT_USER_AGENT, USER_AGENT,
3538
},
3639
request::Body,
3740
response::Response,
@@ -40,6 +43,7 @@ use crate::{
4043
};
4144
use base64::{engine::general_purpose::STANDARD as BASE64_STANDARD, write::EncoderWriter, Engine};
4245
use bytes::BytesMut;
46+
use flate2::{write::GzEncoder, Compression};
4347
use lazy_static::lazy_static;
4448
use serde::Serialize;
4549
use serde_json::Value;
@@ -147,6 +151,7 @@ pub struct TransportBuilder {
147151
credentials: Option<Credentials>,
148152
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
149153
cert_validation: Option<CertificateValidation>,
154+
request_body_compression: bool,
150155
#[cfg(not(target_arch = "wasm32"))]
151156
proxy: Option<Url>,
152157
#[cfg(not(target_arch = "wasm32"))]
@@ -172,6 +177,7 @@ impl TransportBuilder {
172177
credentials: None,
173178
#[cfg(any(feature = "native-tls", feature = "rustls-tls"))]
174179
cert_validation: None,
180+
request_body_compression: false,
175181
#[cfg(not(target_arch = "wasm32"))]
176182
proxy: None,
177183
#[cfg(not(target_arch = "wasm32"))]
@@ -215,6 +221,12 @@ impl TransportBuilder {
215221
self
216222
}
217223

224+
/// Gzip compress the body of requests, adds the `Content-Encoding: gzip` header.
225+
pub fn request_body_compression(mut self, enabled: bool) -> Self {
226+
self.request_body_compression = enabled;
227+
self
228+
}
229+
218230
/// Validation applied to the certificate provided to establish a HTTPS connection.
219231
/// By default, full validation is applied. When using a self-signed certificate,
220232
/// different validation can be applied.
@@ -335,6 +347,7 @@ impl TransportBuilder {
335347
Ok(Transport {
336348
client,
337349
conn_pool: self.conn_pool,
350+
request_body_compression: self.request_body_compression,
338351
credentials: self.credentials,
339352
send_meta: self.meta_header,
340353
})
@@ -381,6 +394,7 @@ impl Connection {
381394
pub struct Transport {
382395
client: reqwest::Client,
383396
credentials: Option<Credentials>,
397+
request_body_compression: bool,
384398
conn_pool: Arc<dyn ConnectionPool>,
385399
send_meta: bool,
386400
}
@@ -481,8 +495,7 @@ impl Transport {
481495
headers: HeaderMap,
482496
query_string: Option<&Q>,
483497
body: Option<B>,
484-
#[allow(unused_variables)]
485-
timeout: Option<Duration>,
498+
#[allow(unused_variables)] timeout: Option<Duration>,
486499
) -> Result<reqwest::RequestBuilder, Error>
487500
where
488501
B: Body,
@@ -552,7 +565,17 @@ impl Transport {
552565
bytes_mut.split().freeze()
553566
};
554567

555-
request_builder = request_builder.body(bytes);
568+
match self.request_body_compression {
569+
true => {
570+
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
571+
encoder.write_all(&bytes)?;
572+
request_builder = request_builder.body(encoder.finish()?);
573+
request_builder = request_builder.header(CONTENT_ENCODING, "gzip");
574+
}
575+
false => {
576+
request_builder = request_builder.body(bytes);
577+
}
578+
}
556579
};
557580

558581
if let Some(q) = query_string {
@@ -589,15 +612,17 @@ impl Transport {
589612
let connection = self.conn_pool.next();
590613

591614
// Build node info request
592-
let node_request = self.request_builder(
593-
&connection,
594-
Method::Get,
595-
"_nodes/http?filter_path=nodes.*.http",
596-
HeaderMap::default(),
597-
None::<&()>,
598-
None::<()>,
599-
None,
600-
).unwrap();
615+
let node_request = self
616+
.request_builder(
617+
&connection,
618+
Method::Get,
619+
"_nodes/http?filter_path=nodes.*.http",
620+
HeaderMap::default(),
621+
None::<&()>,
622+
None::<()>,
623+
None,
624+
)
625+
.unwrap();
601626

602627
let scheme = connection.url.scheme();
603628
let resp = node_request.send().await.unwrap();

0 commit comments

Comments
 (0)