Skip to content

RUST-1509 SDAM Logging #918

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions src/cmap/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,13 @@ pub struct ConnectionInfo {
#[derivative(Debug)]
pub(crate) struct Connection {
/// Driver-generated ID for the connection.
pub(super) id: u32,
pub(crate) id: u32,

/// Server-generated ID for the connection.
pub(crate) server_id: Option<i64>,

pub(crate) address: ServerAddress,

pub(crate) generation: ConnectionGeneration,

/// The cached StreamDescription from the connection's handshake.
Expand Down Expand Up @@ -164,9 +166,8 @@ impl Connection {

/// Create a connection intended for monitoring purposes.
/// TODO: RUST-1454 Rename this to just `new`, drop the pooling-specific data.
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream) -> Self {
// Monitoring connections don't have IDs, so just use 0 as a placeholder here.
Self::new(address, stream, 0, ConnectionGeneration::Monitoring)
pub(crate) fn new_monitoring(address: ServerAddress, stream: AsyncStream, id: u32) -> Self {
Self::new(address, stream, id, ConnectionGeneration::Monitoring)
}

pub(crate) fn info(&self) -> ConnectionInfo {
Expand Down
3 changes: 2 additions & 1 deletion src/cmap/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ impl ConnectionEstablisher {
pub(crate) async fn establish_monitoring_connection(
&self,
address: ServerAddress,
id: u32,
) -> Result<(Connection, HelloReply)> {
let stream = self.make_stream(address.clone()).await?;
let mut connection = Connection::new_monitoring(address, stream);
let mut connection = Connection::new_monitoring(address, stream, id);

let hello_reply = self.handshaker.handshake(&mut connection, None).await?;

Expand Down
23 changes: 23 additions & 0 deletions src/event/sdam.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ pub struct ServerHeartbeatStartedEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
/// new monitoring connection, this value will not be present.
pub server_connection_id: Option<i64>,
}

/// Published when a server monitor's `hello` or legacy hello command succeeds.
Expand All @@ -139,6 +147,13 @@ pub struct ServerHeartbeatSucceededEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present for server versions 4.2+.
pub server_connection_id: Option<i64>,
}

/// Published when a server monitor's `hello` or legacy hello command fails.
Expand All @@ -158,6 +173,14 @@ pub struct ServerHeartbeatFailedEvent {

/// Determines if this heartbeat event is from an awaitable `hello`.
pub awaited: bool,

/// The driver-generated ID for the connection used for the heartbeat.
pub driver_connection_id: u32,

/// The server-generated ID for the connection used for the heartbeat. This value is only
/// present on server versions 4.2+. If this event corresponds to the first heartbeat on a
/// new monitoring connection, this value will not be present.
pub server_connection_id: Option<i64>,
}

#[derive(Clone, Debug)]
Expand Down
41 changes: 34 additions & 7 deletions src/sdam/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::{
sync::Arc,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::{Duration, Instant},
};

use bson::doc;
use lazy_static::lazy_static;
use tokio::sync::watch;

use super::{
Expand All @@ -26,6 +30,13 @@ use crate::{
runtime::{self, stream::DEFAULT_CONNECT_TIMEOUT, WorkerHandle, WorkerHandleListener},
};

/// Returns the next driver-generated ID to assign to a monitoring connection.
///
/// IDs are handed out from a single process-wide counter that starts at 0, so every
/// caller observes a distinct value. The counter wraps around on `u32` overflow.
fn next_monitoring_connection_id() -> u32 {
    // `AtomicU32::new` is a `const fn`, so a plain `static` suffices; no lazy
    // initialization is required.
    static MONITORING_CONNECTION_ID: AtomicU32 = AtomicU32::new(0);

    // `fetch_add` returns the value held *before* the increment.
    MONITORING_CONNECTION_ID.fetch_add(1, Ordering::SeqCst)
}

pub(crate) const DEFAULT_HEARTBEAT_FREQUENCY: Duration = Duration::from_secs(10);
pub(crate) const MIN_HEARTBEAT_FREQUENCY: Duration = Duration::from_millis(500);

Expand Down Expand Up @@ -162,10 +173,18 @@ impl Monitor {
}

async fn perform_hello(&mut self) -> HelloResult {
// Reuse the ID of the existing monitoring connection if there is one. Otherwise
// allocate a fresh ID for the connection that is about to be established.
// `unwrap_or_else` keeps the allocation lazy, so the global counter only advances
// when a new ID is actually needed rather than on every heartbeat.
let driver_connection_id = self
    .connection
    .as_ref()
    .map(|c| c.id)
    .unwrap_or_else(next_monitoring_connection_id);

self.emit_event(|| {
SdamEvent::ServerHeartbeatStarted(ServerHeartbeatStartedEvent {
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

Expand Down Expand Up @@ -215,7 +234,7 @@ impl Monitor {
let start = Instant::now();
let res = self
.connection_establisher
.establish_monitoring_connection(self.address.clone())
.establish_monitoring_connection(self.address.clone(), driver_connection_id)
.await;
match res {
Ok((conn, hello_reply)) => {
Expand Down Expand Up @@ -264,6 +283,8 @@ impl Monitor {
reply,
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

Expand All @@ -272,18 +293,21 @@ impl Monitor {
self.topology_version = r.command_response.topology_version;
}
HelloResult::Err(ref e) | HelloResult::Cancelled { reason: ref e } => {
// Per the spec, cancelled requests and errors both require the monitoring
// connection to be closed.
self.connection = None;
self.rtt_monitor_handle.reset_average_rtt();
self.emit_event(|| {
SdamEvent::ServerHeartbeatFailed(ServerHeartbeatFailedEvent {
duration,
failure: e.clone(),
server_address: self.address.clone(),
awaited: self.topology_version.is_some(),
driver_connection_id,
server_connection_id: self.connection.as_ref().and_then(|c| c.server_id),
})
});

// Per the spec, cancelled requests and errors both require the monitoring
// connection to be closed.
self.connection = None;
self.rtt_monitor_handle.reset_average_rtt();
self.topology_version.take();
}
}
Expand Down Expand Up @@ -402,7 +426,10 @@ impl RttMonitor {
None => {
let connection = self
.connection_establisher
.establish_monitoring_connection(self.address.clone())
.establish_monitoring_connection(
self.address.clone(),
next_monitoring_connection_id(),
)
.await?
.0;
self.connection = Some(connection);
Expand Down
93 changes: 65 additions & 28 deletions src/sdam/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ use crate::{
TopologyType,
};

#[cfg(feature = "tracing-unstable")]
use crate::trace::topology::TopologyTracingEventEmitter;

use super::{
monitor::{MonitorManager, MonitorRequestReceiver},
srv_polling::SrvPollingMonitor,
Expand All @@ -67,22 +70,36 @@ pub(crate) struct Topology {
impl Topology {
pub(crate) fn new(options: ClientOptions) -> Result<Topology> {
let description = TopologyDescription::default();
let id = ObjectId::new();

let event_emitter = options.sdam_event_handler.as_ref().map(|handler| {
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();

// Spin up a task to handle events so that a user's event handling code can't block the
// TopologyWorker.
let handler = handler.clone();
runtime::execute(async move {
while let Some(event) = rx.recv().await {
let (event, ack) = event.into_parts();
handle_sdam_event(handler.as_ref(), event);
ack.acknowledge(());
}
});
SdamEventEmitter { sender: tx }
});
let event_emitter =
if options.sdam_event_handler.is_some() || cfg!(feature = "tracing-unstable") {
let user_handler = options.sdam_event_handler.clone();

#[cfg(feature = "tracing-unstable")]
let tracing_emitter =
TopologyTracingEventEmitter::new(options.tracing_max_document_length_bytes, id);
let (tx, mut rx) = mpsc::unbounded_channel::<AcknowledgedMessage<SdamEvent>>();
runtime::execute(async move {
while let Some(event) = rx.recv().await {
let (event, ack) = event.into_parts();

if let Some(ref user_handler) = user_handler {
#[cfg(feature = "tracing-unstable")]
handle_sdam_event(user_handler.as_ref(), event.clone());
#[cfg(not(feature = "tracing-unstable"))]
handle_sdam_event(user_handler.as_ref(), event);
}
#[cfg(feature = "tracing-unstable")]
handle_sdam_event(&tracing_emitter, event);

ack.acknowledge(());
}
});
Some(SdamEventEmitter { sender: tx })
} else {
None
};

let (updater, update_receiver) = TopologyUpdater::channel();
let (worker_handle, handle_listener) = WorkerHandleListener::channel();
Expand All @@ -95,8 +112,6 @@ impl Topology {
let connection_establisher =
ConnectionEstablisher::new(EstablisherOptions::from_client_options(&options))?;

let id = ObjectId::new();

let worker = TopologyWorker {
id,
topology_description: description,
Expand Down Expand Up @@ -375,18 +390,41 @@ impl TopologyWorker {
// indicate to the topology watchers that the topology is no longer alive
drop(self.publisher);

// close all the monitors.
let mut close_futures = self
.servers
.into_values()
.map(|server| {
drop(server.inner);
server.monitor_manager.close_monitor()
})
.collect::<FuturesUnordered<_>>();
// Close all the monitors.
let mut close_futures = FuturesUnordered::new();
for (address, server) in self.servers.into_iter() {
if let Some(ref emitter) = self.event_emitter {
emitter
.emit(SdamEvent::ServerClosed(ServerClosedEvent {
address,
topology_id: self.id,
}))
.await;
}
drop(server.inner);
close_futures.push(server.monitor_manager.close_monitor());
}
while close_futures.next().await.is_some() {}

if let Some(emitter) = self.event_emitter {
if !self.topology_description.servers.is_empty()
&& self.options.load_balanced != Some(true)
{
let previous_description = self.topology_description;
let mut new_description = previous_description.clone();
new_description.servers.clear();

emitter
.emit(SdamEvent::TopologyDescriptionChanged(Box::new(
TopologyDescriptionChangedEvent {
topology_id: self.id,
previous_description: previous_description.into(),
new_description: new_description.into(),
},
)))
.await;
}

emitter
.emit(SdamEvent::TopologyClosed(TopologyClosedEvent {
topology_id: self.id,
Expand Down Expand Up @@ -436,11 +474,10 @@ impl TopologyWorker {
let diff = old_description.diff(&self.topology_description);
let changed = diff.is_some();
if let Some(diff) = diff {
// For ordering of events in tests, sort the addresses.

#[cfg(not(test))]
let changed_servers = diff.changed_servers;

// For ordering of events in tests, sort the addresses.
#[cfg(test)]
let changed_servers = {
let mut servers = diff.changed_servers.into_iter().collect::<Vec<_>>();
Expand Down
6 changes: 6 additions & 0 deletions src/test/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ pub(crate) fn deserialize_spec_tests<T: DeserializeOwned>(
continue;
};

if let Ok(unskipped_filename) = std::env::var("TEST_FILE") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this intentionally left in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I often add this while debugging to make it easier to run a single test file, but I can delete it if you'd rather not have it checked in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, it seems fine to leave in as long as it's deliberate :)

if filename != unskipped_filename {
continue;
}
}

if let Some(skipped_files) = skipped_files {
if skipped_files.contains(&filename) {
log_uncaptured(format!("Skipping deserializing {:?}", &path));
Expand Down
Loading