Skip to content

RUST-54 Support for OP_COMPRESSED #476

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Oct 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
449081e
add list of compressors to handshake
Sep 23, 2021
fdf30d1
add compressor implementations
Sep 24, 2021
65c3255
formatting
Sep 24, 2021
84dd94b
compressor algorithms
Sep 24, 2021
9a143c9
new compressor implementation
Sep 25, 2021
629626b
refactor compression calling
Sep 26, 2021
3a30152
sends and receives OP_COMPRESSED
Sep 27, 2021
09b1b8d
snappy working
Sep 28, 2021
54b461c
use enum instead of constants
Sep 28, 2021
4d0cb99
refactor, add builders
Sep 28, 2021
3f94a3b
formatting
Sep 28, 2021
7e82c8a
validate compressors
Sep 28, 2021
a566e30
add test cases
Sep 28, 2021
e3e1ff3
comment
Sep 29, 2021
f226b0e
address clippy
Sep 29, 2021
afb379e
fix broken test
Sep 29, 2021
694a414
don't compress hello commands
Sep 29, 2021
326963f
refactor
Sep 29, 2021
19a8553
use compressor enum in clientoptions
Sep 30, 2021
95365c6
address PR comments (all but feature flag)
Oct 1, 2021
b0fd9d3
edit test, add comments
Oct 1, 2021
2875767
put each compressor behind a feature flag
Oct 2, 2021
9c86af1
edit docstring
Oct 4, 2021
c7a849c
address pr comments, move conditional compilations
Oct 4, 2021
bd763cc
fix README
Oct 5, 2021
d9dc9ba
remove allow unused vars
Oct 5, 2021
18cc94d
formatting
Oct 5, 2021
89892b7
remove allow unused mut
Oct 5, 2021
7f7750f
remove -1 as default if not provided through uri
Oct 5, 2021
1077b44
edit docstring
Oct 5, 2021
d92c6bb
address PR comments
Oct 5, 2021
771602a
fix weird unpacking
Oct 5, 2021
132d1e2
always serde skip compressor
Oct 5, 2021
c050674
readme formatting
Oct 5, 2021
ac97466
add dependency version
Oct 5, 2021
fbe6fae
add spaces
Oct 5, 2021
e3f66eb
set snap version
Oct 5, 2021
3927643
formatting
Oct 5, 2021
3813ed8
fix readme columns
Oct 6, 2021
de04ddb
fix compression test running
Oct 6, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,18 @@ bson-uuid-0_8 = ["bson/uuid-0_8"]
# This can only be used with the tokio-runtime feature flag.
aws-auth = ["reqwest"]

zstd-compression = ["zstd"]
zlib-compression = ["flate2"]
snappy-compression = ["snap"]

[dependencies]
async-trait = "0.1.42"
base64 = "0.13.0"
bitflags = "1.1.0"
bson = { git = "https://github.com/mongodb/bson-rust", branch = "master" }
chrono = "0.4.7"
derivative = "2.1.1"
flate2 = { version = "1.0", optional = true }
futures-core = "0.3.14"
futures-io = "0.3.14"
futures-util = { version = "0.3.14", features = ["io"] }
Expand All @@ -56,6 +61,7 @@ rand = { version = "0.8.3", features = ["small_rng"] }
serde_with = "1.3.1"
sha-1 = "0.9.4"
sha2 = "0.9.3"
snap = { version = "1.0.5", optional = true}
socket2 = "0.4.0"
stringprep = "0.1.2"
strsim = "0.10.0"
Expand All @@ -67,6 +73,7 @@ typed-builder = "0.9.0"
version_check = "0.9.1"
webpki = "0.21.0"
webpki-roots = "0.21.0"
zstd = { version = "0.9.0", optional = true }

