Skip to content

RUST-97 Support sharded transactions recovery token #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ impl Client {
}

let stream_description = connection.stream_description()?;
let is_sharded = stream_description.initial_server_type == ServerType::Mongos;
let mut cmd = op.build(stream_description)?;
self.inner
.topology
Expand Down Expand Up @@ -328,15 +329,22 @@ impl Client {
cmd.set_start_transaction();
cmd.set_autocommit();
cmd.set_txn_read_concern(*session)?;
if stream_description.initial_server_type == ServerType::Mongos {
if is_sharded {
session.pin_mongos(connection.address().clone());
}
session.transaction.state = TransactionState::InProgress;
}
TransactionState::InProgress
| TransactionState::Committed { .. }
| TransactionState::Aborted => {
TransactionState::InProgress => cmd.set_autocommit(),
TransactionState::Committed { .. } | TransactionState::Aborted => {
cmd.set_autocommit();

// Append the recovery token to the command if we are committing or aborting
// on a sharded transaction.
if is_sharded {
if let Some(ref recovery_token) = session.transaction.recovery_token {
cmd.set_recovery_token(recovery_token);
}
}
}
_ => {}
}
Expand Down Expand Up @@ -403,6 +411,9 @@ impl Client {
Ok(r) => {
self.update_cluster_time(&r, session).await;
if r.is_success() {
// Retrieve recovery token from successful response.
Client::update_recovery_token(is_sharded, &r, session).await;

Ok(CommandResult {
raw: response,
deserialized: r.into_body(),
Expand Down Expand Up @@ -447,7 +458,15 @@ impl Client {
}))
}
// for ok: 1 just return the original deserialization error.
_ => Err(deserialize_error),
_ => {
Client::update_recovery_token(
is_sharded,
&error_response,
session,
)
.await;
Err(deserialize_error)
}
}
}
// We failed to deserialize even that, so just return the original
Expand Down Expand Up @@ -626,6 +645,18 @@ impl Client {
}
}
}

async fn update_recovery_token<T: Response>(
is_sharded: bool,
response: &T,
session: &mut Option<&mut ClientSession>,
) {
if let Some(ref mut session) = session {
if is_sharded && session.in_transaction() {
session.transaction.recovery_token = response.recovery_token().cloned();
}
}
}
}

impl Error {
Expand Down
4 changes: 4 additions & 0 deletions src/client/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,14 @@ pub(crate) struct Transaction {
pub(crate) state: TransactionState,
pub(crate) options: Option<TransactionOptions>,
pub(crate) pinned_mongos: Option<SelectionCriteria>,
pub(crate) recovery_token: Option<Document>,
}

impl Transaction {
pub(crate) fn start(&mut self, options: Option<TransactionOptions>) {
self.state = TransactionState::Starting;
self.options = options;
self.recovery_token = None;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment necessary? Similarly to pinned_mongos I believe this field should be None by default upon the first transaction and reset to None after previous transactions have finished.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. From the spec:

Drivers MUST clear a session's cached recoveryToken when transitioning to the "no transaction" or "starting transaction" state.

So, I'm clearing the recoveryToken on Transaction::start and Transaction::reset

}

pub(crate) fn commit(&mut self, data_committed: bool) {
Expand All @@ -140,6 +142,7 @@ impl Transaction {
self.state = TransactionState::None;
self.options = None;
self.pinned_mongos = None;
self.recovery_token = None;
}
}

Expand All @@ -149,6 +152,7 @@ impl Default for Transaction {
state: TransactionState::None,
options: None,
pinned_mongos: None,
recovery_token: None,
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions src/cmap/conn/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ impl Command {
}
}

pub(crate) fn set_recovery_token(&mut self, recovery_token: &Document) {
self.body.insert("recoveryToken", recovery_token);
}

pub(crate) fn set_txn_number(&mut self, txn_number: i64) {
self.body.insert("txnNumber", txn_number);
}
Expand Down
13 changes: 13 additions & 0 deletions src/operation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,9 @@ pub(crate) trait Response: Sized {
/// The `atClusterTime` field of the response.
fn at_cluster_time(&self) -> Option<Timestamp>;

/// The `recoveryToken` field of the response.
fn recovery_token(&self) -> Option<&Document>;

/// Convert into the body of the response.
fn into_body(self) -> Self::Body;
}
Expand All @@ -168,6 +171,8 @@ pub(crate) struct CommandResponse<T> {

pub(crate) at_cluster_time: Option<Timestamp>,

pub(crate) recovery_token: Option<Document>,

#[serde(flatten)]
pub(crate) body: T,
}
Expand Down Expand Up @@ -197,6 +202,10 @@ impl<T: DeserializeOwned> Response for CommandResponse<T> {
self.at_cluster_time
}

fn recovery_token(&self) -> Option<&Document> {
self.recovery_token.as_ref()
}

fn into_body(self) -> Self::Body {
self.body
}
Expand Down Expand Up @@ -229,6 +238,10 @@ impl<T: DeserializeOwned> Response for CursorResponse<T> {
self.response.body.cursor.at_cluster_time
}

fn recovery_token(&self) -> Option<&Document> {
self.response.recovery_token()
}

fn into_body(self) -> Self::Body {
self.response.body
}
Expand Down
13 changes: 12 additions & 1 deletion src/operation/run_command/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl Operation for RunCommand {
pub(crate) struct Response {
doc: Document,
cluster_time: Option<ClusterTime>,
recovery_token: Option<Document>,
}

impl super::Response for Response {
Expand All @@ -109,7 +110,13 @@ impl super::Response for Response {
.ok()
.and_then(|doc| bson::from_document(doc.clone()).ok());

Ok(Self { doc, cluster_time })
let recovery_token = doc.get_document("recoveryToken").ok().cloned();

Ok(Self {
doc,
cluster_time,
recovery_token,
})
}

fn ok(&self) -> Option<&Bson> {
Expand All @@ -131,6 +138,10 @@ impl super::Response for Response {
.ok()
}

fn recovery_token(&self) -> Option<&Document> {
self.recovery_token.as_ref()
}

fn into_body(self) -> Self::Body {
self.doc
}
Expand Down
Loading