Skip to content

RUST-1842 Update prose tests for mongos deprioritization during retryable ops #1397

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ impl Client {
.and_then(|s| s.transaction.pinned_mongos())
.or_else(|| op.selection_criteria());

if op.name() == "insert" || op.name() == "find" {
let first_server = retry.as_ref().map(|r| &r.first_server);
dbg!("deprioritized: {}", &first_server);
}

let (server, effective_criteria) = match self
.select_server(
selection_criteria,
Expand Down
27 changes: 27 additions & 0 deletions src/sdam/description/topology/server_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,33 @@ pub(crate) fn attempt_to_select_server<'a>(
deprioritized: Option<&ServerAddress>,
) -> Result<Option<SelectedServer>> {
let mut in_window = topology_description.suitable_servers_in_latency_window(criteria)?;
dbg!("length of in_window before filter: {}", in_window.len());
for server_desc in in_window.clone() {
if let Some(server) = servers.get(&server_desc.address) {
dbg!("[Before filter] Server address: {}", &server.address);
}
}
if let Some(addr) = deprioritized {
if in_window.len() > 1 {
in_window.retain(|d| &d.address != addr);
}
}
dbg!("length of in_window after filter: {}", in_window.len());
for server_desc in in_window.clone() {
if let Some(server) = servers.get(&server_desc.address) {
dbg!("[After filter] Server address: {}", &server.address);
}
}
let in_window_servers = in_window
.into_iter()
.flat_map(|desc| servers.get(&desc.address))
.collect();
let selected = select_server_in_latency_window(in_window_servers);
if let Some(server) = selected.clone() {
dbg!("Selected server address: {}", &server.address);
} else {
dbg!("No server was selected.");
}
Ok(selected.map(SelectedServer::new))
}

Expand Down Expand Up @@ -133,8 +150,18 @@ impl TopologyDescription {
.collect(),
};

dbg!(
"suitable servers before latency window filter: {}",
suitable_servers.len()
);

self.retain_servers_within_latency_window(&mut suitable_servers);

dbg!(
"suitable servers after latency window filter: {}",
suitable_servers.len()
);

Ok(suitable_servers)
}

Expand Down
55 changes: 38 additions & 17 deletions src/test/spec/retryable_reads.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::IntoFuture, time::Duration};
use std::{future::IntoFuture, sync::Arc, time::Duration};

use crate::bson::doc;

Expand All @@ -8,6 +8,7 @@ use crate::{
cmap::{CmapEvent, ConnectionCheckoutFailedReason},
command::CommandEvent,
},
options::SelectionCriteria,
runtime::{self, AsyncJoinHandle},
test::{
block_connection_supported,
Expand Down Expand Up @@ -173,24 +174,25 @@ async fn retry_read_different_mongos() {
}
client_options.hosts.drain(2..);
client_options.retry_reads = Some(true);
dbg!("\nstart retry_read_different_mongos");

let mut guards = vec![];
for ix in [0, 1] {
let mut opts = client_options.clone();
opts.hosts.remove(ix);
opts.direct_connection = Some(true);
let client = Client::for_test().options(opts).await;
let hosts = client_options.hosts.clone();
let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;

let mut guards = Vec::new();
for address in hosts {
let address = address.clone();
let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1))
.error_code(6)
.close_connection(true);
.selection_criteria(SelectionCriteria::Predicate(Arc::new(move |info| {
info.description.address == address
})));
guards.push(client.enable_fail_point(fail_point).await.unwrap());
}

let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;
let result = client
.database("test")
.collection::<crate::bson::Document>("retry_read_different_mongos")
Expand All @@ -211,6 +213,15 @@ async fn retry_read_different_mongos() {
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_failed().unwrap();
let second_address = &second_failed.connection.address;
assert_ne!(
first_address, second_address,
"Failed commands did not occur on two different mongos instances"
);
dbg!("end retry_read_different_mongos\n");

drop(guards); // enforce lifetime
}
Expand All @@ -226,21 +237,21 @@ async fn retry_read_same_mongos() {
log_uncaptured("skipping retry_read_same_mongos: requires sharded cluster");
return;
}
dbg!("\nstart retry_read_same_mongos");

let mut client_options = get_client_options().await.clone();
client_options.hosts.drain(1..);
client_options.retry_reads = Some(true);
let fp_guard = {
let mut client_options = client_options.clone();
client_options.direct_connection = Some(true);
let client = Client::for_test().options(client_options).await;
let s0 = Client::for_test().options(client_options).await;

let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1))
.error_code(6)
.close_connection(true);
client.enable_fail_point(fail_point).await.unwrap()
let fail_point = FailPoint::fail_command(&["find"], FailPointMode::Times(1)).error_code(6);
s0.enable_fail_point(fail_point).await.unwrap()
};

