Skip to content

RUST-734 Sharded Transactions #408

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 79 additions & 17 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ use crate::{
Retryability,
},
options::SelectionCriteria,
sdam::{HandshakePhase, SelectedServer, SessionSupportStatus, TransactionSupportStatus},
sdam::{
HandshakePhase,
SelectedServer,
ServerType,
SessionSupportStatus,
TransactionSupportStatus,
},
selection_criteria::ReadPreference,
};

Expand Down Expand Up @@ -135,18 +141,23 @@ impl Client {
}
}

let server = match self.select_server(op.selection_criteria()).await {
let selection_criteria = session
.as_ref()
.and_then(|s| s.transaction.pinned_mongos.as_ref())
.or_else(|| op.selection_criteria());

let server = match self.select_server(selection_criteria).await {
Ok(server) => server,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;
return Err(err);
}
};

let mut conn = match server.pool.check_out().await {
Ok(conn) => conn,
Err(mut err) => {
err.add_labels(None, &session, None)?;
err.add_labels_and_update_pin(None, &mut session, None)?;

if err.is_pool_cleared() {
return self.execute_retry(&mut op, &mut session, None, err).await;
Expand Down Expand Up @@ -229,6 +240,8 @@ impl Client {
txn_number: Option<i64>,
first_error: Error,
) -> Result<T::O> {
op.update_for_retry();

let server = match self.select_server(op.selection_criteria()).await {
Ok(server) => server,
Err(_) => {
Expand All @@ -246,8 +259,6 @@ impl Client {
return Err(first_error);
}

op.update_for_retry();

match self
.execute_operation_on_connection(op, &mut conn, session, txn_number, &retryability)
.await
Expand Down Expand Up @@ -286,7 +297,9 @@ impl Client {
wc.validate()?;
}

let mut cmd = op.build(connection.stream_description()?)?;
let stream_description = connection.stream_description()?;
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
let mut cmd = op.build(stream_description)?;
self.inner
.topology
.update_command_with_read_pref(connection.address(), &mut cmd, op.selection_criteria())
Expand Down Expand Up @@ -324,12 +337,22 @@ impl Client {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session)?;
if is_sharded {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress
| TransactionState::Committed { .. }
| TransactionState::Aborted => {
TransactionState::InProgress => cmd.set_autocommit(),
TransactionState::Committed { .. } | TransactionState::Aborted => {
cmd.set_autocommit();

// Append the recovery token to the command if we are committing or aborting
// on a sharded transaction.
if is_sharded {
if let Some(ref recovery_token) = session.transaction.recovery_token {
cmd.set_recovery_token(recovery_token);
}
}
}
_ => {}
}
Expand Down Expand Up @@ -398,6 +421,9 @@ impl Client {
Ok(r) => {
self.update_cluster_time(&r, session).await;
if r.is_success() {
// Retrieve recovery token from successful response.
Client::update_recovery_token(is_sharded, &r, session).await;

Ok(CommandResult {
raw: response,
deserialized: r.into_body(),
Expand Down Expand Up @@ -442,7 +468,15 @@ impl Client {
}))
}
// for ok: 1 just return the original deserialization error.
_ => Err(deserialize_error),
_ => {
Client::update_recovery_token(
is_sharded,
&error_response,
session,
)
.await;
Err(deserialize_error)
}
}
}
// We failed to deserialize even that, so just return the original
Expand Down Expand Up @@ -471,13 +505,13 @@ impl Client {
handler.handle_command_failed_event(command_failed_event);
});

if let Some(session) = session {
if let Some(ref mut session) = session {
if err.is_network_error() {
session.mark_dirty();
}
}

err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(Some(connection), session, Some(retryability))?;
op.handle_error(err)
}
Ok(response) => {
Expand All @@ -504,7 +538,11 @@ impl Client {
match op.handle_response(response.deserialized, connection.stream_description()?) {
Ok(response) => Ok(response),
Err(mut err) => {
err.add_labels(Some(connection), session, Some(retryability))?;
err.add_labels_and_update_pin(
Some(connection),
session,
Some(retryability),
)?;
Err(err)
}
}
Expand Down Expand Up @@ -615,10 +653,22 @@ impl Client {
}
}
}

async fn update_recovery_token<T: Response>(
is_sharded: bool,
response: &T,
session: &mut Option<&mut ClientSession>,
) {
if let Some(ref mut session) = session {
if is_sharded && session.in_transaction() {
session.transaction.recovery_token = response.recovery_token().cloned();
}
}
}
}

impl Error {
/// Adds the necessary labels to this Error.
/// Adds the necessary labels to this Error, and unpins the session if needed.
///
/// A TransientTransactionError label should be added if a transaction is in progress and the
/// error is a network or server selection error.
Expand All @@ -628,10 +678,13 @@ impl Error {
/// server version, a label should only be added if the `retry_writes` client option is not set
/// to `false`, the operation during which the error occured is write-retryable, and a
/// TransientTransactionError label has not already been added.
fn add_labels(
///
/// If the TransientTransactionError or UnknownTransactionCommitResult labels are added, the
/// ClientSession should be unpinned.
fn add_labels_and_update_pin(
&mut self,
conn: Option<&Connection>,
session: &Option<&mut ClientSession>,
session: &mut Option<&mut ClientSession>,
retryability: Option<&Retryability>,
) -> Result<()> {
let transaction_state = session.as_ref().map_or(&TransactionState::None, |session| {
Expand Down Expand Up @@ -675,6 +728,15 @@ impl Error {
}
}
}

if let Some(ref mut session) = session {
if self.contains_label(TRANSIENT_TRANSACTION_ERROR)
|| self.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT)
{
session.unpin_mongos();
}
}

Ok(())
}
}
Expand Down
42 changes: 33 additions & 9 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ mod test;

use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -16,13 +17,16 @@ use crate::{
error::{ErrorKind, Result},
operation::{AbortTransaction, CommitTransaction, Operation},
options::{SessionOptions, TransactionOptions},
sdam::TransactionSupportStatus,
sdam::{ServerInfo, TransactionSupportStatus},
selection_criteria::SelectionCriteria,
Client,
RUNTIME,
};
pub(crate) use cluster_time::ClusterTime;
pub(super) use pool::ServerSessionPool;

use super::options::ServerAddress;

lazy_static! {
pub(crate) static ref SESSIONS_UNSUPPORTED_COMMANDS: HashSet<&'static str> = {
let mut hash_set = HashSet::new();
Expand All @@ -43,10 +47,10 @@ lazy_static! {
/// collections atomically. For more information about when and how to use transactions in MongoDB,
/// see the [manual](https://docs.mongodb.com/manual/core/transactions/).
///
/// Replica set transactions are supported on MongoDB 4.0+. Transactions are associated with a
/// `ClientSession`. To begin a transaction, call [`ClientSession::start_transaction`] on a
/// `ClientSession`. The `ClientSession` must be passed to operations to be executed within the
/// transaction.
/// Replica set transactions are supported on MongoDB 4.0+. Sharded transactions are supported on
/// MongoDDB 4.2+. Transactions are associated with a `ClientSession`. To begin a transaction, call
/// [`ClientSession::start_transaction`] on a `ClientSession`. The `ClientSession` must be passed to
/// operations to be executed within the transaction.
///
/// ```rust
/// use mongodb::{
Expand Down Expand Up @@ -95,9 +99,6 @@ lazy_static! {
/// }
/// }
/// ```
// TODO RUST-122 Remove this note and adjust the above description to indicate that sharded
// transactions are supported on 4.2+
/// Note: the driver does not currently support transactions on sharded clusters.
#[derive(Clone, Debug)]
pub struct ClientSession {
cluster_time: Option<ClusterTime>,
Expand All @@ -113,12 +114,15 @@ pub struct ClientSession {
pub(crate) struct Transaction {
pub(crate) state: TransactionState,
pub(crate) options: Option<TransactionOptions>,
pub(crate) pinned_mongos: Option<SelectionCriteria>,
pub(crate) recovery_token: Option<Document>,
}

impl Transaction {
pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
self.state = TransactionState::Starting;
self.options = options;
self.recovery_token = None;
}

pub(crate) fn commit(&mut self, data_committed: bool) {
Expand All @@ -128,11 +132,14 @@ impl Transaction {
pub(crate) fn abort(&mut self) {
self.state = TransactionState::Aborted;
self.options = None;
self.pinned_mongos = None;
}

pub(crate) fn reset(&mut self) {
self.state = TransactionState::None;
self.options = None;
self.pinned_mongos = None;
self.recovery_token = None;
}
}

Expand All @@ -141,6 +148,8 @@ impl Default for Transaction {
Self {
state: TransactionState::None,
options: None,
pinned_mongos: None,
recovery_token: None,
}
}
}
Expand Down Expand Up @@ -245,6 +254,17 @@ impl ClientSession {
self.server_session.txn_number
}

/// Pin mongos to session.
pub(crate) fn pin_mongos(&mut self, address: ServerAddress) {
self.transaction.pinned_mongos = Some(SelectionCriteria::Predicate(Arc::new(
move |server_info: &ServerInfo| *server_info.address() == address,
)));
}

pub(crate) fn unpin_mongos(&mut self) {
self.transaction.pinned_mongos = None;
}

/// Whether this session is dirty.
#[cfg(test)]
pub(crate) fn is_dirty(&self) -> bool {
Expand Down Expand Up @@ -298,6 +318,9 @@ impl ClientSession {
}
.into());
}
TransactionState::Committed { .. } => {
self.unpin_mongos(); // Unpin session if previous transaction is committed.
}
_ => {}
}
match self.client.transaction_support_status().await? {
Expand Down Expand Up @@ -472,7 +495,8 @@ impl ClientSession {
.as_ref()
.and_then(|options| options.write_concern.as_ref())
.cloned();
let abort_transaction = AbortTransaction::new(write_concern);
let selection_criteria = self.transaction.pinned_mongos.clone();
let abort_transaction = AbortTransaction::new(write_concern, selection_criteria);
self.transaction.abort();
// Errors returned from running an abortTransaction command should be ignored.
let _result = self
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl Command {
}
}

pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) {
self.body.insert("recoveryToken", recovery_token);
}

pub(crate) fn set_txn_number(&mut self, txn_number: i64) {
self.body.insert("txnNumber", txn_number);
}
Expand Down
3 changes: 1 addition & 2 deletions src/concern/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,7 @@ async fn snapshot_read_concern() {
.database(function_name!())
.collection::<Document>(function_name!());

// TODO RUST-122 run this test on sharded clusters
if client.is_replica_set() && client.server_version_gte(4, 0) {
if client.supports_transactions() {
let mut session = client.start_session(None).await.unwrap();
let options = TransactionOptions::builder()
.read_concern(ReadConcern::snapshot())
Expand Down
Loading