Skip to content

RUST-979 Non-unified spec tests for load balancer support behavior #473

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Sep 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ use strsim::jaro_winkler;
use typed_builder::TypedBuilder;
use webpki_roots::TLS_SERVER_ROOTS;

#[cfg(test)]
use crate::srv::LookupHosts;
use crate::{
bson::{doc, Bson, Document},
bson_util,
Expand Down Expand Up @@ -543,20 +545,30 @@ pub struct ClientOptions {
#[serde(skip)]
pub(crate) resolver_config: Option<ResolverConfig>,

/// Used by tests to override MIN_HEARTBEAT_FREQUENCY.
#[builder(default)]
#[cfg(test)]
pub(crate) heartbeat_freq_test: Option<Duration>,
/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
#[builder(default, setter(skip))]
#[serde(rename = "loadbalanced")]
pub(crate) load_balanced: Option<bool>,

/// Allow use of the `load_balanced` option.
// TODO RUST-653 Remove this when load balancer work is ready for release.
/// Control test behavior of the client.
#[cfg(test)]
#[builder(default, setter(skip))]
#[serde(skip)]
pub(crate) allow_load_balanced: bool,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular gate key turned out to be more hassle than it was worth - it meant that attempting to parse options from test json would fail without jumping through a lot of hoops. The options validation now enforces that the load_balanced option is test-only, which seems a reasonable degree of protection.

#[derivative(PartialEq = "ignore")]
pub(crate) test_options: Option<TestOptions>,
}

/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
#[builder(default, setter(skip))]
pub(crate) load_balanced: Option<bool>,
#[cfg(test)]
#[derive(Debug, Clone, Default)]
pub(crate) struct TestOptions {
/// Override MIN_HEARTBEAT_FREQUENCY.
pub(crate) heartbeat_freq: Option<Duration>,

/// Disable server and SRV-polling monitor threads.
pub(crate) disable_monitoring_threads: bool,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously known as Topology::new_mocked.


/// Mock response for `SrvPollingMonitor::lookup_hosts`.
pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,
}

fn default_hosts() -> Vec<ServerAddress> {
Expand Down Expand Up @@ -629,6 +641,8 @@ impl Serialize for ClientOptions {
writeconcern: &'a Option<WriteConcern>,

zlibcompressionlevel: &'a Option<i32>,

loadbalanced: &'a Option<bool>,
}

let client_options = ClientOptionsHelper {
Expand All @@ -652,6 +666,7 @@ impl Serialize for ClientOptions {
tls: &self.tls,
writeconcern: &self.write_concern,
zlibcompressionlevel: &self.zlib_compression,
loadbalanced: &self.load_balanced,
};

client_options.serialize(serializer)
Expand Down Expand Up @@ -921,11 +936,10 @@ impl From<ClientOptionsParser> for ClientOptions {
original_uri: Some(parser.original_uri),
resolver_config: None,
server_api: None,
#[cfg(test)]
heartbeat_freq_test: None,
allow_load_balanced: false,
load_balanced: parser.load_balanced,
sdam_event_handler: None,
#[cfg(test)]
test_options: None,
}
}
}
Expand Down Expand Up @@ -1132,7 +1146,9 @@ impl ClientOptions {
write_concern.validate()?;
}

if !self.allow_load_balanced && self.load_balanced.is_some() {
// Disallow use of load-balanced configurations in non-test code.
// TODO RUST-653 Remove this when load balancer work is ready for release.
if !cfg!(test) && self.load_balanced.is_some() {
return Err(ErrorKind::InvalidArgument {
message: "loadBalanced is not supported".to_string(),
}
Expand Down Expand Up @@ -1200,6 +1216,11 @@ impl ClientOptions {
]
);
}

#[cfg(test)]
pub(crate) fn test_options_mut(&mut self) -> &mut TestOptions {
self.test_options.get_or_insert_with(Default::default)
}
}

/// Splits a string into a section before a given index and a section exclusively after the index.
Expand Down
15 changes: 15 additions & 0 deletions src/sdam/description/topology/server_selection/test/logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,21 @@ async fn server_selection_unknown() {
.await;
}

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn server_selection_load_balanced() {
run_spec_test(
&[
"server-selection",
"server_selection",
"LoadBalanced",
"read",
],
run_test,
)
.await;
}

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn max_staleness_replica_set_no_primary() {
Expand Down
2 changes: 2 additions & 0 deletions src/sdam/description/topology/server_selection/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ enum TestServerType {
RsOther,
#[serde(rename = "RSGhost")]
RsGhost,
LoadBalancer,
Unknown,
PossiblePrimary,
}
Expand All @@ -148,6 +149,7 @@ impl TestServerType {
TestServerType::RsArbiter => Some(ServerType::RsArbiter),
TestServerType::RsOther => Some(ServerType::RsOther),
TestServerType::RsGhost => Some(ServerType::RsGhost),
TestServerType::LoadBalancer => Some(ServerType::LoadBalancer),
TestServerType::Unknown => Some(ServerType::Unknown),
TestServerType::PossiblePrimary => None,
}
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/description/topology/test/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl PartialEq<TestServerDescription> for ServerDescription {
&& other.passives.is_empty()
&& other.primary.is_none()
&& other.set_name.is_none()
&& other.server_type == ServerType::Unknown
&& other.server_type == self.server_type
}
}
}
Expand Down
33 changes: 23 additions & 10 deletions src/sdam/description/topology/test/sdam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ fn server_type_from_str(s: &str) -> Option<ServerType> {
"RSArbiter" => ServerType::RsArbiter,
"RSOther" => ServerType::RsOther,
"RSGhost" => ServerType::RsGhost,
"LoadBalancer" => ServerType::LoadBalancer,
"Unknown" | "PossiblePrimary" => ServerType::Unknown,
_ => return None,
};
Expand All @@ -241,20 +242,15 @@ async fn run_test(test_file: TestFile) {
return;
}

