-
Notifications
You must be signed in to change notification settings - Fork 180
RUST-980 Run load balancer tests on evergreen, and update existing tests #477
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cf9ecc6
80e2e31
0e100e3
958c968
0127054
c50f807
cebf3e9
2003f7a
fd61488
c784398
a90cfa5
3800554
2733cda
30d54bc
bd0d8c0
a86b03b
0f26622
aa462e9
f642c87
b833b88
5cfb434
a1afa9c
84a80cf
fbdc045
0b01012
da36f2b
6d37ff9
b67f08b
251e784
6ee9a6d
63e116e
71b545b
1b6a049
afe3a39
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,6 +143,8 @@ pub(crate) struct Handshaker { | |
/// given the same pool options, so it can be created at the time the Handshaker is created. | ||
command: Command, | ||
credential: Option<Credential>, | ||
#[cfg(test)] | ||
mock_service_id: bool, | ||
} | ||
|
||
impl Handshaker { | ||
|
@@ -154,6 +156,9 @@ impl Handshaker { | |
let mut command = | ||
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref())); | ||
|
||
#[cfg(test)] | ||
let mut mock_service_id = false; | ||
|
||
if let Some(options) = options { | ||
if let Some(app_name) = options.app_name { | ||
metadata.application = Some(AppMetadata { name: app_name }); | ||
|
@@ -185,13 +190,20 @@ impl Handshaker { | |
if options.load_balanced { | ||
command.body.insert("loadBalanced", true); | ||
} | ||
|
||
#[cfg(test)] | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (just out of curiosity) why does this statement need to be in its own scope? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Applying attributes to individual statements turns out not to be available in stable Rust yet. |
||
mock_service_id = options.mock_service_id; | ||
} | ||
} | ||
|
||
command.body.insert("client", metadata); | ||
|
||
Self { | ||
command, | ||
credential, | ||
#[cfg(test)] | ||
mock_service_id, | ||
} | ||
} | ||
|
||
|
@@ -207,6 +219,24 @@ impl Handshaker { | |
let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?; | ||
|
||
let mut is_master_reply = run_is_master(conn, command, topology, handler).await?; | ||
// TODO PM-2369 Remove serviceId mocking when it's returned by the server. | ||
#[cfg(test)] | ||
{ | ||
if self.command.body.contains_key("loadBalanced") | ||
&& is_master_reply.command_response.service_id.is_none() | ||
&& self.mock_service_id | ||
{ | ||
is_master_reply.command_response.service_id = Some( | ||
is_master_reply | ||
.command_response | ||
.topology_version | ||
.as_ref() | ||
.unwrap() | ||
.get_object_id("processId") | ||
.unwrap(), | ||
); | ||
} | ||
} | ||
if self.command.body.contains_key("loadBalanced") | ||
&& is_master_reply.command_response.service_id.is_none() | ||
{ | ||
|
@@ -256,6 +286,8 @@ pub(crate) struct HandshakerOptions { | |
driver_info: Option<DriverInfo>, | ||
server_api: Option<ServerApi>, | ||
load_balanced: bool, | ||
#[cfg(test)] | ||
mock_service_id: bool, | ||
} | ||
|
||
impl From<ConnectionPoolOptions> for HandshakerOptions { | ||
|
@@ -266,6 +298,8 @@ impl From<ConnectionPoolOptions> for HandshakerOptions { | |
driver_info: options.driver_info, | ||
server_api: options.server_api, | ||
load_balanced: options.load_balanced.unwrap_or(false), | ||
#[cfg(test)] | ||
mock_service_id: options.mock_service_id, | ||
} | ||
} | ||
} | ||
|
@@ -278,6 +312,8 @@ impl From<ClientOptions> for HandshakerOptions { | |
driver_info: options.driver_info, | ||
server_api: options.server_api, | ||
load_balanced: options.load_balanced.unwrap_or(false), | ||
#[cfg(test)] | ||
mock_service_id: options.test_options.map_or(false, |to| to.mock_service_id), | ||
} | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -78,6 +78,10 @@ async fn concurrent_connections() { | |
let _guard = LOCK.run_exclusively().await; | ||
|
||
let mut options = CLIENT_OPTIONS.clone(); | ||
if options.load_balanced.unwrap_or(false) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
println!("skipping concurrent_connections test due to load-balanced topology"); | ||
return; | ||
} | ||
options.direct_connection = Some(true); | ||
options.hosts.drain(1..); | ||
|
||
|
@@ -163,6 +167,12 @@ async fn connection_error_during_establishment() { | |
let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await; | ||
|
||
let mut client_options = CLIENT_OPTIONS.clone(); | ||
if client_options.load_balanced.unwrap_or(false) { | ||
println!( | ||
"skipping connection_error_during_establishment test due to load-balanced topology" | ||
); | ||
return; | ||
} | ||
client_options.heartbeat_freq = Duration::from_secs(300).into(); // high so that monitors dont trip failpoint | ||
client_options.hosts.drain(1..); | ||
client_options.direct_connection = Some(true); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -174,7 +174,6 @@ impl ConnectionPoolWorker { | |
let (connection_requester, request_receiver) = | ||
connection_requester::channel(address.clone(), handle); | ||
let (manager, management_receiver) = manager::channel(); | ||
let (generation_publisher, generation_subscriber) = status::channel(); | ||
|
||
let is_load_balanced = options | ||
.as_ref() | ||
|
@@ -185,6 +184,7 @@ impl ConnectionPoolWorker { | |
} else { | ||
PoolGeneration::normal() | ||
}; | ||
let (generation_publisher, generation_subscriber) = status::channel(generation.clone()); | ||
|
||
#[cfg(test)] | ||
let mut state = if options | ||
|
@@ -513,7 +513,7 @@ impl ConnectionPoolWorker { | |
handler.handle_pool_cleared_event(event); | ||
}); | ||
|
||
if !self.generation.is_load_balanced() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I swear that some part of the spec said that the queue shouldn't be drained in load-balancing mode, but I was apparently hallucinating that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did this draining behavior cause tests to fail? Intuitively, it makes sense to not drain in LB mode, since these requests will probably go to a working service rather than the one that caused a clear. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yep - https://github.com/mongodb/mongo-rust-driver/blob/master/src/test/spec/retryable_reads.rs#L74 failed (the second checkout succeeds rather than fails), and the load balancer spec specifically calls out retryable reads tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah okay, so that test is specifically verifying the pool paused behavior, which shouldn't apply to load balancers. The spec doesn't currently clarify this so I filed DRIVERS-1942 to update it. In the meantime, I think we can preserve the existing behavior of not clearing the WaitQueue on pool clear if in load balanced mode and just skip that individual test. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated - needed to skip the equivalent test for retryable writes as well. |
||
if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) { | ||
for request in self.wait_queue.drain(..) { | ||
// an error means the other end hung up already, which is okay because we were | ||
// returning an error anyways | ||
|
@@ -754,10 +754,6 @@ impl PoolGeneration { | |
Self::LoadBalanced(HashMap::new()) | ||
} | ||
|
||
fn is_load_balanced(&self) -> bool { | ||
matches!(self, Self::LoadBalanced(_)) | ||
} | ||
|
||
#[cfg(test)] | ||
pub(crate) fn as_normal(&self) -> Option<u32> { | ||
match self { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't needed for functionality, but it's very handy for debugging.