Skip to content

RUST-1857 Clean up names for multi-write errors #1102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions src/action/bulk_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::collections::HashMap;

use crate::{
bson::{Bson, Document},
error::{ClientBulkWriteError, Error, ErrorKind, Result},
error::{BulkWriteError, Error, ErrorKind, Result},
operation::bulk_write::BulkWrite as BulkWriteOperation,
options::{BulkWriteOptions, WriteConcern, WriteModel},
results::BulkWriteResult,
Expand Down Expand Up @@ -162,25 +162,24 @@ impl ExecutionStatus {
// partial result. Otherwise, create a new BulkWriteError with the existing results and
// set its source as the error that just occurred.
Self::Success(current_result) => match *error.kind {
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => {
ErrorKind::BulkWrite(ref mut bulk_write_error) => {
bulk_write_error.merge_partial_results(current_result);
Self::Error(error)
}
_ => {
let bulk_write_error: Error =
ErrorKind::ClientBulkWrite(ClientBulkWriteError {
write_errors: HashMap::new(),
write_concern_errors: Vec::new(),
partial_result: Some(current_result),
})
.into();
let bulk_write_error: Error = ErrorKind::BulkWrite(BulkWriteError {
write_errors: HashMap::new(),
write_concern_errors: Vec::new(),
partial_result: Some(current_result),
})
.into();
Self::Error(bulk_write_error.with_source(error))
}
},
// If the new error is a BulkWriteError, merge its contents with the existing error.
// Otherwise, set the new error as the existing error's source.
Self::Error(mut current_error) => match *error.kind {
ErrorKind::ClientBulkWrite(bulk_write_error) => {
ErrorKind::BulkWrite(bulk_write_error) => {
let current_bulk_write_error =
Self::get_current_bulk_write_error(&mut current_error);
current_bulk_write_error.merge(bulk_write_error);
Expand All @@ -195,9 +194,9 @@ impl ExecutionStatus {
/// Gets a BulkWriteError from a given Error. This method should only be called when adding a
/// new result or error to the existing state, as it requires that the given Error's kind is
/// ClientBulkWrite.
fn get_current_bulk_write_error(error: &mut Error) -> &mut ClientBulkWriteError {
fn get_current_bulk_write_error(error: &mut Error) -> &mut BulkWriteError {
match *error.kind {
ErrorKind::ClientBulkWrite(ref mut bulk_write_error) => bulk_write_error,
ErrorKind::BulkWrite(ref mut bulk_write_error) => bulk_write_error,
_ => unreachable!(),
}
}
Expand All @@ -208,7 +207,7 @@ impl ExecutionStatus {
match self {
Self::Error(ref error) => {
match *error.kind {
ErrorKind::ClientBulkWrite(ref bulk_write_error) => {
ErrorKind::BulkWrite(ref bulk_write_error) => {
// A top-level error is always fatal.
let top_level_error_occurred = error.source.is_some();
// A write error occurring during an ordered bulk write is fatal.
Expand Down
14 changes: 7 additions & 7 deletions src/action/insert_many.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Serialize;

use crate::{
coll::options::InsertManyOptions,
error::{BulkWriteError, BulkWriteFailure, Error, ErrorKind, Result},
error::{Error, ErrorKind, IndexedWriteError, InsertManyError, Result},
operation::Insert as Op,
options::WriteConcern,
results::InsertManyResult,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl<'a> Action for InsertMany<'a> {
.unwrap_or(true);
let encrypted = self.coll.client().should_auto_encrypt().await;

let mut cumulative_failure: Option<BulkWriteFailure> = None;
let mut cumulative_failure: Option<InsertManyError> = None;
let mut error_labels: HashSet<String> = Default::default();
let mut cumulative_result: Option<InsertManyResult> = None;

Expand Down Expand Up @@ -137,7 +137,7 @@ impl<'a> Action for InsertMany<'a> {
Err(e) => {
let labels = e.labels().clone();
match *e.kind {
ErrorKind::BulkWrite(bw) => {
ErrorKind::InsertMany(bw) => {
// for ordered inserts this size will be incorrect, but knowing the
// batch size isn't needed for ordered
// failures since we return immediately from
Expand All @@ -146,15 +146,15 @@ impl<'a> Action for InsertMany<'a> {
+ bw.write_errors.as_ref().map(|we| we.len()).unwrap_or(0);

let failure_ref =
cumulative_failure.get_or_insert_with(BulkWriteFailure::new);
cumulative_failure.get_or_insert_with(InsertManyError::new);
if let Some(write_errors) = bw.write_errors {
for err in write_errors {
let index = n_attempted + err.index;

failure_ref
.write_errors
.get_or_insert_with(Default::default)
.push(BulkWriteError { index, ..err });
.push(IndexedWriteError { index, ..err });
}
}

Expand All @@ -169,7 +169,7 @@ impl<'a> Action for InsertMany<'a> {
// above.
if let Some(failure) = cumulative_failure {
return Err(Error::new(
ErrorKind::BulkWrite(failure),
ErrorKind::InsertMany(failure),
Some(error_labels),
));
}
Expand All @@ -184,7 +184,7 @@ impl<'a> Action for InsertMany<'a> {

match cumulative_failure {
Some(failure) => Err(Error::new(
ErrorKind::BulkWrite(failure),
ErrorKind::InsertMany(failure),
Some(error_labels),
)),
None => Ok(cumulative_result.unwrap_or_else(InsertManyResult::new)),
Expand Down
4 changes: 2 additions & 2 deletions src/action/insert_one.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use serde::Serialize;

use crate::{
coll::options::{InsertManyOptions, InsertOneOptions},
error::{convert_bulk_errors, Result},
error::{convert_insert_many_error, Result},
operation::Insert as Op,
options::WriteConcern,
results::InsertOneResult,
Expand Down Expand Up @@ -100,6 +100,6 @@ impl<'a> Action for InsertOne<'a> {
.execute_operation(insert, self.session)
.await
.map(InsertOneResult::from_insert_many_result)
.map_err(convert_bulk_errors)
.map_err(convert_insert_many_error)
}
}
91 changes: 47 additions & 44 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
sdam::{ServerType, TopologyVersion},
};

pub use bulk_write::BulkWriteError as ClientBulkWriteError;
pub use bulk_write::BulkWriteError;

const RECOVERING_CODES: [i32; 5] = [11600, 11602, 13436, 189, 91];
const NOTWRITABLEPRIMARY_CODES: [i32; 3] = [10107, 13435, 10058];
Expand Down Expand Up @@ -195,8 +195,8 @@ impl Error {
fn is_write_concern_error(&self) -> bool {
match *self.kind {
ErrorKind::Write(WriteFailure::WriteConcernError(_)) => true,
ErrorKind::BulkWrite(ref bulk_write_error)
if bulk_write_error.write_concern_error.is_some() =>
ErrorKind::InsertMany(ref insert_many_error)
if insert_many_error.write_concern_error.is_some() =>
{
true
}
Expand Down Expand Up @@ -249,7 +249,7 @@ impl Error {
matches!(
self.kind.as_ref(),
ErrorKind::Authentication { .. }
| ErrorKind::BulkWrite(_)
| ErrorKind::InsertMany(_)
| ErrorKind::Command(_)
| ErrorKind::Write(_)
)
Expand Down Expand Up @@ -308,7 +308,7 @@ impl Error {
ErrorKind::Command(command_error) => Some(command_error.code),
// According to SDAM spec, write concern error codes MUST also be checked, and
// writeError codes MUST NOT be checked.
ErrorKind::BulkWrite(BulkWriteFailure {
ErrorKind::InsertMany(InsertManyError {
write_concern_error: Some(wc_error),
..
}) => Some(wc_error.code),
Expand All @@ -323,7 +323,7 @@ impl Error {
pub(crate) fn code(&self) -> Option<i32> {
match self.kind.as_ref() {
ErrorKind::Command(command_error) => Some(command_error.code),
ErrorKind::BulkWrite(BulkWriteFailure {
ErrorKind::InsertMany(InsertManyError {
write_concern_error: Some(wc_error),
..
}) => Some(wc_error.code),
Expand All @@ -334,15 +334,15 @@ impl Error {
}

/// Gets the message for this error, if applicable, for use in testing.
/// If this error is a BulkWriteError, the messages are concatenated.
/// If this error is an InsertManyError, the messages are concatenated.
#[cfg(test)]
pub(crate) fn message(&self) -> Option<String> {
match self.kind.as_ref() {
ErrorKind::Command(command_error) => Some(command_error.message.clone()),
// since this is used primarily for errorMessageContains assertions in the unified
// runner, we just concatenate all the relevant server messages into one for
// bulk errors.
ErrorKind::BulkWrite(BulkWriteFailure {
// insert many errors.
ErrorKind::InsertMany(InsertManyError {
write_concern_error,
write_errors,
inserted_ids: _,
Expand Down Expand Up @@ -382,7 +382,7 @@ impl Error {
WriteFailure::WriteConcernError(ref wce) => Some(wce.code_name.as_str()),
WriteFailure::WriteError(ref we) => we.code_name.as_deref(),
},
ErrorKind::BulkWrite(ref bwe) => bwe
ErrorKind::InsertMany(ref bwe) => bwe
.write_concern_error
.as_ref()
.map(|wce| wce.code_name.as_str()),
Expand Down Expand Up @@ -481,21 +481,21 @@ impl Error {
// This is intentionally written without a catch-all branch so that if new error
// kinds are added we remember to reason about whether they need to be redacted.
match *self.kind {
ErrorKind::BulkWrite(ref mut bwe) => {
if let Some(ref mut wes) = bwe.write_errors {
ErrorKind::InsertMany(ref mut insert_many_error) => {
if let Some(ref mut wes) = insert_many_error.write_errors {
for we in wes {
we.redact();
}
}
if let Some(ref mut wce) = bwe.write_concern_error {
if let Some(ref mut wce) = insert_many_error.write_concern_error {
wce.redact();
}
}
ErrorKind::ClientBulkWrite(ref mut client_bulk_write_error) => {
for write_concern_error in client_bulk_write_error.write_concern_errors.iter_mut() {
ErrorKind::BulkWrite(ref mut bulk_write_error) => {
for write_concern_error in bulk_write_error.write_concern_errors.iter_mut() {
write_concern_error.redact();
}
for (_, write_error) in client_bulk_write_error.write_errors.iter_mut() {
for (_, write_error) in bulk_write_error.write_errors.iter_mut() {
write_error.redact();
}
}
Expand Down Expand Up @@ -612,12 +612,13 @@ pub enum ErrorKind {
#[error("{0}")]
BsonSerialization(crate::bson::ser::Error),

/// An error occurred when trying to execute a write operation consisting of multiple writes.
#[error("An error occurred when trying to execute a write operation: {0:?}")]
BulkWrite(BulkWriteFailure),
/// An error occurred when trying to execute an [`insert_many`](crate::Collection::insert_many)
/// operation.
#[error("An error occurred when trying to execute an insert_many operation: {0:?}")]
InsertMany(InsertManyError),

#[error("An error occurred when executing Client::bulk_write: {0:?}")]
ClientBulkWrite(ClientBulkWriteError),
BulkWrite(BulkWriteError),

/// The server returned an error to an attempted operation.
#[error("Command failed: {0}")]
Expand Down Expand Up @@ -706,7 +707,7 @@ impl ErrorKind {
// TODO CLOUDP-105256 Remove this when Atlas Proxy error label behavior is fixed.
fn get_write_concern_error(&self) -> Option<&WriteConcernError> {
match self {
ErrorKind::BulkWrite(BulkWriteFailure {
ErrorKind::InsertMany(InsertManyError {
write_concern_error,
..
}) => write_concern_error.as_ref(),
Expand Down Expand Up @@ -825,11 +826,11 @@ impl WriteError {
}
}

/// An error that occurred during a write operation consisting of multiple writes that wasn't due to
/// being unable to satisfy a write concern.
/// An individual write error that occurred during an
/// [`insert_many`](crate::Collection::insert_many) operation.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[non_exhaustive]
pub struct BulkWriteError {
pub struct IndexedWriteError {
/// Index into the list of operations that this error corresponds to.
#[serde(default)]
pub index: usize,
Expand All @@ -854,22 +855,23 @@ pub struct BulkWriteError {
pub details: Option<Document>,
}

impl BulkWriteError {
// If any new fields are added to BulkWriteError, this implementation must be updated to redact
impl IndexedWriteError {
// If any new fields are added to InsertError, this implementation must be updated to redact
// them per the CLAM spec.
fn redact(&mut self) {
self.message = "REDACTED".to_string();
self.details = None;
}
}

/// The set of errors that occurred during a write operation.
/// The set of errors that occurred during a call to
/// [`insert_many`](crate::Collection::insert_many).
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct BulkWriteFailure {
pub struct InsertManyError {
/// The error(s) that occurred on account of a non write concern failure.
pub write_errors: Option<Vec<BulkWriteError>>,
pub write_errors: Option<Vec<IndexedWriteError>>,

/// The error that occurred on account of write concern failure.
pub write_concern_error: Option<WriteConcernError>,
Expand All @@ -878,9 +880,9 @@ pub struct BulkWriteFailure {
pub(crate) inserted_ids: HashMap<usize, Bson>,
}

impl BulkWriteFailure {
impl InsertManyError {
pub(crate) fn new() -> Self {
BulkWriteFailure {
InsertManyError {
write_errors: None,
write_concern_error: None,
inserted_ids: Default::default(),
Expand All @@ -901,13 +903,13 @@ pub enum WriteFailure {
}

impl WriteFailure {
fn from_bulk_failure(bulk: BulkWriteFailure) -> Result<Self> {
if let Some(bulk_write_error) = bulk.write_errors.and_then(|es| es.into_iter().next()) {
fn from_insert_many_error(bulk: InsertManyError) -> Result<Self> {
if let Some(insert_error) = bulk.write_errors.and_then(|es| es.into_iter().next()) {
let write_error = WriteError {
code: bulk_write_error.code,
code_name: bulk_write_error.code_name,
message: bulk_write_error.message,
details: bulk_write_error.details,
code: insert_error.code,
code_name: insert_error.code_name,
message: insert_error.message,
details: insert_error.details,
};
Ok(WriteFailure::WriteError(write_error))
} else if let Some(wc_error) = bulk.write_concern_error {
Expand Down Expand Up @@ -993,14 +995,15 @@ pub enum GridFsFileIdentifier {
Id(Bson),
}

/// Translates ErrorKind::BulkWriteError cases to ErrorKind::WriteErrors, leaving all other errors
/// untouched.
pub(crate) fn convert_bulk_errors(error: Error) -> Error {
/// Translates ErrorKind::InsertMany to ErrorKind::Write, leaving all other errors untouched.
pub(crate) fn convert_insert_many_error(error: Error) -> Error {
match *error.kind {
ErrorKind::BulkWrite(bulk_failure) => match WriteFailure::from_bulk_failure(bulk_failure) {
Ok(failure) => Error::new(ErrorKind::Write(failure), Some(error.labels)),
Err(e) => e,
},
ErrorKind::InsertMany(insert_many_error) => {
match WriteFailure::from_insert_many_error(insert_many_error) {
Ok(failure) => Error::new(ErrorKind::Write(failure), Some(error.labels)),
Err(e) => e,
}
}
_ => error,
}
}
Expand Down
Loading