Skip to content

Commit 4102f4a

Browse files
RUST-1306 Move Compression enum behind compression feature flags (#1055)
1 parent f83403a commit 4102f4a

File tree

17 files changed

+630
-587
lines changed

17 files changed

+630
-587
lines changed

.evergreen/config.yml

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,10 @@ buildvariants:
171171
run_on:
172172
- rhel87-small
173173
expansions:
174-
COMPRESSION: true
175174
AUTH: auth
176175
SSL: ssl
177176
tasks:
178-
- .rapid .replicaset
177+
- .compression
179178

180179
- name: stable-api
181180
display_name: "Stable API V1"
@@ -872,15 +871,38 @@ tasks:
872871
TOPOLOGY: sharded_cluster
873872
- func: "run driver test suite"
874873

875-
- name: test-compression
874+
- name: test-zstd-compression
875+
tags: [compression]
876+
commands:
877+
- func: "bootstrap mongo-orchestration"
878+
vars:
879+
MONGODB_VERSION: rapid
880+
TOPOLOGY: replica_set
881+
- func: "run driver test suite"
882+
vars:
883+
ZSTD: true
884+
885+
- name: test-zlib-compression
886+
tags: [compression]
887+
commands:
888+
- func: "bootstrap mongo-orchestration"
889+
vars:
890+
MONGODB_VERSION: rapid
891+
TOPOLOGY: replica_set
892+
- func: "run driver test suite"
893+
vars:
894+
ZLIB: true
895+
896+
- name: test-snappy-compression
897+
tags: [compression]
876898
commands:
877899
- func: "bootstrap mongo-orchestration"
878900
vars:
879901
MONGODB_VERSION: rapid
880902
TOPOLOGY: replica_set
881903
- func: "run driver test suite"
882904
vars:
883-
COMPRESSION: true
905+
SNAPPY: true
884906

885907
- name: test-aws-auth-regular-credentials
886908
tags: [aws-auth]
@@ -1485,10 +1507,12 @@ functions:
14851507
include_expansions_in_env:
14861508
- PROJECT_DIRECTORY
14871509
- OPENSSL
1488-
- COMPRESSION
14891510
- MONGODB_URI
14901511
- MONGODB_API_VERSION
14911512
- PATH
1513+
- ZSTD
1514+
- ZLIB
1515+
- SNAPPY
14921516

14931517
"run sync tests":
14941518
- command: subprocess.exec

.evergreen/run-tests.sh

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,16 @@ if [ "$OPENSSL" = true ]; then
1212
FEATURE_FLAGS+=("openssl-tls")
1313
fi
1414

15-
if [ "$COMPRESSION" = true ]; then
16-
FEATURE_FLAGS+=("snappy-compression", "zlib-compression", "zstd-compression")
15+
if [ "$ZSTD" = true ]; then
16+
FEATURE_FLAGS+=("zstd-compression")
17+
fi
18+
19+
if [ "$ZLIB" = true ]; then
20+
FEATURE_FLAGS+=("zlib-compression")
21+
fi
22+
23+
if [ "$SNAPPY" = true ]; then
24+
FEATURE_FLAGS+=("snappy-compression")
1725
fi
1826

1927
export SESSION_TEST_REQUIRE_MONGOCRYPTD=true

src/client/options.rs

Lines changed: 65 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,17 @@ use serde_with::skip_serializing_none;
2323
use strsim::jaro_winkler;
2424
use typed_builder::TypedBuilder;
2525

26+
#[cfg(any(
27+
feature = "zstd-compression",
28+
feature = "zlib-compression",
29+
feature = "snappy-compression"
30+
))]
31+
use crate::options::Compressor;
2632
#[cfg(test)]
2733
use crate::srv::LookupHosts;
2834
use crate::{
2935
bson::{doc, Bson, Document},
3036
client::auth::{AuthMechanism, Credential},
31-
compression::Compressor,
3237
concern::{Acknowledgment, ReadConcern, WriteConcern},
3338
error::{Error, ErrorKind, Result},
3439
event::EventHandler,
@@ -389,10 +394,15 @@ pub struct ClientOptions {
389394
#[builder(default)]
390395
pub app_name: Option<String>,
391396

392-
/// The compressors that the Client is willing to use in the order they are specified
393-
/// in the configuration. The Client sends this list of compressors to the server.
394-
/// The server responds with the intersection of its supported list of compressors.
395-
/// The order of compressors indicates preference of compressors.
397+
/// The allowed compressors to use to compress messages sent to and decompress messages
398+
/// received from the server. This list should be specified in priority order, as the
399+
/// compressor used for messages will be the first compressor in this list that is also
400+
/// supported by the server selected for operations.
401+
#[cfg(any(
402+
feature = "zstd-compression",
403+
feature = "zlib-compression",
404+
feature = "snappy-compression"
405+
))]
396406
#[builder(default)]
397407
#[serde(skip)]
398408
pub compressors: Option<Vec<Compressor>>,
@@ -836,6 +846,11 @@ pub struct ConnectionString {
836846
/// By default, connections will not be closed due to being idle.
837847
pub max_idle_time: Option<Duration>,
838848

849+
#[cfg(any(
850+
feature = "zstd-compression",
851+
feature = "zlib-compression",
852+
feature = "snappy-compression"
853+
))]
839854
/// The compressors that the Client is willing to use in the order they are specified
840855
/// in the configuration. The Client sends this list of compressors to the server.
841856
/// The server responds with the intersection of its supported list of compressors.
@@ -1344,6 +1359,11 @@ impl ClientOptions {
13441359
max_idle_time: conn_str.max_idle_time,
13451360
max_connecting: conn_str.max_connecting,
13461361
server_selection_timeout: conn_str.server_selection_timeout,
1362+
#[cfg(any(
1363+
feature = "zstd-compression",
1364+
feature = "zlib-compression",
1365+
feature = "snappy-compression"
1366+
))]
13471367
compressors: conn_str.compressors,
13481368
connect_timeout: conn_str.connect_timeout,
13491369
retry_reads: conn_str.retry_reads,
@@ -1415,6 +1435,11 @@ impl ClientOptions {
14151435
}
14161436
}
14171437

