-
Notifications
You must be signed in to change notification settings - Fork 181
RUST-107 Convenient transactions #849
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
185d33d
5aa7cd5
708faa9
4249c71
5b229b8
31d644a
700228e
f69a75b
3b8cb66
89a709d
967afd4
67d8b45
fd34bec
a1c6e0a
f45a412
0db3d4d
6eb9e00
9e455a8
496211f
b1d33ee
73acfb6
c9ee2b7
f19a254
84fc205
17e3b37
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -110,6 +110,8 @@ pub struct ClientSession { | |
pub(crate) transaction: Transaction, | ||
pub(crate) snapshot_time: Option<Timestamp>, | ||
pub(crate) operation_time: Option<Timestamp>, | ||
#[cfg(test)] | ||
pub(crate) convenient_transaction_timeout: Option<Duration>, | ||
} | ||
|
||
#[derive(Debug)] | ||
|
@@ -216,6 +218,8 @@ impl ClientSession { | |
transaction: Default::default(), | ||
snapshot_time: None, | ||
operation_time: None, | ||
#[cfg(test)] | ||
convenient_transaction_timeout: None, | ||
} | ||
} | ||
|
||
|
@@ -561,13 +565,117 @@ impl ClientSession { | |
} | ||
} | ||
|
||
/// Starts a transaction, runs the given callback, and commits or aborts the transaction. | ||
/// Transient transaction errors will cause the callback or the commit to be retried; | ||
/// other errors will cause the transaction to be aborted and the error returned to the | ||
/// caller. If the callback needs to provide its own error information, the | ||
/// [`Error::custom`](crate::error::Error::custom) method can accept an arbitrary payload that | ||
/// can be retrieved via [`Error::get_custom`](crate::error::Error::get_custom). | ||
/// | ||
/// Because the callback can be repeatedly executed and because it returns a future, the rust | ||
/// closure borrowing rules for captured values can be overly restrictive. As a | ||
/// convenience, `with_transaction` accepts a context argument that will be passed to the | ||
/// callback along with the session: | ||
/// | ||
/// ```no_run | ||
/// # use mongodb::{bson::{doc, Document}, error::Result, Client}; | ||
/// # use futures::FutureExt; | ||
/// # async fn wrapper() -> Result<()> { | ||
/// # let client = Client::with_uri_str("mongodb://example.com").await?; | ||
/// # let mut session = client.start_session(None).await?; | ||
/// let coll = client.database("mydb").collection::<Document>("mycoll"); | ||
/// let my_data = "my data".to_string(); | ||
/// // This works: | ||
/// session.with_transaction( | ||
/// (&coll, &my_data), | ||
/// |session, (coll, my_data)| async move { | ||
/// coll.insert_one_with_session(doc! { "data": *my_data }, None, session).await | ||
/// }.boxed(), | ||
/// None, | ||
/// ).await?; | ||
/// /* This will not compile with a "variable moved due to use in generator" error: | ||
/// session.with_transaction( | ||
/// (), | ||
/// |session, _| async move { | ||
/// coll.insert_one_with_session(doc! { "data": my_data }, None, session).await | ||
/// }.boxed(), | ||
/// None, | ||
/// ).await?; | ||
/// */ | ||
/// # Ok(()) | ||
/// # } | ||
/// ``` | ||
pub async fn with_transaction<R, C, F>( | ||
&mut self, | ||
mut context: C, | ||
mut callback: F, | ||
options: impl Into<Option<TransactionOptions>>, | ||
) -> Result<R> | ||
where | ||
F: for<'a> FnMut(&'a mut ClientSession, &'a mut C) -> BoxFuture<'a, Result<R>>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a way we could rewrite this signature to avoid including There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, that's unfortunate. The type erasure is necessary - there's no way to express |
||
{ | ||
let options = options.into(); | ||
let timeout = Duration::from_secs(120); | ||
#[cfg(test)] | ||
let timeout = self.convenient_transaction_timeout.unwrap_or(timeout); | ||
let start = Instant::now(); | ||
|
||
use crate::error::{TRANSIENT_TRANSACTION_ERROR, UNKNOWN_TRANSACTION_COMMIT_RESULT}; | ||
|
||
'transaction: loop { | ||
self.start_transaction(options.clone()).await?; | ||
let ret = match callback(self, &mut context).await { | ||
Ok(v) => v, | ||
Err(e) => { | ||
if matches!( | ||
self.transaction.state, | ||
TransactionState::Starting | TransactionState::InProgress | ||
) { | ||
self.abort_transaction().await?; | ||
} | ||
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) && start.elapsed() < timeout { | ||
continue 'transaction; | ||
} | ||
return Err(e); | ||
} | ||
}; | ||
if matches!( | ||
self.transaction.state, | ||
TransactionState::None | ||
| TransactionState::Aborted | ||
| TransactionState::Committed { .. } | ||
) { | ||
return Ok(ret); | ||
} | ||
'commit: loop { | ||
match self.commit_transaction().await { | ||
Ok(()) => return Ok(ret), | ||
Err(e) => { | ||
if e.is_max_time_ms_expired_error() || start.elapsed() >= timeout { | ||
return Err(e); | ||
} | ||
if e.contains_label(UNKNOWN_TRANSACTION_COMMIT_RESULT) { | ||
continue 'commit; | ||
} | ||
if e.contains_label(TRANSIENT_TRANSACTION_ERROR) { | ||
continue 'transaction; | ||
} | ||
return Err(e); | ||
} | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn default_transaction_options(&self) -> Option<&TransactionOptions> { | ||
self.options | ||
.as_ref() | ||
.and_then(|options| options.default_transaction_options.as_ref()) | ||
} | ||
} | ||
|
||
pub type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>; | ||
|
||
struct DroppedClientSession { | ||
cluster_time: Option<ClusterTime>, | ||
server_session: ServerSession, | ||
|
@@ -590,6 +698,8 @@ impl From<DroppedClientSession> for ClientSession { | |
transaction: dropped_session.transaction, | ||
snapshot_time: dropped_session.snapshot_time, | ||
operation_time: dropped_session.operation_time, | ||
#[cfg(test)] | ||
convenient_transaction_timeout: None, | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we'll also need to add this method to the sync API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Unfortunately, I had to duplicate the body of the function rather than just wrapping the async one to avoid nested
block_on
calls from within the callback.