Skip to content

RUST-980 Run load balancer tests on evergreen, and update existing tests #477

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
cf9ecc6
update env var names
abr-egn Sep 27, 2021
80e2e31
apply tls opts to MONGOS_LB_URI
abr-egn Sep 27, 2021
0e100e3
synthesize latency
abr-egn Sep 27, 2021
958c968
mock serviceId
abr-egn Sep 28, 2021
0127054
rustfmt
abr-egn Sep 28, 2021
c50f807
skip topology update for load balanced
abr-egn Sep 28, 2021
cebf3e9
skip tests using direct connections on load-balanced topologies
abr-egn Sep 28, 2021
2003f7a
properly initialize connection pool generation
abr-egn Sep 28, 2021
fd61488
set mock_service_id for global CLIENT_OPTIONS
abr-egn Sep 28, 2021
c784398
skip heartbeat_events test
abr-egn Sep 28, 2021
a90cfa5
preserve pool manager for pinned connections
abr-egn Sep 28, 2021
3800554
another place to set mock_service_id
abr-egn Sep 28, 2021
2733cda
skip versioned api examples
abr-egn Sep 28, 2021
30d54bc
always clear the waitqueue
abr-egn Sep 29, 2021
bd0d8c0
transactions
abr-egn Sep 29, 2021
a86b03b
rustfmt
abr-egn Sep 29, 2021
0f26622
clippy
abr-egn Sep 29, 2021
aa462e9
retryable-reads test updates
abr-egn Sep 29, 2021
f642c87
retryable-write test updates
abr-egn Sep 29, 2021
b833b88
evergreen config
abr-egn Sep 29, 2021
5cfb434
undo generate-uri.sh change
abr-egn Sep 29, 2021
a1afa9c
fix newline
abr-egn Sep 29, 2021
84a80cf
use lb uri for multiple mongoses in more places
abr-egn Sep 29, 2021
fbdc045
rustfmt
abr-egn Sep 29, 2021
0b01012
clippy
abr-egn Sep 29, 2021
da36f2b
evergreen fixes
abr-egn Sep 30, 2021
6d37ff9
more evergreen fixes
abr-egn Sep 30, 2021
b67f08b
evergreeeeeen
abr-egn Sep 30, 2021
251e784
attempt to exclude load_balancer tests on windows/arm
abr-egn Sep 30, 2021
6ee9a6d
typo
abr-egn Sep 30, 2021
63e116e
move lb uri var expansion later
abr-egn Sep 30, 2021
71b545b
more detailed dns resolution error message
abr-egn Sep 30, 2021
1b6a049
special-case 127.0.0.1 uris
abr-egn Sep 30, 2021
afe3a39
don't clear the queue when load balanced
abr-egn Oct 5, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions .evergreen/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ functions:
${PREPARE_SHELL}
export MONGODB_URI="${MONGODB_URI}"
export SSL="${SSL}"
export SINGLE_MONGOS_LB_URI="${SINGLE_MONGOS_LB_URI}"
export MULTI_MONGOS_LB_URI="${MULTI_MONGOS_LB_URI}"
. .evergreen/generate-uri.sh

SINGLE_THREAD=${SINGLE_THREAD} \
Expand Down Expand Up @@ -603,6 +605,24 @@ functions:
${PREPARE_SHELL}
echo '{"results": [{ "status": "FAIL", "test_file": "Build", "log_raw": "No test-results.json found was created" } ]}' > ${PROJECT_DIRECTORY}/test-results.json

