Skip to content

RUST-48 Causal consistency support #493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,11 +417,31 @@ impl Client {
}
cmd.set_snapshot_read_concern(session);
}
// If this is a causally consistent session, set `readConcern.afterClusterTime`.
// Causal consistency defaults to true, unless snapshot is true.
else if session
.options()
.and_then(|opts| opts.causal_consistency)
.unwrap_or(true)
&& matches!(
session.transaction.state,
TransactionState::None | TransactionState::Starting
)
&& op.supports_read_concern(stream_description)
{
cmd.set_after_cluster_time(session);
}

match session.transaction.state {
TransactionState::Starting => {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session);

if let Some(ref options) = session.transaction.options {
if let Some(ref read_concern) = options.read_concern {
cmd.set_read_concern_level(read_concern.level.clone());
}
}
if self.is_load_balanced() {
session.pin_connection(connection.pin()?);
} else if is_sharded {
Expand Down Expand Up @@ -512,6 +532,9 @@ impl Client {
Ok(response) => {
match T::Response::deserialize_response(&response) {
Ok(r) => {
if let (Some(session), Some(ts)) = (session.as_mut(), r.operation_time()) {
session.advance_operation_time(ts);
}
self.update_cluster_time(&r, session).await;
if r.is_success() {
// Retrieve recovery token from successful response.
Expand Down Expand Up @@ -542,6 +565,11 @@ impl Client {
// a generic command response without the operation's body.
match response.body::<CommandResponse<Option<CommandErrorBody>>>() {
Ok(error_response) => {
if let (Some(session), Some(ts)) =
(session.as_mut(), error_response.operation_time())
{
session.advance_operation_time(ts);
}
self.update_cluster_time(&error_response, session).await;
match error_response.body {
// if the response was ok: 0, return the command error.
Expand Down
3 changes: 3 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ impl Client {

/// Starts a new `ClientSession`.
pub async fn start_session(&self, options: Option<SessionOptions>) -> Result<ClientSession> {
if let Some(ref options) = options {
options.validate()?;
}
match self.get_session_support_status().await? {
SessionSupportStatus::Supported {
logical_session_timeout,
Expand Down
24 changes: 22 additions & 2 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1479,7 +1479,7 @@ impl ClientOptionsParser {
credential.source = options
.auth_source
.clone()
.or(db.clone())
.or_else(|| db.clone())
.or_else(|| Some("admin".into()));
} else if authentication_requested {
return Err(ErrorKind::InvalidArgument {
Expand Down Expand Up @@ -2391,12 +2391,32 @@ pub struct SessionOptions {
/// associated with the operations within the transaction.
pub default_transaction_options: Option<TransactionOptions>,

/// If true, all operations performed in the context of this session
/// will be [causally consistent](https://docs.mongodb.com/manual/core/causal-consistency-read-write-concerns/).
///
/// Defaults to true if [`SessionOptions::snapshot`] is unspecified.
pub causal_consistency: Option<bool>,

/// If true, all read operations performed using this client session will share the same
/// snapshot. Defaults to false.
// TODO RUST-18 enforce snapshot exclusivity with causalConsistency.
pub snapshot: Option<bool>,
}

impl SessionOptions {
pub(crate) fn validate(&self) -> Result<()> {
if let (Some(causal_consistency), Some(snapshot)) = (self.causal_consistency, self.snapshot)
{
if causal_consistency && snapshot {
return Err(ErrorKind::InvalidArgument {
message: "snapshot and causal consistency are mutually exclusive".to_string(),
}
.into());
}
}
Ok(())
}
}

/// Contains the options that can be used for a transaction.
#[skip_serializing_none]
#[derive(Debug, Default, Serialize, Deserialize, TypedBuilder, Clone)]
Expand Down
20 changes: 20 additions & 0 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct ClientSession {
options: Option<SessionOptions>,
pub(crate) transaction: Transaction,
pub(crate) snapshot_time: Option<Timestamp>,
pub(crate) operation_time: Option<Timestamp>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -214,6 +215,7 @@ impl ClientSession {
options,
transaction: Default::default(),
snapshot_time: None,
operation_time: None,
}
}

Expand Down Expand Up @@ -257,6 +259,21 @@ impl ClientSession {
}
}

/// Advance operation time for this session. If the provided timestamp is earlier than this
/// session's current operation time, then the operation time is unchanged.
pub fn advance_operation_time(&mut self, ts: Timestamp) {
self.operation_time = match self.operation_time {
Some(current_op_time) if current_op_time < ts => Some(ts),
None => Some(ts),
_ => self.operation_time,
}
}

/// The operation time returned by the last operation executed in this session.
pub fn operation_time(&self) -> Option<Timestamp> {
self.operation_time
}

/// Mark this session (and the underlying server session) as dirty.
pub(crate) fn mark_dirty(&mut self) {
self.server_session.dirty = true;
Expand Down Expand Up @@ -559,6 +576,7 @@ struct DroppedClientSession {
options: Option<SessionOptions>,
transaction: Transaction,
snapshot_time: Option<Timestamp>,
operation_time: Option<Timestamp>,
}

impl From<DroppedClientSession> for ClientSession {
Expand All @@ -571,6 +589,7 @@ impl From<DroppedClientSession> for ClientSession {
options: dropped_session.options,
transaction: dropped_session.transaction,
snapshot_time: dropped_session.snapshot_time,
operation_time: dropped_session.operation_time,
}
}
}
Expand All @@ -586,6 +605,7 @@ impl Drop for ClientSession {
options: self.options.clone(),
transaction: self.transaction.take(),
snapshot_time: self.snapshot_time,
operation_time: self.operation_time,
};
RUNTIME.execute(async move {
let mut session: ClientSession = dropped_session.into();
Expand Down
Loading