// TODO RUST-653 unskip load balancer test
if test_description.contains("load balancer") {
println!("Skipping {} (RUST-653)", test_description);
return;
}

let mut options = ClientOptions::parse_uri(&test_file.uri, None)
.await
.expect(test_description);

let handler = Arc::new(EventHandler::new());
options.sdam_event_handler = Some(handler.clone());
options.test_options_mut().disable_monitoring_threads = true;

let topology = Topology::new_mocked(options.clone());
let topology = Topology::new(options.clone()).unwrap();
let mut servers = topology.get_servers().await;

for (i, phase) in test_file.phases.into_iter().enumerate() {
Expand Down Expand Up @@ -351,7 +347,13 @@ async fn run_test(test_file: TestFile) {
}
Outcome::Events(EventsOutcome { events: expected }) => {
let actual = handler.get_all_sdam_events();
assert_eq!(actual.len(), expected.len());
assert_eq!(
actual.len(),
expected.len(),
"event list length mismatch:\n actual: {:#?}, expected: {:#?}",
actual,
expected
);
for (actual, expected) in actual.iter().zip(expected.iter()) {
assert_eq!(
actual, expected,
Expand Down Expand Up @@ -522,6 +524,16 @@ async fn monitoring() {
run_spec_test(&["server-discovery-and-monitoring", "monitoring"], run_test).await;
}

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn load_balanced() {
run_spec_test(
&["server-discovery-and-monitoring", "load-balanced"],
run_test,
)
.await;
}

#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
#[function_name::named]
Expand Down Expand Up @@ -676,10 +688,11 @@ async fn direct_connection() {
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn pool_cleared_error_does_not_mark_unknown() {
let address = ServerAddress::parse("a:1234").unwrap();
let options = ClientOptions::builder()
let mut options = ClientOptions::builder()
.hosts(vec![address.clone()])
.build();
let topology = Topology::new_mocked(options);
options.test_options_mut().disable_monitoring_threads = true;
let topology = Topology::new(options).unwrap();

// get the one server in the topology
let server = topology
Expand Down
4 changes: 3 additions & 1 deletion src/sdam/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ impl HeartbeatMonitor {
#[cfg(test)]
let min_frequency = self
.client_options
.heartbeat_freq_test
.test_options
.as_ref()
.and_then(|to| to.heartbeat_freq)
.unwrap_or(MIN_HEARTBEAT_FREQUENCY);

#[cfg(not(test))]
Expand Down
9 changes: 9 additions & 0 deletions src/sdam/srv_polling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ impl SrvPollingMonitor {
}

async fn lookup_hosts(&mut self) -> Result<LookupHosts> {
#[cfg(test)]
if let Some(mock) = self
.client_options
.test_options
.as_ref()
.and_then(|to| to.mock_lookup_hosts.as_ref())
{
return mock.clone();
}
let initial_hostname = self.initial_hostname.clone();
let resolver = self.get_or_create_srv_resolver().await?;
resolver.get_srv_hosts(initial_hostname.as_str()).await
Expand Down
42 changes: 33 additions & 9 deletions src/sdam/srv_polling/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::{
error::Result,
options::{ClientOptions, ServerAddress},
sdam::Topology,
RUNTIME,
};

fn localhost_test_build_10gen(port: u16) -> ServerAddress {
Expand All @@ -26,22 +27,23 @@ lazy_static::lazy_static! {
async fn run_test(new_hosts: Result<Vec<ServerAddress>>, expected_hosts: HashSet<ServerAddress>) {
let mut options = ClientOptions::new_srv();
options.hosts = DEFAULT_HOSTS.clone();
let topology = Topology::new_mocked(options);
options.test_options_mut().disable_monitoring_threads = true;
let topology = Topology::new(options).unwrap();
let mut monitor = SrvPollingMonitor::new(topology.downgrade()).unwrap();

monitor
.update_hosts(
new_hosts.map(|hosts| LookupHosts {
hosts: hosts.into_iter().map(Result::Ok).collect(),
min_ttl: Duration::from_secs(60),
}),
topology.clone(),
)
.update_hosts(new_hosts.and_then(make_lookup_hosts), topology.clone())
.await;

assert_eq!(expected_hosts, topology.servers().await);
}

fn make_lookup_hosts(hosts: Vec<ServerAddress>) -> Result<LookupHosts> {
Ok(LookupHosts {
hosts: hosts.into_iter().map(Result::Ok).collect(),
min_ttl: Duration::from_secs(60),
})
}

// If a new DNS record is returned, it should be reflected in the topology.
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
Expand Down Expand Up @@ -102,3 +104,25 @@ async fn timeout_error() {
async fn no_results() {
run_test(Ok(Vec::new()), DEFAULT_HOSTS.iter().cloned().collect()).await;
}

// SRV polling is not done for load-balanced clusters (as per spec at
// https://github.com/mongodb/specifications/blob/master/source/polling-srv-records-for-mongos-discovery/tests/README.rst#test-that-srv-polling-is-not-done-for-load-balalanced-clusters).
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
async fn load_balanced_no_srv_polling() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has a different form than the other tests because those directly fire server updates, skipping the rest of the srv-polling machinery, whereas this one needs to test that the dns polling doesn't happen at all.

let hosts = vec![localhost_test_build_10gen(27017)];
let mut options = ClientOptions::new_srv();
let rescan_interval = options.original_srv_info.as_ref().cloned().unwrap().min_ttl;
options.hosts = hosts.clone();
options.load_balanced = Some(true);
options.test_options_mut().mock_lookup_hosts = Some(make_lookup_hosts(vec![
localhost_test_build_10gen(27017),
localhost_test_build_10gen(27018),
]));
let topology = Topology::new(options).unwrap();
RUNTIME.delay_for(rescan_interval * 2).await;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unfortunately long, but also mandated by the spec :(

assert_eq!(
hosts.into_iter().collect::<HashSet<_>>(),
topology.servers().await
);
}
Loading