1438+
#[cfg(any(
1439+
feature = "zstd-compression",
1440+
feature = "zlib-compression",
1441+
feature = "snappy-compression"
1442+
))]
14181443
if let Some(ref compressors) = self.compressors {
14191444
for compressor in compressors {
14201445
compressor.validate()?;
@@ -1483,12 +1508,19 @@ impl ClientOptions {
14831508
if self.hosts.is_empty() {
14841509
self.hosts = other.hosts;
14851510
}
1511+
1512+
#[cfg(any(
1513+
feature = "zstd-compression",
1514+
feature = "zlib-compression",
1515+
feature = "snappy-compression"
1516+
))]
1517+
merge_options!(other, self, [compressors]);
1518+
14861519
merge_options!(
14871520
other,
14881521
self,
14891522
[
14901523
app_name,
1491-
compressors,
14921524
cmap_event_handler,
14931525
command_event_handler,
14941526
connect_timeout,
@@ -1939,15 +1971,23 @@ impl ConnectionString {
19391971
}
19401972
}
19411973

1942-
// If zlib and zlib_compression_level are specified then write zlib_compression_level into
1943-
// zlib enum
1944-
if let (Some(compressors), Some(zlib_compression_level)) =
1945-
(self.compressors.as_mut(), parts.zlib_compression)
1946-
{
1947-
for compressor in compressors {
1948-
compressor.write_zlib_level(zlib_compression_level)
1974+
#[cfg(feature = "zlib-compression")]
1975+
if let Some(zlib_compression_level) = parts.zlib_compression {
1976+
if let Some(compressors) = self.compressors.as_mut() {
1977+
for compressor in compressors {
1978+
compressor.write_zlib_level(zlib_compression_level)?;
1979+
}
19491980
}
19501981
}
1982+
#[cfg(not(feature = "zlib-compression"))]
1983+
if parts.zlib_compression.is_some() {
1984+
return Err(ErrorKind::InvalidArgument {
1985+
message: "zlibCompressionLevel may not be specified without the zlib-compression \
1986+
feature flag enabled"
1987+
.into(),
1988+
}
1989+
.into());
1990+
}
19511991

19521992
Ok(parts)
19531993
}
@@ -2078,16 +2118,20 @@ impl ConnectionString {
20782118
}
20792119
parts.auth_mechanism_properties = Some(doc);
20802120
}
2121+
#[cfg(any(
2122+
feature = "zstd-compression",
2123+
feature = "zlib-compression",
2124+
feature = "snappy-compression"
2125+
))]
20812126
"compressors" => {
2082-
let compressors = value
2083-
.split(',')
2084-
.filter_map(|x| Compressor::parse_str(x).ok())
2085-
.collect::<Vec<Compressor>>();
2086-
self.compressors = if compressors.is_empty() {
2087-
None
2088-
} else {
2089-
Some(compressors)
2127+
let mut compressors: Option<Vec<Compressor>> = None;
2128+
for compressor in value.split(',') {
2129+
let compressor = Compressor::from_str(compressor)?;
2130+
compressors
2131+
.get_or_insert_with(Default::default)
2132+
.push(compressor);
20902133
}
2134+
self.compressors = compressors;
20912135
}
20922136
k @ "connecttimeoutms" => {
20932137
self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));

src/client/options/test.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,16 @@ use bson::UuidRepresentation;
44
use pretty_assertions::assert_eq;
55
use serde::Deserialize;
66

