-
Notifications
You must be signed in to change notification settings - Fork 178
RUST-1713 Bulk Write #1034
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
RUST-1713 Bulk Write #1034
Changes from all commits
Commits
Show all changes
80 commits
Select commit
Hold shift + click to select a range
34b1579
api shell
isabelatkinson 8e387ab
basic command building and outcome testing
isabelatkinson c4ffe01
add results
isabelatkinson fd4232d
top level options support and testing
isabelatkinson 28d1bb6
impl and refactor
isabelatkinson 42a2ae4
add server responses file
isabelatkinson 6588a5e
add batching prose tests
isabelatkinson 3a84805
update tests
isabelatkinson 3a9d970
rebase fixes
isabelatkinson 1c8fbc2
resync retryable writes/transactions
isabelatkinson 84203d7
fix transactions test typo
isabelatkinson a491642
rustdoc workaround
isabelatkinson 47a5975
resync tests
isabelatkinson 29938fa
remove extra file
isabelatkinson 040d807
fix msrv failure
isabelatkinson d42fe0e
reorg
isabelatkinson ad7aa44
fix files
isabelatkinson 321b64b
abraham comments
isabelatkinson 31d1c9e
add network errors test
isabelatkinson 199163f
upserted id changes
isabelatkinson 77daa8e
rename write concern error message
isabelatkinson 1193a5c
add cursor iteration test
isabelatkinson d62beda
sync retryability tests
isabelatkinson 427ec51
write concern error prose test, refactor fail point
isabelatkinson a1375a8
resync retryable writes
isabelatkinson 6d2b8f2
sync bypassdocumentvalidation test
isabelatkinson b80ac98
add partial result check
isabelatkinson d566c99
prose test updates
isabelatkinson 37e0a73
Merge branch 'main' into bulk-write-merge
isabelatkinson 02c54d2
rework cursor test
isabelatkinson 8f0dd5e
add pipeline tests
isabelatkinson 6fd1a7a
validate update and replacement documents
isabelatkinson 927e9c5
simplify check
isabelatkinson a2ffd7a
allow empty replaces
isabelatkinson 7b11bf0
add failed iteration test
isabelatkinson 6f26067
empty models test
isabelatkinson 04f06bd
write concern
isabelatkinson 4a07b61
more write concern tests
isabelatkinson 37852df
add retryWrites:false test
isabelatkinson 733cfb9
remove OperationResponse
isabelatkinson de46b4a
use pinned connection for cursor
isabelatkinson acbc534
reduce _id size
isabelatkinson 20ce39d
fix fle
isabelatkinson e82caf2
Merge branch 'main' into bulk-write
isabelatkinson cfd0c3e
strip extra mongoses
isabelatkinson dbe4352
rework iteration tests
isabelatkinson a5612cd
more assertions
isabelatkinson fe6de0a
skip and sync files
isabelatkinson 356248a
remove method
isabelatkinson ecb8973
fix integer casts
isabelatkinson 36ba52c
skip transaction test on standalone
isabelatkinson 41194ab
don't use multiple mongoses
isabelatkinson 0f2a523
update transactions tests
isabelatkinson 956729e
add file
isabelatkinson c943b89
transaction test, change error expectations
isabelatkinson 0cb9676
fail point improvements
isabelatkinson a2c52e8
add file
isabelatkinson ecd7d7e
Merge branch 'main' into bulk-write
isabelatkinson cef01ed
small cleanup
isabelatkinson 5ee0bd6
add namespace batching test
isabelatkinson 9ad9e88
retry getMore
isabelatkinson f99b736
rework batch splitting sizes
isabelatkinson 0e89378
Revert "retry getMore"
isabelatkinson a815a75
add ns size batching test
isabelatkinson ad579fe
different ns info batching test
isabelatkinson 72ee8c5
too large test
isabelatkinson 7fcec2d
skip tests
isabelatkinson e23d49c
rework namespace splitting test
isabelatkinson d648462
Merge branch 'main' into bulk-write
isabelatkinson c598745
error for fle
isabelatkinson 98eb33c
sync retryable writes
isabelatkinson 808a64a
add encryption test
isabelatkinson 3c5a936
sync transaction tests
isabelatkinson fbc6651
minor: bump clippy to 1.78.0
isabelatkinson c086020
kevin comments
isabelatkinson 11dba57
update operation methods
isabelatkinson 81bde91
RUST-1921 Sign crate on release (#1095)
abr-egn d425f73
RUST-1945 Add a `with_type` method to the `Aggregate` action (#1100)
isabelatkinson 5173765
Merge branch 'main' into bulk-write
isabelatkinson 8956a55
Merge branch 'main' into bulk-write
isabelatkinson File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,227 @@ | ||
#![allow(missing_docs)] | ||
|
||
use std::collections::HashMap; | ||
|
||
use crate::{ | ||
bson::{Bson, Document}, | ||
error::{ClientBulkWriteError, Error, ErrorKind, Result}, | ||
operation::bulk_write::BulkWrite as BulkWriteOperation, | ||
options::{BulkWriteOptions, WriteConcern, WriteModel}, | ||
results::BulkWriteResult, | ||
Client, | ||
ClientSession, | ||
}; | ||
|
||
use super::{action_impl, option_setters}; | ||
|
||
impl Client { | ||
pub fn bulk_write(&self, models: impl IntoIterator<Item = WriteModel>) -> BulkWrite { | ||
BulkWrite::new(self, models.into_iter().collect()) | ||
} | ||
} | ||
|
||
#[must_use] | ||
pub struct BulkWrite<'a> { | ||
client: &'a Client, | ||
models: Vec<WriteModel>, | ||
options: Option<BulkWriteOptions>, | ||
session: Option<&'a mut ClientSession>, | ||
} | ||
|
||
impl<'a> BulkWrite<'a> { | ||
option_setters!(options: BulkWriteOptions; | ||
ordered: bool, | ||
bypass_document_validation: bool, | ||
comment: Bson, | ||
let_vars: Document, | ||
verbose_results: bool, | ||
write_concern: WriteConcern, | ||
); | ||
|
||
pub fn session(mut self, session: &'a mut ClientSession) -> BulkWrite<'a> { | ||
self.session = Some(session); | ||
self | ||
} | ||
|
||
fn new(client: &'a Client, models: Vec<WriteModel>) -> Self { | ||
Self { | ||
client, | ||
models, | ||
options: None, | ||
session: None, | ||
} | ||
} | ||
|
||
fn is_ordered(&self) -> bool { | ||
self.options | ||
.as_ref() | ||
.and_then(|options| options.ordered) | ||
.unwrap_or(true) | ||
} | ||
} | ||
|
||
#[action_impl] | ||
impl<'a> Action for BulkWrite<'a> { | ||
type Future = BulkWriteFuture; | ||
|
||
async fn execute(mut self) -> Result<BulkWriteResult> { | ||
#[cfg(feature = "in-use-encryption-unstable")] | ||
if self.client.should_auto_encrypt().await { | ||
use mongocrypt::error::{Error as EncryptionError, ErrorKind as EncryptionErrorKind}; | ||
|
||
let error = EncryptionError { | ||
kind: EncryptionErrorKind::Client, | ||
code: None, | ||
message: Some( | ||
"bulkWrite does not currently support automatic encryption".to_string(), | ||
), | ||
}; | ||
return Err(ErrorKind::Encryption(error).into()); | ||
} | ||
|
||
resolve_write_concern_with_session!( | ||
self.client, | ||
self.options, | ||
self.session.as_deref_mut() | ||
)?; | ||
|
||
let mut total_attempted = 0; | ||
let mut execution_status = ExecutionStatus::None; | ||
|
||
while total_attempted < self.models.len() | ||
&& execution_status.should_continue(self.is_ordered()) | ||
{ | ||
let mut operation = BulkWriteOperation::new( | ||
self.client.clone(), | ||
&self.models[total_attempted..], | ||
total_attempted, | ||
self.options.as_ref(), | ||
) | ||
.await; | ||
let result = self | ||
.client | ||
.execute_operation::<BulkWriteOperation>( | ||
&mut operation, | ||
self.session.as_deref_mut(), | ||
) | ||
.await; | ||
total_attempted += operation.n_attempted; | ||
|
||
match result { | ||
Ok(result) => { | ||
execution_status = execution_status.with_success(result); | ||
} | ||
Err(error) => { | ||
execution_status = execution_status.with_failure(error); | ||
} | ||
} | ||
} | ||
|
||
match execution_status { | ||
ExecutionStatus::Success(bulk_write_result) => Ok(bulk_write_result), | ||
ExecutionStatus::Error(error) => Err(error), | ||
ExecutionStatus::None => Err(ErrorKind::InvalidArgument { | ||
message: "bulk_write must be provided at least one write operation".into(), | ||
} | ||
.into()), | ||
} | ||
} | ||
} | ||
|
||
/// Represents the execution status of a bulk write. The status starts at `None`, indicating that no | ||
/// writes have been attempted yet, and transitions to either `Success` or `Error` as batches are | ||
/// executed. The contents of `Error` can be inspected to determine whether a bulk write can | ||
/// continue with further batches or should be terminated. | ||
enum ExecutionStatus { | ||
Success(BulkWriteResult), | ||
Error(Error), | ||
None, | ||
} | ||
|
||
impl ExecutionStatus { | ||
fn with_success(mut self, result: BulkWriteResult) -> Self { | ||
match self { | ||
// Merge two successful sets of results together. | ||
Self::Success(ref mut current_result) => { | ||
current_result.merge(result); | ||
self | ||
} | ||
// Merge the results of the new batch into the existing bulk write error. | ||
Self::Error(ref mut current_error) => { | ||
let bulk_write_error = Self::get_current_bulk_write_error(current_error); | ||
bulk_write_error.merge_partial_results(result); | ||
self | ||
} | ||
Self::None => Self::Success(result), | ||
} | ||
} | ||
|
||
fn with_failure(self, mut error: Error) -> Self { | ||
match self { | ||
// If the new error is a BulkWriteError, merge the successful results into the error's | ||
// partial result. Otherwise, create a new BulkWriteError with the existing results and | ||
// set its source as the error that just occurred. | ||
abr-egn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
Self::Success(current_result) => match *error.kind { | ||
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => { | ||
bulk_write_error.merge_partial_results(current_result); | ||
Self::Error(error) | ||
} | ||
_ => { | ||
let bulk_write_error: Error = | ||
ErrorKind::ClientBulkWrite(ClientBulkWriteError { | ||
write_errors: HashMap::new(), | ||
write_concern_errors: Vec::new(), | ||
partial_result: Some(current_result), | ||
}) | ||
.into(); | ||
Self::Error(bulk_write_error.with_source(error)) | ||
} | ||
}, | ||
// If the new error is a BulkWriteError, merge its contents with the existing error. | ||
// Otherwise, set the new error as the existing error's source. | ||
Self::Error(mut current_error) => match *error.kind { | ||
ErrorKind::ClientBulkWrite(bulk_write_error) => { | ||
let current_bulk_write_error = | ||
Self::get_current_bulk_write_error(&mut current_error); | ||
current_bulk_write_error.merge(bulk_write_error); | ||
Self::Error(current_error) | ||
} | ||
_ => Self::Error(current_error.with_source(error)), | ||
}, | ||
Self::None => Self::Error(error), | ||
} | ||
} | ||
|
||
/// Gets a BulkWriteError from a given Error. This method should only be called when adding a | ||
/// new result or error to the existing state, as it requires that the given Error's kind is | ||
/// ClientBulkWrite. | ||
fn get_current_bulk_write_error(error: &mut Error) -> &mut ClientBulkWriteError { | ||
match *error.kind { | ||
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => bulk_write_error, | ||
_ => unreachable!(), | ||
} | ||
} | ||
|
||
/// Whether further bulk write batches should be executed based on the current status of | ||
/// execution. | ||
fn should_continue(&self, ordered: bool) -> bool { | ||
match self { | ||
Self::Error(ref error) => { | ||
match *error.kind { | ||
ErrorKind::ClientBulkWrite(ref bulk_write_error) => { | ||
// A top-level error is always fatal. | ||
let top_level_error_occurred = error.source.is_some(); | ||
// A write error occurring during an ordered bulk write is fatal. | ||
let terminal_write_error_occurred = | ||
ordered && !bulk_write_error.write_errors.is_empty(); | ||
|
||
!top_level_error_occurred && !terminal_write_error_occurred | ||
} | ||
// A top-level error is always fatal. | ||
_ => false, | ||
} | ||
} | ||
_ => true, | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.