Skip to content

Commit 214ed2c

Browse files
authored
RUST-979 Non-unified spec tests for load balancer support behavior (#473)
1 parent 8755788 commit 214ed2c

File tree

107 files changed

+1114
-267
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

107 files changed

+1114
-267
lines changed

src/client/options/mod.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ use strsim::jaro_winkler;
3737
use typed_builder::TypedBuilder;
3838
use webpki_roots::TLS_SERVER_ROOTS;
3939

40+
#[cfg(test)]
41+
use crate::srv::LookupHosts;
4042
use crate::{
4143
bson::{doc, Bson, Document},
4244
bson_util,
@@ -543,20 +545,30 @@ pub struct ClientOptions {
543545
#[serde(skip)]
544546
pub(crate) resolver_config: Option<ResolverConfig>,
545547

546-
/// Used by tests to override MIN_HEARTBEAT_FREQUENCY.
547-
#[builder(default)]
548-
#[cfg(test)]
549-
pub(crate) heartbeat_freq_test: Option<Duration>,
548+
/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
549+
#[builder(default, setter(skip))]
550+
#[serde(rename = "loadbalanced")]
551+
pub(crate) load_balanced: Option<bool>,
550552

551-
/// Allow use of the `load_balanced` option.
552-
// TODO RUST-653 Remove this when load balancer work is ready for release.
553+
/// Control test behavior of the client.
554+
#[cfg(test)]
553555
#[builder(default, setter(skip))]
554556
#[serde(skip)]
555-
pub(crate) allow_load_balanced: bool,
557+
#[derivative(PartialEq = "ignore")]
558+
pub(crate) test_options: Option<TestOptions>,
559+
}
556560

557-
/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
558-
#[builder(default, setter(skip))]
559-
pub(crate) load_balanced: Option<bool>,
561+
#[cfg(test)]
562+
#[derive(Debug, Clone, Default)]
563+
pub(crate) struct TestOptions {
564+
/// Override MIN_HEARTBEAT_FREQUENCY.
565+
pub(crate) heartbeat_freq: Option<Duration>,
566+
567+
/// Disable server and SRV-polling monitor threads.
568+
pub(crate) disable_monitoring_threads: bool,
569+
570+
/// Mock response for `SrvPollingMonitor::lookup_hosts`.
571+
pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,
560572
}
561573

562574
fn default_hosts() -> Vec<ServerAddress> {
@@ -629,6 +641,8 @@ impl Serialize for ClientOptions {
629641
writeconcern: &'a Option<WriteConcern>,
630642

631643
zlibcompressionlevel: &'a Option<i32>,
644+
645+
loadbalanced: &'a Option<bool>,
632646
}
633647

634648
let client_options = ClientOptionsHelper {
@@ -652,6 +666,7 @@ impl Serialize for ClientOptions {
652666
tls: &self.tls,
653667
writeconcern: &self.write_concern,
654668
zlibcompressionlevel: &self.zlib_compression,
669+
loadbalanced: &self.load_balanced,
655670
};
656671

657672
client_options.serialize(serializer)
@@ -921,11 +936,10 @@ impl From<ClientOptionsParser> for ClientOptions {
921936
original_uri: Some(parser.original_uri),
922937
resolver_config: None,
923938
server_api: None,
924-
#[cfg(test)]
925-
heartbeat_freq_test: None,
926-
allow_load_balanced: false,
927939
load_balanced: parser.load_balanced,
928940
sdam_event_handler: None,
941+
#[cfg(test)]
942+
test_options: None,
929943
}
930944
}
931945
}
@@ -1132,7 +1146,9 @@ impl ClientOptions {
11321146
write_concern.validate()?;
11331147
}
11341148

1135-
if !self.allow_load_balanced && self.load_balanced.is_some() {
1149+
// Disallow use of load-balanced configurations in non-test code.
1150+
// TODO RUST-653 Remove this when load balancer work is ready for release.
1151+
if !cfg!(test) && self.load_balanced.is_some() {
11361152
return Err(ErrorKind::InvalidArgument {
11371153
message: "loadBalanced is not supported".to_string(),
11381154
}
@@ -1200,6 +1216,11 @@ impl ClientOptions {
12001216
]
12011217
);
12021218
}
1219+
1220+
#[cfg(test)]
1221+
pub(crate) fn test_options_mut(&mut self) -> &mut TestOptions {
1222+
self.test_options.get_or_insert_with(Default::default)
1223+
}
12031224
}
12041225

12051226
/// Splits a string into a section before a given index and a section exclusively after the index.

src/sdam/description/topology/server_selection/test/logic.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,21 @@ async fn server_selection_unknown() {
150150
.await;
151151
}
152152

153+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
154+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
155+
async fn server_selection_load_balanced() {
156+
run_spec_test(
157+
&[
158+
"server-selection",
159+
"server_selection",
160+
"LoadBalanced",
161+
"read",
162+
],
163+
run_test,
164+
)
165+
.await;
166+
}
167+
153168
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
154169
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
155170
async fn max_staleness_replica_set_no_primary() {

src/sdam/description/topology/server_selection/test/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ enum TestServerType {
134134
RsOther,
135135
#[serde(rename = "RSGhost")]
136136
RsGhost,
137+
LoadBalancer,
137138
Unknown,
138139
PossiblePrimary,
139140
}
@@ -148,6 +149,7 @@ impl TestServerType {
148149
TestServerType::RsArbiter => Some(ServerType::RsArbiter),
149150
TestServerType::RsOther => Some(ServerType::RsOther),
150151
TestServerType::RsGhost => Some(ServerType::RsGhost),
152+
TestServerType::LoadBalancer => Some(ServerType::LoadBalancer),
151153
TestServerType::Unknown => Some(ServerType::Unknown),
152154
TestServerType::PossiblePrimary => None,
153155
}

src/sdam/description/topology/test/event.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl PartialEq<TestServerDescription> for ServerDescription {
8181
&& other.passives.is_empty()
8282
&& other.primary.is_none()
8383
&& other.set_name.is_none()
84-
&& other.server_type == ServerType::Unknown
84+
&& other.server_type == self.server_type
8585
}
8686
}
8787
}

src/sdam/description/topology/test/sdam.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ fn server_type_from_str(s: &str) -> Option<ServerType> {
225225
"RSArbiter" => ServerType::RsArbiter,
226226
"RSOther" => ServerType::RsOther,
227227
"RSGhost" => ServerType::RsGhost,
228+
"LoadBalancer" => ServerType::LoadBalancer,
228229
"Unknown" | "PossiblePrimary" => ServerType::Unknown,
229230
_ => return None,
230231
};
@@ -241,20 +242,15 @@ async fn run_test(test_file: TestFile) {
241242
return;
242243
}
243244

244-
// TODO RUST-653 unskip load balancer test
245-
if test_description.contains("load balancer") {
246-
println!("Skipping {} (RUST-653)", test_description);
247-
return;
248-
}
249-
250245
let mut options = ClientOptions::parse_uri(&test_file.uri, None)
251246
.await
252247
.expect(test_description);
253248

254249
let handler = Arc::new(EventHandler::new());
255250
options.sdam_event_handler = Some(handler.clone());
251+
options.test_options_mut().disable_monitoring_threads = true;
256252

257-
let topology = Topology::new_mocked(options.clone());
253+
let topology = Topology::new(options.clone()).unwrap();
258254
let mut servers = topology.get_servers().await;
259255

260256
for (i, phase) in test_file.phases.into_iter().enumerate() {
@@ -351,7 +347,13 @@ async fn run_test(test_file: TestFile) {
351347
}
352348
Outcome::Events(EventsOutcome { events: expected }) => {
353349
let actual = handler.get_all_sdam_events();
354-
assert_eq!(actual.len(), expected.len());
350+
assert_eq!(
351+
actual.len(),
352+
expected.len(),
353+
"event list length mismatch:\n actual: {:#?}, expected: {:#?}",
354+
actual,
355+
expected
356+
);
355357
for (actual, expected) in actual.iter().zip(expected.iter()) {
356358
assert_eq!(
357359
actual, expected,
@@ -522,6 +524,16 @@ async fn monitoring() {
522524
run_spec_test(&["server-discovery-and-monitoring", "monitoring"], run_test).await;
523525
}
524526

527+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
528+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
529+
async fn load_balanced() {
530+
run_spec_test(
531+
&["server-discovery-and-monitoring", "load-balanced"],
532+
run_test,
533+
)
534+
.await;
535+
}
536+
525537
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
526538
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
527539
#[function_name::named]
@@ -676,10 +688,11 @@ async fn direct_connection() {
676688
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
677689
async fn pool_cleared_error_does_not_mark_unknown() {
678690
let address = ServerAddress::parse("a:1234").unwrap();
679-
let options = ClientOptions::builder()
691+
let mut options = ClientOptions::builder()
680692
.hosts(vec![address.clone()])
681693
.build();
682-
let topology = Topology::new_mocked(options);
694+
options.test_options_mut().disable_monitoring_threads = true;
695+
let topology = Topology::new(options).unwrap();
683696

684697
// get the one server in the topology
685698
let server = topology

src/sdam/monitor.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,9 @@ impl HeartbeatMonitor {
139139
#[cfg(test)]
140140
let min_frequency = self
141141
.client_options
142-
.heartbeat_freq_test
142+
.test_options
143+
.as_ref()
144+
.and_then(|to| to.heartbeat_freq)
143145
.unwrap_or(MIN_HEARTBEAT_FREQUENCY);
144146

145147
#[cfg(not(test))]

src/sdam/srv_polling/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,15 @@ impl SrvPollingMonitor {
112112
}
113113

114114
async fn lookup_hosts(&mut self) -> Result<LookupHosts> {
115+
#[cfg(test)]
116+
if let Some(mock) = self
117+
.client_options
118+
.test_options
119+
.as_ref()
120+
.and_then(|to| to.mock_lookup_hosts.as_ref())
121+
{
122+
return mock.clone();
123+
}
115124
let initial_hostname = self.initial_hostname.clone();
116125
let resolver = self.get_or_create_srv_resolver().await?;
117126
resolver.get_srv_hosts(initial_hostname.as_str()).await

src/sdam/srv_polling/test.rs

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use crate::{
77
error::Result,
88
options::{ClientOptions, ServerAddress},
99
sdam::Topology,
10+
RUNTIME,
1011
};
1112

1213
fn localhost_test_build_10gen(port: u16) -> ServerAddress {
@@ -26,22 +27,23 @@ lazy_static::lazy_static! {
2627
async fn run_test(new_hosts: Result<Vec<ServerAddress>>, expected_hosts: HashSet<ServerAddress>) {
2728
let mut options = ClientOptions::new_srv();
2829
options.hosts = DEFAULT_HOSTS.clone();
29-
let topology = Topology::new_mocked(options);
30+
options.test_options_mut().disable_monitoring_threads = true;
31+
let topology = Topology::new(options).unwrap();
3032
let mut monitor = SrvPollingMonitor::new(topology.downgrade()).unwrap();
31-
3233
monitor
33-
.update_hosts(
34-
new_hosts.map(|hosts| LookupHosts {
35-
hosts: hosts.into_iter().map(Result::Ok).collect(),
36-
min_ttl: Duration::from_secs(60),
37-
}),
38-
topology.clone(),
39-
)
34+
.update_hosts(new_hosts.and_then(make_lookup_hosts), topology.clone())
4035
.await;
4136

4237
assert_eq!(expected_hosts, topology.servers().await);
4338
}
4439

40+
fn make_lookup_hosts(hosts: Vec<ServerAddress>) -> Result<LookupHosts> {
41+
Ok(LookupHosts {
42+
hosts: hosts.into_iter().map(Result::Ok).collect(),
43+
min_ttl: Duration::from_secs(60),
44+
})
45+
}
46+
4547
// If a new DNS record is returned, it should be reflected in the topology.
4648
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
4749
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
@@ -102,3 +104,25 @@ async fn timeout_error() {
102104
async fn no_results() {
103105
run_test(Ok(Vec::new()), DEFAULT_HOSTS.iter().cloned().collect()).await;
104106
}
107+
108+
// SRV polling is not done for load-balanced clusters (as per spec at
109+
// https://github.com/mongodb/specifications/blob/master/source/polling-srv-records-for-mongos-discovery/tests/README.rst#test-that-srv-polling-is-not-done-for-load-balalanced-clusters).
110+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
111+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
112+
async fn load_balanced_no_srv_polling() {
113+
let hosts = vec![localhost_test_build_10gen(27017)];
114+
let mut options = ClientOptions::new_srv();
115+
let rescan_interval = options.original_srv_info.as_ref().cloned().unwrap().min_ttl;
116+
options.hosts = hosts.clone();
117+
options.load_balanced = Some(true);
118+
options.test_options_mut().mock_lookup_hosts = Some(make_lookup_hosts(vec![
119+
localhost_test_build_10gen(27017),
120+
localhost_test_build_10gen(27018),
121+
]));
122+
let topology = Topology::new(options).unwrap();
123+
RUNTIME.delay_for(rescan_interval * 2).await;
124+
assert_eq!(
125+
hosts.into_iter().collect::<HashSet<_>>(),
126+
topology.servers().await
127+
);
128+
}

0 commit comments

Comments
 (0)