client_options.direct_connection = Some(false);
let client = Client::for_test()
.options(client_options)
.monitor_events()
Expand All @@ -265,6 +276,16 @@ async fn retry_read_same_mongos() {
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_succeeded().unwrap();
let second_address = &second_failed.connection.address;
assert_eq!(
first_address, second_address,
"Failed command and retry did not occur on the same mongos instance",
);

dbg!("end retry_read_same_mongos\n");

drop(fp_guard); // enforce lifetime
}
51 changes: 35 additions & 16 deletions src/test/spec/retryable_writes.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{sync::Arc, time::Duration};

use crate::bson::Bson;
use crate::{bson::Bson, options::SelectionCriteria};
use tokio::sync::Mutex;

use crate::{
Expand Down Expand Up @@ -319,25 +319,25 @@ async fn retry_write_different_mongos() {
}
client_options.hosts.drain(2..);
client_options.retry_writes = Some(true);
let hosts = client_options.hosts.clone();
dbg!("\nstart retry_write_different_mongos");
let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;

let mut guards = vec![];
for ix in [0, 1] {
let mut opts = client_options.clone();
opts.hosts.remove(ix);
opts.direct_connection = Some(true);
let client = Client::for_test().options(opts).await;

let mut guards = Vec::new();
for address in hosts {
let address = address.clone();
let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
.error_code(6)
.error_labels(vec![RETRYABLE_WRITE_ERROR])
.close_connection(true);
.error_labels([RETRYABLE_WRITE_ERROR])
.selection_criteria(SelectionCriteria::Predicate(Arc::new(move |info| {
info.description.address == address
})));
guards.push(client.enable_fail_point(fail_point).await.unwrap());
}

let client = Client::for_test()
.options(client_options)
.monitor_events()
.await;
let result = client
.database("test")
.collection::<crate::bson::Document>("retry_write_different_mongos")
Expand All @@ -358,6 +358,15 @@ async fn retry_write_different_mongos() {
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_failed().unwrap();
let second_address = &second_failed.connection.address;
assert_ne!(
first_address, second_address,
"Failed commands did not occur on two different mongos instances"
);
dbg!("end retry_write_different_mongos\n");

drop(guards); // enforce lifetime
}
Expand All @@ -374,6 +383,7 @@ async fn retry_write_same_mongos() {
return;
}

dbg!("\nstart retry_write_same_mongos");
let mut client_options = get_client_options().await.clone();
client_options.hosts.drain(1..);
client_options.retry_writes = Some(true);
Expand All @@ -384,11 +394,11 @@ async fn retry_write_same_mongos() {

let fail_point = FailPoint::fail_command(&["insert"], FailPointMode::Times(1))
.error_code(6)
.error_labels(vec![RETRYABLE_WRITE_ERROR])
.close_connection(true);
.error_labels(vec![RETRYABLE_WRITE_ERROR]);
client.enable_fail_point(fail_point).await.unwrap()
};

client_options.direct_connection = Some(false);
let client = Client::for_test()
.options(client_options)
.monitor_events()
Expand All @@ -413,6 +423,15 @@ async fn retry_write_same_mongos() {
"unexpected events: {:#?}",
events,
);
let first_failed = events[1].as_command_failed().unwrap();
let first_address = &first_failed.connection.address;
let second_failed = events[3].as_command_succeeded().unwrap();
let second_address = &second_failed.connection.address;
assert_eq!(
first_address, second_address,
"Failed commands did not occur on the same mongos instance",
);
dbg!("end retry_write_same_mongos\n");

drop(fp_guard); // enforce lifetime
}
9 changes: 8 additions & 1 deletion src/test/util/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
bson::doc,
event::{
cmap::CmapEvent,
command::{CommandEvent, CommandSucceededEvent},
command::{CommandEvent, CommandFailedEvent, CommandSucceededEvent},
sdam::SdamEvent,
},
test::get_client_options,
Expand Down Expand Up @@ -101,6 +101,13 @@ impl CommandEvent {
_ => None,
}
}

/// Returns the inner [`CommandFailedEvent`] if this event is the `Failed`
/// variant, or `None` for any other variant.
pub(crate) fn as_command_failed(&self) -> Option<&CommandFailedEvent> {
    // Borrow the payload only when the event actually represents a failure.
    if let CommandEvent::Failed(failed_event) = self {
        Some(failed_event)
    } else {
        None
    }
}
}

#[derive(Clone, Debug)]
Expand Down