Skip to content

RUST-1433 Propagate original error for some labeled retry errors #903

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,9 +410,10 @@ impl Client {
drop(server);

if let Some(r) = retry {
if err.is_server_error()
if (err.is_server_error()
|| err.is_read_retryable()
|| err.is_write_retryable()
|| err.is_write_retryable())
&& !err.contains_label("NoWritesPerformed")
{
return Err(err);
} else {
Expand Down Expand Up @@ -606,7 +607,8 @@ impl Client {
connection: connection_info.clone(),
service_id,
})
});
})
.await;

let start_time = Instant::now();
let command_result = match connection.send_raw_command(raw_cmd, request_id).await {
Expand Down Expand Up @@ -706,7 +708,8 @@ impl Client {
connection: connection_info.clone(),
service_id,
})
});
})
.await;

if let Some(ref mut session) = session {
if err.is_network_error() {
Expand Down Expand Up @@ -735,7 +738,8 @@ impl Client {
connection: connection_info.clone(),
service_id,
})
});
})
.await;

#[cfg(feature = "in-use-encryption-unstable")]
let response = {
Expand Down
42 changes: 37 additions & 5 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,16 +218,42 @@ impl Client {
.map_or(false, |cs| cs.exec().has_mongocryptd_client())
}

/// Returns the sender half of the test-only command event channel, if one was configured
/// through the client's test options.
///
/// This variant is compiled only for test builds.
#[cfg(test)]
fn test_command_event_channel(&self) -> Option<&options::TestEventSender> {
    // Bail out with `None` when no test options were supplied at all.
    let test_options = self.inner.options.test_options.as_ref()?;
    test_options.async_event_listener.as_ref()
}

/// Returns the sender half of the test-only command event channel.
///
/// Outside of test builds there is no such channel, so this always yields `None`.
#[cfg(not(test))]
fn test_command_event_channel(&self) -> Option<&options::TestEventSender> {
    None
}

#[cfg(not(feature = "tracing-unstable"))]
pub(crate) fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
if let Some(ref handler) = self.inner.options.command_event_handler {
let event = generate_event();
pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
let handler = self.inner.options.command_event_handler.as_ref();
let test_channel = self.test_command_event_channel();
if handler.is_none() && test_channel.is_none() {
return;
}

let event = generate_event();
if let Some(tx) = test_channel {
let (msg, ack) = crate::runtime::AcknowledgedMessage::package(event.clone());
let _ = tx.send(msg).await;
ack.wait_for_acknowledgment().await;
}
if let Some(handler) = handler {
handle_command_event(handler.as_ref(), event);
}
}

#[cfg(feature = "tracing-unstable")]
pub(crate) fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
pub(crate) async fn emit_command_event(&self, generate_event: impl FnOnce() -> CommandEvent) {
let tracing_emitter = if trace_or_log_enabled!(
target: COMMAND_TRACING_EVENT_TARGET,
TracingOrLogLevel::Debug
Expand All @@ -240,11 +266,17 @@ impl Client {
None
};
let apm_event_handler = self.inner.options.command_event_handler.as_ref();
if !(tracing_emitter.is_some() || apm_event_handler.is_some()) {
let test_channel = self.test_command_event_channel();
if !(tracing_emitter.is_some() || apm_event_handler.is_some() || test_channel.is_some()) {
return;
}

let event = generate_event();
if let Some(tx) = test_channel {
let (msg, ack) = crate::runtime::AcknowledgedMessage::package(event.clone());
let _ = tx.send(msg).await;
ack.wait_for_acknowledgment().await;
}
if let (Some(event_handler), Some(ref tracing_emitter)) =
(apm_event_handler, &tracing_emitter)
{
Expand Down
7 changes: 7 additions & 0 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,8 +587,15 @@ pub(crate) struct TestOptions {

/// Mock response for `SrvPollingMonitor::lookup_hosts`.
pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,

/// Async-capable command event listener.
pub(crate) async_event_listener: Option<TestEventSender>,
}

/// Sender half of a tokio mpsc channel that carries command events wrapped in
/// `AcknowledgedMessage`.
///
/// Stored in `TestOptions::async_event_listener`.
pub(crate) type TestEventSender = tokio::sync::mpsc::Sender<
crate::runtime::AcknowledgedMessage<crate::event::command::CommandEvent>,
>;

/// Builds the default host list: a single entry holding `ServerAddress::default()`.
fn default_hosts() -> Vec<ServerAddress> {
    Vec::from([ServerAddress::default()])
}
Expand Down
100 changes: 97 additions & 3 deletions src/test/spec/retryable_writes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ mod test_file;

use std::{sync::Arc, time::Duration};

use bson::Bson;
use futures::stream::TryStreamExt;
use semver::VersionReq;
use tokio::sync::{RwLockReadGuard, RwLockWriteGuard};
use tokio::sync::{Mutex, RwLockReadGuard, RwLockWriteGuard};

use test_file::{TestFile, TestResult};

Expand All @@ -13,11 +14,11 @@ use crate::{
error::{ErrorKind, Result, RETRYABLE_WRITE_ERROR},
event::{
cmap::{CmapEvent, CmapEventHandler, ConnectionCheckoutFailedReason},
command::CommandEventHandler,
command::{CommandEvent, CommandEventHandler},
},
options::{ClientOptions, FindOptions, InsertManyOptions},
runtime,
runtime::AsyncJoinHandle,
runtime::{spawn, AcknowledgedMessage, AsyncJoinHandle},
sdam::MIN_HEARTBEAT_FREQUENCY,
test::{
assert_matches,
Expand All @@ -35,6 +36,7 @@ use crate::{
CLIENT_OPTIONS,
LOCK,
},
Client,
};

#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
Expand Down Expand Up @@ -499,3 +501,95 @@ async fn retry_write_pool_cleared() {

assert_eq!(handler.get_command_started_events(&["insert"]).len(), 3);
}

/// Prose test from retryable writes spec verifying that the original error is returned after
/// encountering a WriteConcernError with a RetryableWriteError label.
///
/// Flow, as implemented below:
/// 1. A fail point makes the first `insert` reply carry a write concern error with code 91 and
///    the `RetryableWriteError` label.
/// 2. A listener task watching the test command-event channel sees that reply and enables a
///    second fail point (error code 10107, labels `RetryableWriteError` and
///    `NoWritesPerformed`) before acknowledging the event.
/// 3. The `insert_one` call is expected to fail with code 91, i.e. the first error rather than
///    the one configured on the second fail point.
///
/// The test is skipped unless the deployment is a replica set and the server version is at
/// least 6.0.
#[cfg_attr(feature = "tokio-runtime", tokio::test(flavor = "multi_thread"))]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn retry_write_retryable_write_error() {
// Take the write guard from `LOCK.run_exclusively()`; it is held until the test returns.
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;

let mut client_options = CLIENT_OPTIONS.get().await.clone();
client_options.retry_writes = Some(true);
// Bounded channel (capacity 1) that carries acknowledged command events to the listener task.
let (event_tx, event_rx) = tokio::sync::mpsc::channel::<AcknowledgedMessage<CommandEvent>>(1);
// The listener needs to be active on client startup, but also needs a handle to the client
// itself for the trigger action.
let listener_client: Arc<Mutex<Option<TestClient>>> = Arc::new(Mutex::new(None));
// Set up event listener
// Unbounded channel used to hand the second fail point's guard back to the test body.
let (fp_tx, mut fp_rx) = tokio::sync::mpsc::unbounded_channel();
{
let client = listener_client.clone();
let mut event_rx = event_rx;
let fp_tx = fp_tx.clone();
// Spawn a task to watch the event channel
spawn(async move {
while let Some(msg) = event_rx.recv().await {
// Only a successful `insert` reply whose `writeConcernError` has code 91 triggers the
// second fail point. Every other message is dropped at the end of the iteration without
// an explicit acknowledge.
// NOTE(review): presumably dropping the message releases the sender's wait — confirm
// against `AcknowledgedMessage`.
if let CommandEvent::Succeeded(ev) = &*msg {
if let Some(Bson::Document(wc_err)) = ev.reply.get("writeConcernError") {
if ev.command_name == "insert" && wc_err.get_i32("code") == Ok(91) {
// Spawn a new task so events continue to process
let client = client.clone();
let fp_tx = fp_tx.clone();
spawn(async move {
// Enable the failpoint.
let fp_guard = {
// The lock guard lives only for this inner block, i.e. while the fail point is
// being enabled.
let client = client.lock().await;
FailPoint::fail_command(
&["insert"],
FailPointMode::Times(1),
FailCommandOptions::builder()
.error_code(10107)
.error_labels(vec![
"RetryableWriteError".to_string(),
"NoWritesPerformed".to_string(),
])
.build(),
)
.enable(client.as_ref().unwrap(), None)
.await
.unwrap()
};
// Hand the guard to the test body, which receives it at the end of the test.
fp_tx.send(fp_guard).unwrap();
// Defer acknowledging the message until the failpoint has been set
// up so the retry hits it.
msg.acknowledge(());
});
}
}
}
}
});
}
// Register the sender in the test options before the client is built, so the listener is in
// place from client startup.
client_options.test_options_mut().async_event_listener = Some(event_tx);
let client = Client::test_builder().options(client_options).build().await;
// Now that the client exists, make it available to the listener task.
*listener_client.lock().await = Some(client.clone());

if !client.is_replica_set() || client.server_version_lt(6, 0) {
log_uncaptured("skipping retry_write_retryable_write_error: invalid topology");
return;
}

// First fail point: one `insert` reports a write concern error with code 91 labelled
// `RetryableWriteError`.
let _fp_guard = FailPoint::fail_command(
&["insert"],
FailPointMode::Times(1),
FailCommandOptions::builder()
.write_concern_error(doc! {
"code": 91,
"errorLabels": ["RetryableWriteError"],
})
.build(),
)
.enable(&client, None)
.await
.unwrap();

let result = client
.database("test")
.collection::<Document>("test")
.insert_one(doc! { "hello": "there" }, None)
.await;
// The error surfaced to the caller must be the original one (code 91), not the error
// configured on the second fail point.
assert_eq!(result.unwrap_err().code(), Some(91));

// Consume failpoint guard.
let _ = fp_rx.recv().await;
}
3 changes: 2 additions & 1 deletion src/test/util/failpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ impl FailPoint {
}
}

#[derive(Debug)]
pub struct FailPointGuard {
client: Client,
failpoint_name: String,
Expand Down Expand Up @@ -103,7 +104,7 @@ pub enum FailPointMode {
}

#[serde_with::skip_serializing_none]
#[derive(Debug, TypedBuilder, Serialize)]
#[derive(Debug, Default, TypedBuilder, Serialize)]
#[builder(field_defaults(default, setter(into)))]
#[serde(rename_all = "camelCase")]
pub struct FailCommandOptions {
Expand Down