Skip to content

RUST-1587 Implement server selection tracing events #805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/client/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl Client {
.and_then(|s| s.transaction.pinned_mongos())
.or_else(|| op.selection_criteria());

let server = match self.select_server(selection_criteria).await {
let server = match self.select_server(selection_criteria, op.name()).await {
Ok(server) => server,
Err(mut err) => {
retry.first_error()?;
Expand Down Expand Up @@ -803,14 +803,14 @@ impl Client {
}
}

async fn select_data_bearing_server(&self) -> Result<()> {
async fn select_data_bearing_server(&self, operation_name: &str) -> Result<()> {
let topology_type = self.inner.topology.topology_type();
let criteria = SelectionCriteria::Predicate(Arc::new(move |server_info| {
let server_type = server_info.server_type();
(matches!(topology_type, TopologyType::Single) && server_type.is_available())
|| server_type.is_data_bearing()
}));
let _: SelectedServer = self.select_server(Some(&criteria)).await?;
let _: SelectedServer = self.select_server(Some(&criteria), operation_name).await?;
Ok(())
}

Expand All @@ -824,7 +824,8 @@ impl Client {
// sessions are supported or not.
match initial_status {
SessionSupportStatus::Undetermined => {
self.select_data_bearing_server().await?;
self.select_data_bearing_server(crate::client::SESSIONS_SUPPORT_OP_NAME)
.await?;
Ok(self.inner.topology.session_support_status())
}
_ => Ok(initial_status),
Expand All @@ -841,7 +842,8 @@ impl Client {
// transactions are supported or not.
match initial_status {
TransactionSupportStatus::Undetermined => {
self.select_data_bearing_server().await?;
self.select_data_bearing_server("Check transactions support status")
.await?;
Ok(self.inner.topology.transaction_support_status())
}
_ => Ok(initial_status),
Expand Down
85 changes: 66 additions & 19 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::options::ServerAddress;
#[cfg(feature = "tracing-unstable")]
use crate::trace::{
command::CommandTracingEventEmitter,
server_selection::ServerSelectionTracingEventEmitter,
trace_or_log_enabled,
TracingOrLogLevel,
COMMAND_TRACING_EVENT_TARGET,
Expand All @@ -33,7 +34,7 @@ use crate::{
},
concern::{ReadConcern, WriteConcern},
db::Database,
error::{ErrorKind, Result},
error::{Error, ErrorKind, Result},
event::command::{handle_command_event, CommandEvent},
operation::{AggregateTarget, ListDatabases},
options::{
Expand All @@ -55,6 +56,8 @@ pub(crate) use session::{ClusterTime, SESSIONS_UNSUPPORTED_COMMANDS};
use session::{ServerSession, ServerSessionPool};

const DEFAULT_SERVER_SELECTION_TIMEOUT: Duration = Duration::from_secs(30);
// TODO: RUST-1585 Remove this constant.
pub(crate) const SESSIONS_SUPPORT_OP_NAME: &str = "Check sessions support status";

/// This is the main entry point for the API. A `Client` is used to connect to a MongoDB cluster.
/// By default, it will monitor the topology of the cluster, keeping track of any changes, such
Expand Down Expand Up @@ -480,13 +483,18 @@ impl Client {
&self,
criteria: Option<&SelectionCriteria>,
) -> Result<ServerAddress> {
let server = self.select_server(criteria).await?;
let server = self.select_server(criteria, "Test select server").await?;
Ok(server.address.clone())
}

/// Select a server using the provided criteria. If none is provided, a primary read preference
/// will be used instead.
async fn select_server(&self, criteria: Option<&SelectionCriteria>) -> Result<SelectedServer> {
#[allow(unused_variables)] // we only use the operation_name for tracing.
async fn select_server(
&self,
criteria: Option<&SelectionCriteria>,
operation_name: &str,
) -> Result<SelectedServer> {
let criteria =
criteria.unwrap_or(&SelectionCriteria::ReadPreference(ReadPreference::Primary));

Expand All @@ -497,31 +505,70 @@ impl Client {
.server_selection_timeout
.unwrap_or(DEFAULT_SERVER_SELECTION_TIMEOUT);

#[cfg(feature = "tracing-unstable")]
let event_emitter = ServerSelectionTracingEventEmitter::new(
self.inner.topology.id,
criteria,
operation_name,
start_time,
timeout,
);
#[cfg(feature = "tracing-unstable")]
event_emitter.emit_started_event(self.inner.topology.watch().observe_latest().description);
// We only want to emit this message once per operation at most.
#[cfg(feature = "tracing-unstable")]
let mut emitted_waiting_message = false;

let mut watcher = self.inner.topology.watch();
loop {
let state = watcher.observe_latest();

if let Some(server) = server_selection::attempt_to_select_server(
let result = server_selection::attempt_to_select_server(
criteria,
&state.description,
&state.servers(),
)? {
return Ok(server);
}
);
match result {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's really unfortunate how much noise this adds to the function - can logging be factored out to helper methods?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea, done. I ended up adding a helper type to store the shared info so that it doesn't have to be passed with each message. This matches nicely with our existing XEventEmitter types for CMAP and command tracing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

Err(error) => {
#[cfg(feature = "tracing-unstable")]
event_emitter.emit_failed_event(&state.description, &error);

watcher.request_immediate_check();

let change_occurred = start_time.elapsed() < timeout
&& watcher
.wait_for_update(timeout - start_time.elapsed())
.await;
if !change_occurred {
return Err(ErrorKind::ServerSelection {
message: state
.description
.server_selection_timeout_error_message(criteria),
return Err(error);
}
Ok(result) => {
if let Some(server) = result {
#[cfg(feature = "tracing-unstable")]
event_emitter.emit_succeeded_event(&state.description, &server);

return Ok(server);
} else {
#[cfg(feature = "tracing-unstable")]
if !emitted_waiting_message {
event_emitter.emit_waiting_event(&state.description);
emitted_waiting_message = true;
}

watcher.request_immediate_check();

let change_occurred = start_time.elapsed() < timeout
&& watcher
.wait_for_update(timeout - start_time.elapsed())
.await;
if !change_occurred {
let error: Error = ErrorKind::ServerSelection {
message: state
.description
.server_selection_timeout_error_message(criteria),
}
.into();

#[cfg(feature = "tracing-unstable")]
event_emitter.emit_failed_event(&state.description, &error);

return Err(error);
}
}
}
.into());
}
}
}
Expand Down
38 changes: 2 additions & 36 deletions src/event/sdam/topology_description.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use crate::{

/// A description of the most up-to-date information known about a topology. Further details can
/// be found in the [Server Discovery and Monitoring specification](https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst).
#[derive(Clone)]
#[derive(Clone, derive_more::Display)]
#[display(fmt = "{}", description)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this just means that the output will be the Display impl for description. I took advantage of derive_more::Display in a couple places in this PR for the sake of brevity.

Also, it was an intentional choice to leave Display prefixed with derive_more rather than importing it throughout this PR. In other places in the codebase where we import derive_more's Display, I found it confusing because it is easy to mistake it for the std trait with the same name.

pub struct TopologyDescription {
pub(crate) description: crate::sdam::TopologyDescription,
}
Expand Down Expand Up @@ -102,38 +103,3 @@ impl fmt::Debug for TopologyDescription {
.finish()
}
}

impl fmt::Display for TopologyDescription {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this implementation was combined with the existing and more bare-bones implementation of Display on the internal crate::sdam::TopologyDescription type, so that this logic can be reused for the tracing messages, where we only have the internal type available.

fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
write!(f, "{{ Type: {:?}", self.description.topology_type)?;

if let Some(ref set_name) = self.description.set_name {
write!(f, ", Set Name: {}", set_name)?;
}

if let Some(max_set_version) = self.description.max_set_version {
write!(f, ", Max Set Version: {}", max_set_version)?;
}

if let Some(max_election_id) = self.description.max_election_id {
write!(f, ", Max Election ID: {}", max_election_id)?;
}

if let Some(ref compatibility_error) = self.description.compatibility_error {
write!(f, ", Compatibility Error: {}", compatibility_error)?;
}

if !self.description.servers.is_empty() {
write!(f, ", Servers: ")?;
let mut iter = self.description.servers.values();
if let Some(server) = iter.next() {
write!(f, "{}", ServerInfo::new_borrowed(server))?;
}
for server in iter {
write!(f, ", {}", ServerInfo::new_borrowed(server))?;
}
}

write!(f, " }}")
}
}
2 changes: 1 addition & 1 deletion src/sdam/description/topology/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::{
use self::server_selection::IDLE_WRITE_PERIOD;

/// The possible types for a topology.
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Deserialize, Serialize, derive_more::Display)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seemed worth adding Display here for use in TopologyDescription's Display impl.

#[non_exhaustive]
pub enum TopologyType {
/// A single mongod server.
Expand Down
41 changes: 36 additions & 5 deletions src/sdam/description/topology/server_selection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ impl SelectedServer {
server.increment_operation_count();
Self { server }
}

/// Returns a reference to the `address` field of the wrapped server.
///
/// Only compiled when the `tracing-unstable` feature is enabled.
#[cfg(feature = "tracing-unstable")]
pub(crate) fn address(&self) -> &ServerAddress {
&self.server.address
}
}

impl Deref for SelectedServer {
Expand Down Expand Up @@ -362,12 +367,38 @@ impl TopologyDescription {
}

impl fmt::Display for TopologyDescription {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{{ Type: {:?}, Servers: [ ", self.topology_type)?;
for server_info in self.servers.values().map(ServerInfo::new_borrowed) {
write!(f, "{}, ", server_info)?;
fn fmt(&self, f: &mut fmt::Formatter) -> std::result::Result<(), fmt::Error> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this logic is largely copied over from the event::sdam::TopologyDescription implementation.

write!(f, "{{ Type: {}", self.topology_type)?;

if let Some(ref set_name) = self.set_name {
write!(f, ", Set Name: {}", set_name)?;
}

if let Some(max_set_version) = self.max_set_version {
write!(f, ", Max Set Version: {}", max_set_version)?;
}

if let Some(max_election_id) = self.max_election_id {
write!(f, ", Max Election ID: {}", max_election_id)?;
}
write!(f, "] }}")

if let Some(ref compatibility_error) = self.compatibility_error {
write!(f, ", Compatibility Error: {}", compatibility_error)?;
}

if !self.servers.is_empty() {
write!(f, ", Servers: [ ")?;
let mut iter = self.servers.values();
if let Some(server) = iter.next() {
write!(f, "{}", ServerInfo::new_borrowed(server))?;
}
for server in iter {
write!(f, ", {}", ServerInfo::new_borrowed(server))?;
}
write!(f, " ]")?;
}

write!(f, " }}")
}
}

Expand Down
46 changes: 45 additions & 1 deletion src/selection_criteria.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ use crate::{
};

/// Describes which servers are suitable for a given operation.
#[derive(Clone, Derivative)]
#[derive(Clone, Derivative, derive_more::Display)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the unit tests in src/test/spec/trace.rs for examples of how SelectionCriteria gets stringified by this logic.

#[derivative(Debug)]
#[non_exhaustive]
pub enum SelectionCriteria {
/// A read preference that describes the suitable servers based on the server type, max
/// staleness, and server tags.
///
/// See the documentation [here](https://www.mongodb.com/docs/manual/core/read-preference/) for more details.
#[display(fmt = "ReadPreference {}", _0)]
ReadPreference(ReadPreference),

/// A predicate used to filter servers that are considered suitable. A `server` will be
/// considered suitable by a `predicate` if `predicate(server)` returns true.
#[display(fmt = "Custom predicate")]
Predicate(#[derivative(Debug = "ignore")] Predicate),
}

Expand Down Expand Up @@ -129,6 +131,48 @@ pub enum ReadPreference {
Nearest { options: ReadPreferenceOptions },
}

/// Formats a read preference as `{ Mode: <mode> }`, followed inside the braces by any of
/// `Tag Sets`, `Max Staleness` and `Hedge` that are set.
///
/// The optional parts are written only for modes that carry options, and only when those
/// options do not report themselves as default via `is_default()`.
impl std::fmt::Display for ReadPreference {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Resolve the mode label and (if the mode has any) its options in one step.
        let (mode, options) = match self {
            ReadPreference::Primary => ("Primary", None),
            ReadPreference::Secondary { options } => ("Secondary", Some(options)),
            ReadPreference::PrimaryPreferred { options } => {
                ("PrimaryPreferred", Some(options))
            }
            ReadPreference::SecondaryPreferred { options } => {
                ("SecondaryPreferred", Some(options))
            }
            ReadPreference::Nearest { options } => ("Nearest", Some(options)),
        };
        write!(f, "{{ Mode: {}", mode)?;

        // Default options contribute nothing to the output.
        if let Some(options) = options.filter(|candidate| !candidate.is_default()) {
            if let Some(tag_sets) = &options.tag_sets {
                write!(f, ", Tag Sets: {:?}", tag_sets)?;
            }
            if let Some(max_staleness) = &options.max_staleness {
                write!(f, ", Max Staleness: {:?}", max_staleness)?;
            }
            if let Some(hedge) = &options.hedge {
                write!(f, ", Hedge: {}", hedge.enabled)?;
            }
        }

        write!(f, " }}")
    }
}

impl<'de> Deserialize<'de> for ReadPreference {
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
where
Expand Down
Loading