Skip to content

RUST-1047 Ensure TopologyClosedEvent is the last SDAM event emitted #485

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .evergreen/check-rustfmt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ set -o errexit

. ~/.cargo/env
rustfmt +nightly --unstable-features --check src/**/*.rs
rustfmt +nightly --unstable-features --check src/*.rs
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ use_try_shorthand = true
wrap_comments = true
imports_layout = "HorizontalVertical"
imports_granularity = "Crate"
ignore = ["src/lib.rs"]
4 changes: 2 additions & 2 deletions src/cmap/establish/handshake/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
event::sdam::SdamEventHandler,
is_master::{is_master_command, run_is_master, IsMasterReply},
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
sdam::WeakTopology,
sdam::Topology,
};

#[cfg(feature = "tokio-runtime")]
Expand Down Expand Up @@ -227,7 +227,7 @@ impl Handshaker {
pub(crate) async fn handshake(
&self,
conn: &mut Connection,
topology: Option<&WeakTopology>,
topology: Option<&Topology>,
handler: &Option<Arc<dyn SdamEventHandler>>,
) -> Result<HandshakeResult> {
let mut command = self.command.clone();
Expand Down
22 changes: 11 additions & 11 deletions src/is_master.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
ServerHeartbeatStartedEvent,
ServerHeartbeatSucceededEvent,
},
sdam::{ServerType, WeakTopology},
sdam::{ServerType, Topology},
selection_criteria::TagSet,
};

Expand All @@ -37,10 +37,15 @@ pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command {
command
}

/// Execute an isMaster command, emiting events if a reference to the topology and a handler are
/// provided.
///
/// A strong reference to the topology is used here to ensure it is still in scope and has not yet
/// emitted a `TopologyClosedEvent`.
pub(crate) async fn run_is_master(
conn: &mut Connection,
command: Command,
topology: Option<&WeakTopology>,
topology: Option<&Topology>,
handler: &Option<Arc<dyn SdamEventHandler>>,
) -> Result<IsMasterReply> {
emit_event(topology, handler, |handler| {
Expand Down Expand Up @@ -91,18 +96,13 @@ pub(crate) async fn run_is_master(
}
}

fn emit_event<F>(
topology: Option<&WeakTopology>,
handler: &Option<Arc<dyn SdamEventHandler>>,
emit: F,
) where
fn emit_event<F>(topology: Option<&Topology>, handler: &Option<Arc<dyn SdamEventHandler>>, emit: F)
where
F: FnOnce(&Arc<dyn SdamEventHandler>),
{
if let Some(handler) = handler {
if let Some(topology) = topology {
if topology.is_alive() {
emit(handler);
}
if topology.is_some() {
emit(handler);
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/sdam/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,5 @@ pub(crate) use self::{
server::{Server, ServerUpdate, ServerUpdateReceiver, ServerUpdateSender},
HandshakePhase,
Topology,
WeakTopology,
},
};
10 changes: 5 additions & 5 deletions src/sdam/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl HeartbeatMonitor {
/// Returns true if the topology has changed and false otherwise.
async fn check_server(&mut self, topology: &Topology, server: &Server) -> bool {
let mut retried = false;
let check_result = match self.perform_is_master().await {
let check_result = match self.perform_is_master(topology).await {
Ok(reply) => Ok(reply),
Err(e) => {
let previous_description = topology.get_server_description(&server.address).await;
Expand All @@ -171,7 +171,7 @@ impl HeartbeatMonitor {
{
self.handle_error(e, topology, server).await;
retried = true;
self.perform_is_master().await
self.perform_is_master(topology).await
} else {
Err(e)
}
Expand All @@ -188,14 +188,14 @@ impl HeartbeatMonitor {
}
}

async fn perform_is_master(&mut self) -> Result<IsMasterReply> {
async fn perform_is_master(&mut self, topology: &Topology) -> Result<IsMasterReply> {
let result = match self.connection {
Some(ref mut conn) => {
let command = is_master_command(self.client_options.server_api.as_ref());
run_is_master(
conn,
command,
Some(&self.topology),
Some(topology),
&self.client_options.sdam_event_handler,
)
.await
Expand All @@ -212,7 +212,7 @@ impl HeartbeatMonitor {
.handshaker
.handshake(
&mut connection,
Some(&self.topology),
Some(topology),
&self.client_options.sdam_event_handler,
)
.await
Expand Down
39 changes: 24 additions & 15 deletions src/sdam/state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ struct TopologyState {
http_client: HttpClient,
description: TopologyDescription,
servers: HashMap<ServerAddress, Arc<Server>>,
options: ClientOptions,
id: ObjectId,
}

impl Topology {
Expand Down Expand Up @@ -108,6 +110,8 @@ impl Topology {
description,
servers: Default::default(),
http_client,
options: options.clone(),
id,
};

let state = Arc::new(RwLock::new(topology_state));
Expand Down Expand Up @@ -169,21 +173,6 @@ impl Topology {

pub(crate) fn close(&self) {
self.common.is_alive.store(false, Ordering::SeqCst);
if let Some(ref handler) = self.common.options.sdam_event_handler {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic was moved to the drop of TopologyState, so it's guaranteed to come after any work the monitor does because the monitor holds onto a strong reference to the topology (Topology) while it's performing a heartbeat.

if self.common.options.load_balanced.unwrap_or(false) {
for host in &self.common.options.hosts {
let event = ServerClosedEvent {
address: host.clone(),
topology_id: self.common.id,
};
handler.handle_server_closed_event(event);
}
}
let event = TopologyClosedEvent {
topology_id: self.common.id,
};
handler.handle_topology_closed_event(event);
}
}

/// Gets the addresses of the servers in the cluster.
Expand Down Expand Up @@ -485,6 +474,26 @@ impl Topology {
}
}

impl Drop for TopologyState {
fn drop(&mut self) {
if let Some(ref handler) = self.options.sdam_event_handler {
if matches!(self.description.topology_type, TopologyType::LoadBalanced) {
for host in self.servers.keys() {
let event = ServerClosedEvent {
address: host.clone(),
topology_id: self.id,
};
handler.handle_server_closed_event(event);
}
}
let event = TopologyClosedEvent {
topology_id: self.id,
};
handler.handle_topology_closed_event(event);
}
}
}

impl WeakTopology {
/// Attempts to convert the WeakTopology to a strong reference.
pub(crate) fn upgrade(&self) -> Option<Topology> {
Expand Down