Skip to content

Commit 1dc143b

Browse files
committed
RUST-1047 Ensure TopologyClosedEvent is the last SDAM event emitted (#485)
1 parent 9944ae0 commit 1dc143b

File tree

7 files changed

+48
-25
lines changed

7 files changed

+48
-25
lines changed

.evergreen/check-rustfmt.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,4 @@ set -o errexit
44

55
. ~/.cargo/env
66
rustfmt +nightly --unstable-features --check src/**/*.rs
7+
rustfmt +nightly --unstable-features --check src/*.rs

rustfmt.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ use_try_shorthand = true
88
wrap_comments = true
99
imports_layout = "HorizontalVertical"
1010
imports_granularity = "Crate"
11+
ignore = ["src/lib.rs"]

src/cmap/establish/handshake/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
event::sdam::SdamEventHandler,
1515
is_master::{is_master_command, run_is_master, IsMasterReply},
1616
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
17-
sdam::WeakTopology,
17+
sdam::Topology,
1818
};
1919

2020
#[cfg(feature = "tokio-runtime")]
@@ -199,7 +199,7 @@ impl Handshaker {
199199
pub(crate) async fn handshake(
200200
&self,
201201
conn: &mut Connection,
202-
topology: Option<&WeakTopology>,
202+
topology: Option<&Topology>,
203203
handler: &Option<Arc<dyn SdamEventHandler>>,
204204
) -> Result<HandshakeResult> {
205205
let mut command = self.command.clone();

src/is_master.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
ServerHeartbeatStartedEvent,
2020
ServerHeartbeatSucceededEvent,
2121
},
22-
sdam::{ServerType, WeakTopology},
22+
sdam::{ServerType, Topology},
2323
selection_criteria::TagSet,
2424
};
2525

@@ -37,10 +37,15 @@ pub(crate) fn is_master_command(api: Option<&ServerApi>) -> Command {
3737
command
3838
}
3939

40+
/// Execute an isMaster command, emiting events if a reference to the topology and a handler are
41+
/// provided.
42+
///
43+
/// A strong reference to the topology is used here to ensure it is still in scope and has not yet
44+
/// emitted a `TopologyClosedEvent`.
4045
pub(crate) async fn run_is_master(
4146
conn: &mut Connection,
4247
command: Command,
43-
topology: Option<&WeakTopology>,
48+
topology: Option<&Topology>,
4449
handler: &Option<Arc<dyn SdamEventHandler>>,
4550
) -> Result<IsMasterReply> {
4651
emit_event(topology, handler, |handler| {
@@ -91,18 +96,13 @@ pub(crate) async fn run_is_master(
9196
}
9297
}
9398

94-
fn emit_event<F>(
95-
topology: Option<&WeakTopology>,
96-
handler: &Option<Arc<dyn SdamEventHandler>>,
97-
emit: F,
98-
) where
99+
fn emit_event<F>(topology: Option<&Topology>, handler: &Option<Arc<dyn SdamEventHandler>>, emit: F)
100+
where
99101
F: FnOnce(&Arc<dyn SdamEventHandler>),
100102
{
101103
if let Some(handler) = handler {
102-
if let Some(topology) = topology {
103-
if topology.is_alive() {
104-
emit(handler);
105-
}
104+
if topology.is_some() {
105+
emit(handler);
106106
}
107107
}
108108
}

src/sdam/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,5 @@ pub(crate) use self::{
2626
server::{Server, ServerUpdate, ServerUpdateReceiver, ServerUpdateSender},
2727
HandshakePhase,
2828
Topology,
29-
WeakTopology,
3029
},
3130
};

src/sdam/monitor.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl HeartbeatMonitor {
158158
/// Returns true if the topology has changed and false otherwise.
159159
async fn check_server(&mut self, topology: &Topology, server: &Server) -> bool {
160160
let mut retried = false;
161-
let check_result = match self.perform_is_master().await {
161+
let check_result = match self.perform_is_master(topology).await {
162162
Ok(reply) => Ok(reply),
163163
Err(e) => {
164164
let previous_description = topology.get_server_description(&server.address).await;
@@ -169,7 +169,7 @@ impl HeartbeatMonitor {
169169
{
170170
self.handle_error(e, topology, server).await;
171171
retried = true;
172-
self.perform_is_master().await
172+
self.perform_is_master(topology).await
173173
} else {
174174
Err(e)
175175
}
@@ -186,14 +186,14 @@ impl HeartbeatMonitor {
186186
}
187187
}
188188

189-
async fn perform_is_master(&mut self) -> Result<IsMasterReply> {
189+
async fn perform_is_master(&mut self, topology: &Topology) -> Result<IsMasterReply> {
190190
let result = match self.connection {
191191
Some(ref mut conn) => {
192192
let command = is_master_command(self.client_options.server_api.as_ref());
193193
run_is_master(
194194
conn,
195195
command,
196-
Some(&self.topology),
196+
Some(topology),
197197
&self.client_options.sdam_event_handler,
198198
)
199199
.await
@@ -210,7 +210,7 @@ impl HeartbeatMonitor {
210210
.handshaker
211211
.handshake(
212212
&mut connection,
213-
Some(&self.topology),
213+
Some(topology),
214214
&self.client_options.sdam_event_handler,
215215
)
216216
.await

src/sdam/state/mod.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ struct TopologyState {
8080
servers: HashMap<ServerAddress, Arc<Server>>,
8181
#[cfg(test)]
8282
mocked: bool,
83+
options: ClientOptions,
84+
id: ObjectId,
8385
}
8486

8587
impl Topology {
@@ -110,6 +112,8 @@ impl Topology {
110112
servers: Default::default(),
111113
http_client: http_client.clone(),
112114
mocked: true,
115+
id,
116+
options: options.clone(),
113117
};
114118

115119
let topology = Self {
@@ -183,13 +187,17 @@ impl Topology {
183187
servers: Default::default(),
184188
http_client,
185189
mocked: false,
190+
id,
191+
options: options.clone(),
186192
};
187193

188194
#[cfg(not(test))]
189195
let topology_state = TopologyState {
190196
description,
191197
servers: Default::default(),
192198
http_client,
199+
options: options.clone(),
200+
id,
193201
};
194202

195203
let state = Arc::new(RwLock::new(topology_state));
@@ -232,12 +240,6 @@ impl Topology {
232240

233241
pub(crate) fn close(&self) {
234242
self.common.is_alive.store(false, Ordering::SeqCst);
235-
if let Some(ref handler) = self.common.options.sdam_event_handler {
236-
let event = TopologyClosedEvent {
237-
topology_id: self.common.id,
238-
};
239-
handler.handle_topology_closed_event(event);
240-
}
241243
}
242244

243245
/// Gets the addresses of the servers in the cluster.
@@ -534,6 +536,26 @@ impl Topology {
534536
}
535537
}
536538

539+
impl Drop for TopologyState {
540+
fn drop(&mut self) {
541+
if let Some(ref handler) = self.options.sdam_event_handler {
542+
if matches!(self.description.topology_type, TopologyType::LoadBalanced) {
543+
for host in self.servers.keys() {
544+
let event = ServerClosedEvent {
545+
address: host.clone(),
546+
topology_id: self.id,
547+
};
548+
handler.handle_server_closed_event(event);
549+
}
550+
}
551+
let event = TopologyClosedEvent {
552+
topology_id: self.id,
553+
};
554+
handler.handle_topology_closed_event(event);
555+
}
556+
}
557+
}
558+
537559
impl WeakTopology {
538560
/// Attempts to convert the WeakTopology to a strong reference.
539561
pub(crate) fn upgrade(&self) -> Option<Topology> {

0 commit comments

Comments
 (0)