"start load balancer":
- command: shell.exec
params:
script: |
${PREPARE_SHELL}
export MONGODB_URI="${MONGODB_URI}"
${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh start
- command: expansions.update
params:
file: lb-expansion.yml

"stop load balancer":
- command: shell.exec
params:
script: |
${PREPARE_SHELL}
${DRIVERS_TOOLS}/.evergreen/run-load-balancer.sh stop

pre:
- func: "fetch source"
- func: "prepare resources"
Expand All @@ -613,6 +633,7 @@ pre:
- func: "install dependencies"

post:
- func: "stop load balancer"
- func: "stop mongo orchestration"
- func: "upload-mo-artifacts"
- func: "cleanup"
Expand Down Expand Up @@ -815,6 +836,17 @@ tasks:
MONGODB_VERSION: "latest"
TOPOLOGY: "sharded_cluster"
- func: "run tests"

- name: "test-latest-load_balancer"
tags: ["latest", "load_balancer"]
commands:
- func: "bootstrap mongo-orchestration"
vars:
MONGODB_VERSION: "latest"
TOPOLOGY: "sharded_cluster"
- func: "start load balancer"
- func: "run tests"


- name: "test-latest-aws-auth"
# "latest" explicitly left off to keep this out of the generic matrix
Expand Down Expand Up @@ -1369,6 +1401,13 @@ buildvariants:
async-runtime: "*"
then:
remove_tasks: [".3.6", ".4.0"]
# haproxy isn't installed on windows / ubuntu-arm
- if:
os: ["ubuntu-18.04-arm64", "windows-64-vs2017"]
auth-and-tls: "*"
async-runtime: "*"
then:
remove_tasks: ".load_balancer"
- matrix_name: "x509-auth"
matrix_spec:
os:
Expand Down
26 changes: 19 additions & 7 deletions .evergreen/generate-uri.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,25 @@ DRIVERS_TOOLS_X509=`echo "$DRIVERS_TOOLS_X509" | sed 's/\//%2F/g'`
CA_FILE="${DRIVERS_TOOLS_X509}%2Fca.pem"
CERT_FILE="${DRIVERS_TOOLS_X509}%2Fclient.pem"

if [[ "$MONGODB_URI" == *"?"* ]]; then
export MONGODB_URI="${MONGODB_URI}&"
else
export MONGODB_URI="${MONGODB_URI}/?"
fi

update_uri() {
local ORIG_URI=$1
if [[ "$ORIG_URI" == "" ]]; then
return
fi
# The rustls library requires a domain name.
ORIG_URI=$(echo "$ORIG_URI" | sed s/127.0.0.1/localhost/)
if [[ "$ORIG_URI" == *"?"* ]]; then
ORIG_URI="${ORIG_URI}&"
else
ORIG_URI="${ORIG_URI}/?"
fi
echo "${ORIG_URI}tls=true&tlsCAFile=${CA_FILE}&tlsCertificateKeyFile=${CERT_FILE}&tlsAllowInvalidCertificates=true"
}

export MONGODB_URI="${MONGODB_URI}tls=true&tlsCAFile=${CA_FILE}&tlsCertificateKeyFile=${CERT_FILE}&tlsAllowInvalidCertificates=true"
export MONGODB_URI="$(update_uri ${MONGODB_URI})"
export SINGLE_MONGOS_LB_URI="$(update_uri ${SINGLE_MONGOS_LB_URI})"
export MULTI_MONGOS_LB_URI="$(update_uri ${MULTI_MONGOS_LB_URI})"

echo "MONGODB_URI: ${MONGODB_URI}"
echo "SINGLE_MONGOS_LB_URI: ${SINGLE_MONGOS_LB_URI}"
echo "MULTI_MONGOS_LB_URI: ${MULTI_MONGOS_LB_URI}"
3 changes: 3 additions & 0 deletions src/client/options/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,9 @@ pub(crate) struct TestOptions {

/// Mock response for `SrvPollingMonitor::lookup_hosts`.
pub(crate) mock_lookup_hosts: Option<Result<LookupHosts>>,

/// Mock the `serviceId` response for a load-balanced hello.
pub(crate) mock_service_id: bool,
}

fn default_hosts() -> Vec<ServerAddress> {
Expand Down
22 changes: 16 additions & 6 deletions src/cmap/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl Connection {
let (tx, rx) = mpsc::channel(1);
self.pinned_sender = Some(tx);
Ok(PinnedConnectionHandle {
id: self.id,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't needed for functionality, but it's very handy for debugging.

receiver: Arc::new(Mutex::new(rx)),
})
}
Expand Down Expand Up @@ -369,7 +370,8 @@ impl Drop for Connection {
if let Some(pool_manager) = self.pool_manager.take() {
let mut dropped_connection = self.take();
let result = if let Some(sender) = self.pinned_sender.as_mut() {
// Preserve the timestamp for pinned connections.
// Preserve the pool manager and timestamp for pinned connections.
dropped_connection.pool_manager = Some(pool_manager.clone());
dropped_connection.ready_and_available_time = self.ready_and_available_time;
match sender.try_send(dropped_connection) {
Ok(()) => Ok(()),
Expand All @@ -385,7 +387,11 @@ impl Drop for Connection {
Err(mpsc::error::TrySendError::Full(mut conn)) => {
// Panic in debug mode
if cfg!(debug_assertions) {
panic!("buffer full when attempting to return a pinned connection")
panic!(
"buffer full when attempting to return a pinned connection (id = \
{})",
conn.id
);
}
// TODO RUST-230 log an error in non-debug mode.
conn.pinned_sender = None;
Expand All @@ -409,6 +415,7 @@ impl Drop for Connection {
/// normal pool via this handle.
#[derive(Debug)]
pub(crate) struct PinnedConnectionHandle {
id: u32,
receiver: Arc<Mutex<mpsc::Receiver<Connection>>>,
}

Expand All @@ -418,6 +425,7 @@ impl PinnedConnectionHandle {
/// normal borrow.
pub(crate) fn replicate(&self) -> Self {
Self {
id: self.id,
receiver: self.receiver.clone(),
}
}
Expand All @@ -426,10 +434,12 @@ impl PinnedConnectionHandle {
/// connection has been unpinned.
pub(crate) async fn take_connection(&self) -> Result<Connection> {
let mut receiver = self.receiver.lock().await;
receiver
.recv()
.await
.ok_or_else(|| Error::internal("cannot take connection after unpin"))
receiver.recv().await.ok_or_else(|| {
Error::internal(format!(
"cannot take connection after unpin (id={})",
self.id
))
})
}

/// Return the pinned connection to the normal connection pool.
Expand Down
36 changes: 36 additions & 0 deletions src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ pub(crate) struct Handshaker {
/// given the same pool options, so it can be created at the time the Handshaker is created.
command: Command,
credential: Option<Credential>,
#[cfg(test)]
mock_service_id: bool,
}

impl Handshaker {
Expand All @@ -154,6 +156,9 @@ impl Handshaker {
let mut command =
is_master_command(options.as_ref().and_then(|opts| opts.server_api.as_ref()));

#[cfg(test)]
let mut mock_service_id = false;

if let Some(options) = options {
if let Some(app_name) = options.app_name {
metadata.application = Some(AppMetadata { name: app_name });
Expand Down Expand Up @@ -185,13 +190,20 @@ impl Handshaker {
if options.load_balanced {
command.body.insert("loadBalanced", true);
}

#[cfg(test)]
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(just out of curiosity) why does this statement need to be in its own scope?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applying attributes to individual statements turns out not to be available in stable Rust yet.

mock_service_id = options.mock_service_id;
}
}

command.body.insert("client", metadata);

Self {
command,
credential,
#[cfg(test)]
mock_service_id,
}
}

Expand All @@ -207,6 +219,24 @@ impl Handshaker {
let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;

let mut is_master_reply = run_is_master(conn, command, topology, handler).await?;
// TODO PM-2369 Remove serviceId mocking when it's returned by the server.
#[cfg(test)]
{
if self.command.body.contains_key("loadBalanced")
&& is_master_reply.command_response.service_id.is_none()
&& self.mock_service_id
{
is_master_reply.command_response.service_id = Some(
is_master_reply
.command_response
.topology_version
.as_ref()
.unwrap()
.get_object_id("processId")
.unwrap(),
);
}
}
if self.command.body.contains_key("loadBalanced")
&& is_master_reply.command_response.service_id.is_none()
{
Expand Down Expand Up @@ -256,6 +286,8 @@ pub(crate) struct HandshakerOptions {
driver_info: Option<DriverInfo>,
server_api: Option<ServerApi>,
load_balanced: bool,
#[cfg(test)]
mock_service_id: bool,
}

impl From<ConnectionPoolOptions> for HandshakerOptions {
Expand All @@ -266,6 +298,8 @@ impl From<ConnectionPoolOptions> for HandshakerOptions {
driver_info: options.driver_info,
server_api: options.server_api,
load_balanced: options.load_balanced.unwrap_or(false),
#[cfg(test)]
mock_service_id: options.mock_service_id,
}
}
}
Expand All @@ -278,6 +312,8 @@ impl From<ClientOptions> for HandshakerOptions {
driver_info: options.driver_info,
server_api: options.server_api,
load_balanced: options.load_balanced.unwrap_or(false),
#[cfg(test)]
mock_service_id: options.test_options.map_or(false, |to| to.mock_service_id),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cmap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl ConnectionPool {
let (manager, _) = manager::channel();
let handle = PoolWorkerHandle::new_mocked();
let (connection_requester, _) = connection_requester::channel(Default::default(), handle);
let (_, generation_subscriber) = status::channel();
let (_, generation_subscriber) = status::channel(PoolGeneration::normal());

Self {
address,
Expand Down
10 changes: 10 additions & 0 deletions src/cmap/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,11 @@ pub(crate) struct ConnectionPoolOptions {

/// Whether or not the client is connecting to a MongoDB cluster through a load balancer.
pub(crate) load_balanced: Option<bool>,

/// Whether or not to mock the `serviceId` field of a hello through a load balancer.
#[cfg(test)]
#[serde(skip)]
pub(crate) mock_service_id: bool,
}

impl ConnectionPoolOptions {
Expand All @@ -109,6 +114,11 @@ impl ConnectionPoolOptions {
#[cfg(test)]
ready: None,
load_balanced: options.load_balanced,
#[cfg(test)]
mock_service_id: options
.test_options
.as_ref()
.map_or(false, |to| to.mock_service_id),
}
}

Expand Down
12 changes: 2 additions & 10 deletions src/cmap/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,9 @@ struct PoolStatus {
generation: PoolGeneration,
}

impl Default for PoolStatus {
fn default() -> Self {
PoolStatus {
generation: PoolGeneration::normal(),
}
}
}

/// Create a channel for publishing and receiving updates to the pool's generation.
pub(super) fn channel() -> (PoolGenerationPublisher, PoolGenerationSubscriber) {
let (sender, receiver) = tokio::sync::watch::channel(Default::default());
pub(super) fn channel(init: PoolGeneration) -> (PoolGenerationPublisher, PoolGenerationSubscriber) {
let (sender, receiver) = tokio::sync::watch::channel(PoolStatus { generation: init });
(
PoolGenerationPublisher { sender },
PoolGenerationSubscriber { receiver },
Expand Down
10 changes: 10 additions & 0 deletions src/cmap/test/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ async fn concurrent_connections() {
let _guard = LOCK.run_exclusively().await;

let mut options = CLIENT_OPTIONS.clone();
if options.load_balanced.unwrap_or(false) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

direct_connection is explicitly incompatible with load_balanced.

println!("skipping concurrent_connections test due to load-balanced topology");
return;
}
options.direct_connection = Some(true);
options.hosts.drain(1..);

Expand Down Expand Up @@ -163,6 +167,12 @@ async fn connection_error_during_establishment() {
let _guard: RwLockWriteGuard<_> = LOCK.run_exclusively().await;

let mut client_options = CLIENT_OPTIONS.clone();
if client_options.load_balanced.unwrap_or(false) {
println!(
"skipping connection_error_during_establishment test due to load-balanced topology"
);
return;
}
client_options.heartbeat_freq = Duration::from_secs(300).into(); // high so that monitors dont trip failpoint
client_options.hosts.drain(1..);
client_options.direct_connection = Some(true);
Expand Down
3 changes: 3 additions & 0 deletions src/cmap/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,9 @@ async fn cmap_spec_tests() {
let _guard: RwLockWriteGuard<()> = LOCK.run_exclusively().await;

let mut options = CLIENT_OPTIONS.clone();
if options.load_balanced.unwrap_or(false) {
return;
}
options.hosts.drain(1..);
options.direct_connection = Some(true);
let client = EventClient::with_options(options).await;
Expand Down
8 changes: 2 additions & 6 deletions src/cmap/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ impl ConnectionPoolWorker {
let (connection_requester, request_receiver) =
connection_requester::channel(address.clone(), handle);
let (manager, management_receiver) = manager::channel();
let (generation_publisher, generation_subscriber) = status::channel();

let is_load_balanced = options
.as_ref()
Expand All @@ -185,6 +184,7 @@ impl ConnectionPoolWorker {
} else {
PoolGeneration::normal()
};
let (generation_publisher, generation_subscriber) = status::channel(generation.clone());

#[cfg(test)]
let mut state = if options
Expand Down Expand Up @@ -513,7 +513,7 @@ impl ConnectionPoolWorker {
handler.handle_pool_cleared_event(event);
});

if !self.generation.is_load_balanced() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I swear that some part of the spec said that the queue shouldn't be drained in load-balancing mode, but I was apparently hallucinating that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did this draining behavior cause tests to fail? Intuitively, it makes sense to not drain in LB mode, since these requests will probably go to a working service rather than the one that caused a clear.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep - https://github.com/mongodb/mongo-rust-driver/blob/master/src/test/spec/retryable_reads.rs#L74 failed (the second checkout succeeds rather than fails), and the load balancer spec specifically calls out retryable reads tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah okay, so that test is specifically verifying the pool paused behavior, which shouldn't apply to load balancers. The spec doesn't currently clarify this so I filed DRIVERS-1942 to update it. In the meantime, I think we can preserve the existing behavior of not clearing the WaitQueue on pool clear if in load balanced mode and just skip that individual test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated - needed to skip the equivalent test for retryable writes as well.

if !matches!(self.generation, PoolGeneration::LoadBalanced(_)) {
for request in self.wait_queue.drain(..) {
// an error means the other end hung up already, which is okay because we were
// returning an error anyways
Expand Down Expand Up @@ -754,10 +754,6 @@ impl PoolGeneration {
Self::LoadBalanced(HashMap::new())
}

fn is_load_balanced(&self) -> bool {
matches!(self, Self::LoadBalanced(_))
}

#[cfg(test)]
pub(crate) fn as_normal(&self) -> Option<u32> {
match self {
Expand Down
Loading