Skip to content

RUST-885 Support snapshot sessions #390

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Jul 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,27 @@ impl Client {
if let Some(txn_number) = txn_number {
cmd.set_txn_number(txn_number);
}
if session
.options()
.and_then(|opts| opts.snapshot)
.unwrap_or(false)
{
if connection
.stream_description()?
.max_wire_version
.unwrap_or(0)
< 13
{
let labels: Option<Vec<_>> = None;
return Err(Error::new(
ErrorKind::IncompatibleServer {
message: "Snapshot reads require MongoDB 5.0 or later".into(),
},
labels,
));
}
cmd.set_snapshot_read_concern(session)?;
}
match session.transaction.state {
TransactionState::Starting => {
cmd.set_start_transaction();
Expand Down Expand Up @@ -361,6 +382,11 @@ impl Client {
session.advance_cluster_time(cluster_time)
}
}
if let (Some(timestamp), Some(session)) =
(response.snapshot_time(), session.as_mut())
{
session.snapshot_time = Some(*timestamp);
}
response.validate().map(|_| response)
}
err => err,
Expand Down
27 changes: 24 additions & 3 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,20 +158,41 @@ impl Client {
Database::new(self.clone(), name, Some(options))
}

/// Gets information about each database present in the cluster the Client is connected to.
pub async fn list_databases(
async fn list_databases_common(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
session: Option<&mut ClientSession>,
) -> Result<Vec<DatabaseSpecification>> {
let op = ListDatabases::new(filter.into(), false, options.into());
self.execute_operation(op, None).await.and_then(|dbs| {
self.execute_operation(op, session).await.and_then(|dbs| {
dbs.into_iter()
.map(|db_spec| bson::from_document(db_spec).map_err(crate::error::Error::from))
.collect()
})
}

/// Gets information about each database present in the cluster the Client is connected to.
///
/// Delegates to `list_databases_common`, passing `None` for the session argument
/// (contrast with `list_databases_with_session`, which forwards an explicit session).
pub async fn list_databases(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
) -> Result<Vec<DatabaseSpecification>> {
self.list_databases_common(filter, options, None).await
}

/// Gets information about each database present in the cluster the Client is connected to
/// using the provided `ClientSession`.
///
/// Delegates to `list_databases_common`, forwarding the given session so the
/// listDatabases command executes as part of that session.
pub async fn list_databases_with_session(
&self,
filter: impl Into<Option<Document>>,
options: impl Into<Option<ListDatabasesOptions>>,
session: &mut ClientSession,
) -> Result<Vec<DatabaseSpecification>> {
self.list_databases_common(filter, options, Some(session))
.await
}

/// Gets the names of the databases present in the cluster the Client is connected to.
pub async fn list_database_names(
&self,
Expand Down
5 changes: 5 additions & 0 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2307,6 +2307,11 @@ pub struct SessionOptions {
/// on the [`Database`](../struct.Database.html) or [`Collection`](../struct.Collection.html)
/// associated with the operations within the transaction.
pub default_transaction_options: Option<TransactionOptions>,

/// If true, all read operations performed using this client session will share the same
/// snapshot. Defaults to false.
// TODO RUST-18 enforce snapshot exclusivity with causalConsistency.
pub snapshot: Option<bool>,
}

/// Contains the options that can be used for a transaction.
Expand Down
18 changes: 17 additions & 1 deletion src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use lazy_static::lazy_static;
use uuid::Uuid;

use crate::{
bson::{doc, spec::BinarySubtype, Binary, Bson, Document},
bson::{doc, spec::BinarySubtype, Binary, Bson, Document, Timestamp},
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
Expand Down Expand Up @@ -106,6 +106,7 @@ pub struct ClientSession {
is_implicit: bool,
options: Option<SessionOptions>,
pub(crate) transaction: Transaction,
pub(crate) snapshot_time: Option<Timestamp>,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -173,6 +174,7 @@ impl ClientSession {
is_implicit,
options,
transaction: Default::default(),
snapshot_time: None,
}
}

Expand Down Expand Up @@ -278,6 +280,17 @@ impl ClientSession {
&mut self,
options: impl Into<Option<TransactionOptions>>,
) -> Result<()> {
if self
.options
.as_ref()
.and_then(|o| o.snapshot)
.unwrap_or(false)
{
return Err(ErrorKind::Transaction {
message: "Transactions are not supported in snapshot sessions".into(),
}
.into());
}
match self.transaction.state {
TransactionState::Starting | TransactionState::InProgress => {
return Err(ErrorKind::Transaction {
Expand Down Expand Up @@ -486,6 +499,7 @@ struct DroppedClientSession {
is_implicit: bool,
options: Option<SessionOptions>,
transaction: Transaction,
snapshot_time: Option<Timestamp>,
}

impl From<DroppedClientSession> for ClientSession {
Expand All @@ -497,6 +511,7 @@ impl From<DroppedClientSession> for ClientSession {
is_implicit: dropped_session.is_implicit,
options: dropped_session.options,
transaction: dropped_session.transaction,
snapshot_time: dropped_session.snapshot_time,
}
}
}
Expand All @@ -511,6 +526,7 @@ impl Drop for ClientSession {
is_implicit: self.is_implicit,
options: self.options.clone(),
transaction: self.transaction.clone(),
snapshot_time: self.snapshot_time,
};
RUNTIME.execute(async move {
let mut session: ClientSession = dropped_session.into();
Expand Down
28 changes: 27 additions & 1 deletion src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use serde::{de::DeserializeOwned, Deserialize};

use super::wire::Message;
use crate::{
bson::{Bson, Document},
bson::{Bson, Document, Timestamp},
bson_util,
client::{options::ServerApi, ClusterTime},
concern::ReadConcern,
error::{CommandError, Error, ErrorKind, Result},
options::ServerAddress,
selection_criteria::ReadPreference,
Expand Down Expand Up @@ -84,13 +85,22 @@ impl Command {
}
Ok(())
}

/// Attaches a `snapshot` read concern to this command, carrying along the
/// session's recorded snapshot timestamp (if any) as `atClusterTime`.
///
/// Returns an error only if serializing the read concern to a BSON document fails.
pub(crate) fn set_snapshot_read_concern(&mut self, session: &ClientSession) -> Result<()> {
    let mut read_concern = ReadConcern::snapshot();
    read_concern.at_cluster_time = session.snapshot_time;
    let serialized = bson::to_document(&read_concern)?;
    self.body.insert("readConcern", serialized);
    Ok(())
}
}

#[derive(Debug, Clone)]
pub(crate) struct CommandResponse {
source: ServerAddress,
pub(crate) raw_response: Document,
cluster_time: Option<ClusterTime>,
snapshot_time: Option<Timestamp>,
}

impl CommandResponse {
Expand All @@ -100,6 +110,7 @@ impl CommandResponse {
source,
raw_response: doc,
cluster_time: None,
snapshot_time: None,
}
}

Expand All @@ -120,11 +131,21 @@ impl CommandResponse {
let cluster_time = raw_response
.get("$clusterTime")
.and_then(|subdoc| bson::from_bson(subdoc.clone()).ok());
let snapshot_time = raw_response
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My initial approach was to add atClusterTime as an explicit field in the output of the relevant commands (find, aggregate, distinct), but that required:

  • introducing a trait for the output type of an operation, which added a lot of churn, and
  • additional special handling in operations implemented on top of one of those three (e.g. countDocuments).

The approach here avoids both of those issues at the cost of digging a bit into the raw document.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of #389, I actually did have to introduce a trait for the output type of an operation (there was definitely a lot of churn, but for that work it was required I think), so we could still go with your original plan once that PR is merged. For the purposes of avoiding merge conflicts, I think it'll probably be easier to merge this PR first though and then rebase #389 off of that and adjust for atClusterTime accordingly there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, with that trait existing it definitely makes sense to go back to extracting the snapshot time that way. Your plan for merging SGTM.

.get("atClusterTime")
.or_else(|| {
raw_response
.get("cursor")
.and_then(|b| b.as_document())
.and_then(|subdoc| subdoc.get("atClusterTime"))
})
.and_then(|subdoc| bson::from_bson(subdoc.clone()).ok());

Ok(Self {
source,
raw_response,
cluster_time,
snapshot_time,
})
}

Expand Down Expand Up @@ -170,6 +191,11 @@ impl CommandResponse {
self.cluster_time.as_ref()
}

/// Gets the snapshot time from the response, if any.
///
/// This is the `atClusterTime` value parsed out of the server response (taken from the
/// top level of the document, or from the `cursor` subdocument when present); callers
/// use it to update a snapshot session's `snapshot_time`.
pub(crate) fn snapshot_time(&self) -> Option<&Timestamp> {
self.snapshot_time.as_ref()
}

/// The address of the server that sent this response.
pub(crate) fn source_address(&self) -> &ServerAddress {
&self.source
Expand Down
12 changes: 10 additions & 2 deletions src/concern/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde_with::skip_serializing_none;
use typed_builder::TypedBuilder;

use crate::{
bson::{doc, serde_helpers},
bson::{doc, serde_helpers, Timestamp},
bson_util,
error::{ErrorKind, Result},
};
Expand All @@ -20,11 +20,16 @@ use crate::{
///
/// See the documentation [here](https://docs.mongodb.com/manual/reference/read-concern/) for more
/// information about read concerns.
#[skip_serializing_none]
#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct ReadConcern {
/// The level of the read concern.
pub level: ReadConcernLevel,

/// The snapshot read timestamp.
pub(crate) at_cluster_time: Option<Timestamp>,
}

impl ReadConcern {
Expand Down Expand Up @@ -87,7 +92,10 @@ impl ReadConcern {

impl From<ReadConcernLevel> for ReadConcern {
fn from(level: ReadConcernLevel) -> Self {
Self { level }
Self {
level,
at_cluster_time: None,
}
}
}

Expand Down
11 changes: 5 additions & 6 deletions src/cursor/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use futures_core::{Future, Stream};
use crate::{
bson::Document,
error::{Error, ErrorKind, Result},
operation,
options::ServerAddress,
results::GetMoreResult,
Client,
Expand Down Expand Up @@ -152,22 +153,20 @@ pub(crate) struct CursorSpecification {

impl CursorSpecification {
pub(crate) fn new(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This refactor was in service of being able to pass an at_cluster_time field through easily, but even without that it seemed worthwhile to leave in.

ns: Namespace,
info: operation::CursorInfo,
address: ServerAddress,
id: i64,
batch_size: impl Into<Option<u32>>,
max_time: impl Into<Option<Duration>>,
initial_buffer: VecDeque<Document>,
) -> Self {
Self {
info: CursorInformation {
ns,
id,
ns: info.ns,
id: info.id,
address,
batch_size: batch_size.into(),
max_time: max_time.into(),
},
initial_buffer,
initial_buffer: info.first_batch,
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl Error {
Some(write_error.message.clone())
}
ErrorKind::Transaction { message } => Some(message.clone()),
ErrorKind::IncompatibleServer { message } => Some(message.clone()),
_ => None,
}
}
Expand Down Expand Up @@ -411,6 +412,11 @@ pub enum ErrorKind {
#[error("{message}")]
#[non_exhaustive]
Transaction { message: String },

/// The server does not support the operation.
#[error("The server does not support a database operation: {message}")]
#[non_exhaustive]
IncompatibleServer { message: String },
}

/// An error that occurred due to a database command failing.
Expand Down
4 changes: 1 addition & 3 deletions src/operation/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,10 @@ impl Operation for Aggregate {
let body: CursorBody = response.body()?;

Ok(CursorSpecification::new(
body.cursor.ns,
body.cursor,
source_address,
body.cursor.id,
self.options.as_ref().and_then(|opts| opts.batch_size),
self.options.as_ref().and_then(|opts| opts.max_await_time),
body.cursor.first_batch,
))
}

Expand Down
7 changes: 3 additions & 4 deletions src/operation/find/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,14 @@ impl Operation for Find {
_description: &StreamDescription,
) -> Result<Self::O> {
let source_address = response.source_address().clone();
let body: CursorBody = response.body()?;
let mut body: CursorBody = response.body()?;
body.cursor.ns = self.ns.clone();

Ok(CursorSpecification::new(
self.ns.clone(),
body.cursor,
source_address,
body.cursor.id,
self.options.as_ref().and_then(|opts| opts.batch_size),
self.options.as_ref().and_then(|opts| opts.max_await_time),
body.cursor.first_batch,
))
}

Expand Down
4 changes: 1 addition & 3 deletions src/operation/list_collections/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,10 @@ impl Operation for ListCollections {
let body: CursorBody = response.body()?;

Ok(CursorSpecification::new(
body.cursor.ns,
body.cursor,
source_address,
body.cursor.id,
self.options.as_ref().and_then(|opts| opts.batch_size),
None,
body.cursor.first_batch,
))
}

Expand Down
9 changes: 5 additions & 4 deletions src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,12 @@ struct CursorBody {
}

#[derive(Debug, Deserialize)]
struct CursorInfo {
id: i64,
ns: Namespace,
#[serde(rename_all = "camelCase")]
pub(crate) struct CursorInfo {
pub(crate) id: i64,
pub(crate) ns: Namespace,
#[serde(rename = "firstBatch")]
first_batch: VecDeque<Document>,
pub(crate) first_batch: VecDeque<Document>,
}

#[derive(Debug, PartialEq)]
Expand Down
Loading