[dependencies.async-std]
version = "1.9.0"
Expand Down
13 changes: 8 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ features = ["sync"]
| `aws-auth` | Enable support for the MONGODB-AWS authentication mechanism. | `reqwest` 0.11 | no |
| `bson-uuid-0_8` | Enable support for v0.8 of the [`uuid`](docs.rs/uuid/0.8) crate in the public API of the re-exported `bson` crate. | n/a | no |
| `bson-chrono-0_4` | Enable support for v0.4 of the [`chrono`](docs.rs/chrono/0.4) crate in the public API of the re-exported `bson` crate. | n/a | no |
| `zlib-compression` | Enable support for compressing messages with [`zlib`](https://zlib.net/) | `flate2` 1.0 | no |
| `zstd-compression` | Enable support for compressing messages with [`zstd`](http://facebook.github.io/zstd/). This flag requires Rust version 1.54. | `zstd` 0.9.0 | no |
| `snappy-compression`| Enable support for compressing messages with [`snappy`](http://google.github.io/snappy/) | `snap` 1.0.5 | no |

## Example Usage
Below are simple examples of using the driver. For more specific examples and the API reference, see the driver's [docs.rs page](https://docs.rs/mongodb/latest).
Expand Down Expand Up @@ -159,7 +162,7 @@ typed_collection.insert_many(books, None).await?;
```

#### Finding documents in a collection
Results from queries are generally returned via [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html), a struct which streams the results back from the server as requested. The [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html) type implements the [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) trait from the [`futures`](https://crates.io/crates/futures) crate, and in order to access its streaming functionality you need to import at least one of the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) or [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) traits.
Results from queries are generally returned via [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html), a struct which streams the results back from the server as requested. The [`Cursor`](https://docs.rs/mongodb/latest/mongodb/struct.Cursor.html) type implements the [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) trait from the [`futures`](https://crates.io/crates/futures) crate, and in order to access its streaming functionality you need to import at least one of the [`StreamExt`](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html) or [`TryStreamExt`](https://docs.rs/futures/latest/futures/stream/trait.TryStreamExt.html) traits.

``` toml
# In Cargo.toml, add the following dependency.
Expand Down Expand Up @@ -297,17 +300,17 @@ We encourage and would happily accept contributions in the form of GitHub pull r
### Integration and unit tests
In order to run the tests (which are mostly integration tests), you must have access to a MongoDB deployment. You may specify a [MongoDB connection string](https://docs.mongodb.com/manual/reference/connection-string/) in the `MONGODB_URI` environment variable, and the tests will use it to connect to the deployment. If `MONGODB_URI` is unset, the tests will attempt to connect to a local deployment on port 27017.

**Note:** The integration tests will clear out the databases/collections they need to use, but they do not clean up after themselves.
**Note:** The integration tests will clear out the databases/collections they need to use, but they do not clean up after themselves.

To actually run the tests, you can use `cargo` like you would in any other crate:
```bash
cargo test --verbose # runs against localhost:27017
export MONGODB_URI="mongodb://localhost:123"
export MONGODB_URI="mongodb://localhost:123"
cargo test --verbose # runs against localhost:123
```

#### Auth tests
The authentication tests will only be included in the test run if certain requirements are met:
The authentication tests will only be included in the test run if certain requirements are met:
- The deployment must have `--auth` enabled
- Credentials must be specified in `MONGODB_URI`
- The credentials specified in `MONGODB_URI` must be valid and have root privileges on the deployment
Expand All @@ -327,7 +330,7 @@ cargo test --verbose
```

#### Run the tests with TLS/SSL
To run the tests with TLS/SSL enabled, you must enable it on the deployment and in `MONGODB_URI`.
To run the tests with TLS/SSL enabled, you must enable it on the deployment and in `MONGODB_URI`.
```bash
export MONGODB_URI="mongodb://localhost:27017/?tls=true&tlsCertificateKeyFile=cert.pem&tlsCAFile=ca.pem"
cargo test --verbose
Expand Down
47 changes: 35 additions & 12 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
bson::{doc, Bson, Document},
bson_util,
client::auth::{AuthMechanism, Credential},
compression::Compressor,
concern::{Acknowledgment, ReadConcern, WriteConcern},
error::{ErrorKind, Result},
event::{cmap::CmapEventHandler, command::CommandEventHandler, sdam::SdamEventHandler},
Expand Down Expand Up @@ -370,8 +371,13 @@ pub struct ClientOptions {
#[builder(default)]
pub app_name: Option<String>,

#[builder(default, setter(skip))]
pub(crate) compressors: Option<Vec<String>>,
/// The compressors that the Client is willing to use in the order they are specified
/// in the configuration. The Client sends this list of compressors to the server.
/// The server responds with the intersection of its supported list of compressors.
/// The order of compressors indicates preference of compressors.
#[builder(default)]
#[serde(skip)]
pub compressors: Option<Vec<Compressor>>,

/// The handler that should process all Connection Monitoring and Pooling events. See the
/// CmapEventHandler type documentation for more details.
Expand Down Expand Up @@ -525,9 +531,6 @@ pub struct ClientOptions {
#[builder(default)]
pub write_concern: Option<WriteConcern>,

#[builder(default, setter(skip))]
pub(crate) zlib_compression: Option<i32>,

/// Information from the SRV URI that generated these client options, if applicable.
#[builder(default, setter(skip))]
#[serde(skip)]
Expand Down Expand Up @@ -593,7 +596,6 @@ impl Serialize for ClientOptions {
#[derive(Serialize)]
struct ClientOptionsHelper<'a> {
appname: &'a Option<String>,
compressors: &'a Option<Vec<String>>,

#[serde(serialize_with = "bson_util::serialize_duration_option_as_int_millis")]
connecttimeoutms: &'a Option<Duration>,
Expand Down Expand Up @@ -650,7 +652,6 @@ impl Serialize for ClientOptions {

let client_options = ClientOptionsHelper {
appname: &self.app_name,
compressors: &self.compressors,
connecttimeoutms: &self.connect_timeout,
credential: &self.credential,
directconnection: &self.direct_connection,
Expand All @@ -668,8 +669,8 @@ impl Serialize for ClientOptions {
sockettimeoutms: &self.socket_timeout,
tls: &self.tls,
writeconcern: &self.write_concern,
zlibcompressionlevel: &self.zlib_compression,
loadbalanced: &self.load_balanced,
zlibcompressionlevel: &None,
};

client_options.serialize(serializer)
Expand All @@ -693,7 +694,7 @@ struct ClientOptionsParser {
pub min_pool_size: Option<u32>,
pub max_idle_time: Option<Duration>,
pub wait_queue_timeout: Option<Duration>,
pub compressors: Option<Vec<String>>,
pub compressors: Option<Vec<Compressor>>,
pub connect_timeout: Option<Duration>,
pub retry_reads: Option<bool>,
pub retry_writes: Option<bool>,
Expand Down Expand Up @@ -929,7 +930,6 @@ impl From<ClientOptionsParser> for ClientOptions {
retry_reads: parser.retry_reads,
retry_writes: parser.retry_writes,
socket_timeout: parser.socket_timeout,
zlib_compression: parser.zlib_compression,
direct_connection: parser.direct_connection,
driver_info: None,
credential: parser.credential,
Expand Down Expand Up @@ -1180,6 +1180,12 @@ impl ClientOptions {
}
}

if let Some(ref compressors) = self.compressors {
for compressor in compressors {
compressor.validate()?;
}
}

Ok(())
}

Expand Down Expand Up @@ -1213,7 +1219,6 @@ impl ClientOptions {
socket_timeout,
tls,
write_concern,
zlib_compression,
original_srv_info,
original_uri
]
Expand Down Expand Up @@ -1564,6 +1569,16 @@ impl ClientOptionsParser {
}
}

// If zlib and zlib_compression_level are specified then write zlib_compression_level into
// zlib enum
if let (Some(compressors), Some(zlib_compression_level)) =
(self.compressors.as_mut(), self.zlib_compression)
{
for compressor in compressors {
compressor.write_zlib_level(zlib_compression_level)
}
}

Ok(())
}

Expand Down Expand Up @@ -1668,7 +1683,15 @@ impl ClientOptionsParser {
self.auth_mechanism_properties = Some(doc);
}
"compressors" => {
self.compressors = Some(value.split(',').map(String::from).collect());
let compressors = value
.split(',')
.filter_map(|x| Compressor::parse_str(x).ok())
.collect::<Vec<Compressor>>();
self.compressors = if compressors.is_empty() {
None
} else {
Some(compressors)
}
}
k @ "connecttimeoutms" => {
self.connect_timeout = Some(Duration::from_millis(get_duration!(value, k)));
Expand Down
32 changes: 32 additions & 0 deletions src/client/options/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::{
bson::{Bson, Document},
client::options::{ClientOptions, ClientOptionsParser, ServerAddress},
error::ErrorKind,
options::Compressor,
test::run_spec_test,
};
#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -40,6 +41,18 @@ async fn run_test(test_file: TestFile) {
|| test_case.description.contains("serverSelectionTryOnce")
|| test_case.description.contains("Unix")
|| test_case.description.contains("relative path")
// Compression is implemented but will only pass the tests if all
// the appropriate feature flags are set. That is because
// valid compressors are only parsed correctly if the corresponding feature flag is set.
// (otherwise they are treated as invalid, and hence ignored)
|| (test_case.description.contains("compress") &&
!cfg!(
all(features = "zlib-compression",
features = "zstd-compression",
features = "snappy-compression"
)
)
)
{
continue;
}
Expand Down Expand Up @@ -105,6 +118,25 @@ async fn run_test(test_file: TestFile) {
.filter(|(ref key, _)| json_options.contains_key(key))
.collect();

// This is required because compressor is not serialize, but the spec tests
// still expect to see serialized compressors.
// This hardcodes the compressors into the options.
if let Some(compressors) = options.compressors {
options_doc.insert(
"compressors",
compressors
.iter()
.map(Compressor::name)
.collect::<Vec<&str>>(),
);
#[cfg(feature = "zlib-compression")]
for compressor in compressors {
if let Compressor::Zlib { level: Some(level) } = compressor {
options_doc.insert("zlibcompressionlevel", level);
}
}
}

assert_eq!(options_doc, json_options, "{}", test_case.description)
}
// auth
Expand Down
9 changes: 8 additions & 1 deletion src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::{de::DeserializeOwned, Serialize};
use super::wire::Message;
use crate::{
bson::Document,
client::{options::ServerApi, ClusterTime},
client::{options::ServerApi, ClusterTime, HELLO_COMMAND_NAMES, REDACTED_COMMANDS},
error::{Error, ErrorKind, Result},
is_master::{IsMasterCommandResponse, IsMasterReply},
operation::{CommandErrorBody, CommandResponse, Response},
Expand All @@ -22,6 +22,13 @@ pub(crate) struct RawCommand {
pub(crate) bytes: Vec<u8>,
}

impl RawCommand {
pub(crate) fn should_compress(&self) -> bool {
let name = self.name.to_lowercase();
!REDACTED_COMMANDS.contains(name.as_str()) && !HELLO_COMMAND_NAMES.contains(name.as_str())
}
}

/// Driver-side model of a database command.
#[serde_with::skip_serializing_none]
#[derive(Clone, Debug, Serialize, Default)]
Expand Down
36 changes: 32 additions & 4 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::{
options::{ConnectionOptions, StreamOptions},
PoolGeneration,
},
compression::Compressor,
error::{load_balanced_mode_mismatch, Error, ErrorKind, Result},
event::cmap::{
CmapEventHandler,
Expand Down Expand Up @@ -76,6 +77,14 @@ pub(crate) struct Connection {

stream: AsyncStream,

/// Compressor that the client will use before sending messages.
/// This compressor does not get used to decompress server messages.
/// The client will decompress server messages using whichever compressor
/// the server indicates in its message. This compressor is the first
/// compressor in the client's compressor list that also appears in the
/// server's compressor list.
pub(super) compressor: Option<Compressor>,

/// If the connection is pinned to a cursor or transaction, the channel sender to return this
/// connection to the pin holder.
pinned_sender: Option<mpsc::Sender<Connection>>,
Expand Down Expand Up @@ -109,6 +118,7 @@ impl Connection {
stream_description: None,
error: false,
pinned_sender: None,
compressor: None,
};

Ok(conn)
Expand Down Expand Up @@ -244,9 +254,24 @@ impl Connection {
}
}

async fn send_message(&mut self, message: Message) -> Result<RawCommandResponse> {
async fn send_message(
&mut self,
message: Message,
to_compress: bool,
) -> Result<RawCommandResponse> {
self.command_executing = true;
let write_result = message.write_to(&mut self.stream).await;

// If the client has agreed on a compressor with the server, and the command
// is the right type of command, then compress the message.
let write_result = match self.compressor {
Some(ref compressor) if to_compress => {
message
.write_compressed_to(&mut self.stream, compressor)
.await
}
_ => message.write_to(&mut self.stream).await,
};

self.error = write_result.is_err();
write_result?;

Expand All @@ -267,8 +292,9 @@ impl Connection {
command: Command,
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let to_compress = command.should_compress();
let message = Message::with_command(command, request_id.into())?;
self.send_message(message).await
self.send_message(message, to_compress).await
}

/// Executes a `RawCommand` and returns a `CommandResponse` containing the result from the
Expand All @@ -282,8 +308,9 @@ impl Connection {
command: RawCommand,
request_id: impl Into<Option<i32>>,
) -> Result<RawCommandResponse> {
let to_compress = command.should_compress();
let message = Message::with_raw_command(command, request_id.into());
self.send_message(message).await
self.send_message(message, to_compress).await
}

/// Gets the connection's StreamDescription.
Expand Down Expand Up @@ -351,6 +378,7 @@ impl Connection {
pool_manager: None,
ready_and_available_time: None,
pinned_sender: self.pinned_sender.clone(),
compressor: self.compressor.clone(),
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/cmap/conn/wire/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub(crate) enum OpCode {
Reply = 1,
Query = 2004,
Message = 2013,
Compressed = 2012,
}

impl OpCode {
Expand All @@ -21,6 +22,7 @@ impl OpCode {
1 => Ok(OpCode::Reply),
2004 => Ok(OpCode::Query),
2013 => Ok(OpCode::Message),
2012 => Ok(OpCode::Compressed),
other => Err(ErrorKind::InvalidResponse {
message: format!("Invalid wire protocol opcode: {}", other),
}
Expand Down
Loading