7+
#[cfg(any(
8+
feature = "zstd-compression",
9+
feature = "zlib-compression",
10+
feature = "snappy-compression"
11+
))]
12+
use crate::options::Compressor;
713
use crate::{
814
bson::{Bson, Document},
915
client::options::{ClientOptions, ConnectionString, ServerAddress},
1016
error::ErrorKind,
11-
options::Compressor,
1217
test::run_spec_test,
1318
Client,
1419
};
@@ -172,9 +177,13 @@ async fn run_test(test_file: TestFile) {
172177
.filter(|(ref key, _)| json_options.contains_key(key))
173178
.collect();
174179

175-
// This is required because compressor is not serialize, but the spec tests
176-
// still expect to see serialized compressors.
177-
// This hardcodes the compressors into the options.
180+
// Compressor does not implement Serialize, so add the compressor names to the
181+
// options manually.
182+
#[cfg(any(
183+
feature = "zstd-compression",
184+
feature = "zlib-compression",
185+
feature = "snappy-compression"
186+
))]
178187
if let Some(compressors) = options.compressors {
179188
options_doc.insert(
180189
"compressors",

src/cmap/conn.rs

Lines changed: 40 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,18 @@ use tokio::{
1414
sync::{mpsc, Mutex},
1515
};
1616

17+
#[cfg(any(
18+
feature = "zstd-compression",
19+
feature = "zlib-compression",
20+
feature = "snappy-compression"
21+
))]
22+
use crate::options::Compressor;
23+
1724
use self::wire::{Message, MessageFlags};
1825
use super::manager::PoolManager;
1926
use crate::{
2027
bson::oid::ObjectId,
2128
cmap::PoolGeneration,
22-
compression::Compressor,
2329
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
2430
event::cmap::{
2531
CmapEventEmitter,
@@ -97,12 +103,13 @@ pub(crate) struct Connection {
97103

98104
stream: BufStream<AsyncStream>,
99105

100-
/// Compressor that the client will use before sending messages.
101-
/// This compressor does not get used to decompress server messages.
102-
/// The client will decompress server messages using whichever compressor
103-
/// the server indicates in its message. This compressor is the first
104-
/// compressor in the client's compressor list that also appears in the
105-
/// server's compressor list.
106+
/// Compressor to use to compress outgoing messages. This compressor is not used to decompress
107+
/// incoming messages from the server.
108+
#[cfg(any(
109+
feature = "zstd-compression",
110+
feature = "zlib-compression",
111+
feature = "snappy-compression"
112+
))]
106113
pub(super) compressor: Option<Compressor>,
107114

108115
/// If the connection is pinned to a cursor or transaction, the channel sender to return this
@@ -141,6 +148,11 @@ impl Connection {
141148
stream_description: None,
142149
error: None,
143150
pinned_sender: None,
151+
#[cfg(any(
152+
feature = "zstd-compression",
153+
feature = "zlib-compression",
154+
feature = "snappy-compression"
155+
))]
144156
compressor: None,
145157
more_to_come: false,
146158
oidc_token_gen_id: std::sync::RwLock::new(0),
@@ -272,7 +284,8 @@ impl Connection {
272284
pub(crate) async fn send_message(
273285
&mut self,
274286
message: Message,
275-
to_compress: bool,
287+
// This value is only read if a compression feature flag is enabled.
288+
#[allow(unused_variables)] can_compress: bool,
276289
) -> Result<RawCommandResponse> {
277290
if self.more_to_come {
278291
return Err(Error::internal(format!(
@@ -283,16 +296,25 @@ impl Connection {
283296

284297
self.command_executing = true;
285298

286-
// If the client has agreed on a compressor with the server, and the command
287-
// is the right type of command, then compress the message.
299+
#[cfg(any(
300+
feature = "zstd-compression",
301+
feature = "zlib-compression",
302+
feature = "snappy-compression"
303+
))]
288304
let write_result = match self.compressor {
289-
Some(ref compressor) if to_compress => {
305+
Some(ref compressor) if can_compress => {
290306
message
291-
.write_compressed_to(&mut self.stream, compressor)
307+
.write_op_compressed_to(&mut self.stream, compressor)
292308
.await
293309
}
294-
_ => message.write_to(&mut self.stream).await,
310+
_ => message.write_op_msg_to(&mut self.stream).await,
295311
};
312+
#[cfg(all(
313+
not(feature = "zstd-compression"),
314+
not(feature = "zlib-compression"),
315+
not(feature = "snappy-compression")
316+
))]
317+
let write_result = message.write_op_msg_to(&mut self.stream).await;
296318

297319
if let Err(ref err) = write_result {
298320
self.error = Some(err.clone());
@@ -430,6 +452,11 @@ impl Connection {
430452
pool_manager: None,
431453
ready_and_available_time: None,
432454
pinned_sender: self.pinned_sender.clone(),
455+
#[cfg(any(
456+
feature = "zstd-compression",
457+
feature = "zlib-compression",
458+
feature = "snappy-compression"
459+
))]
433460
compressor: self.compressor.clone(),
434461
more_to_come: false,
435462
oidc_token_gen_id: std::sync::RwLock::new(0),

0 commit comments

Comments
 (0)