RUST-360 Streaming monitoring protocol #721

Merged

Commits (91)

532cc0d
wip initial streaming protocol impl
patrickfreed May 5, 2022
4b73ca5
implement prose tests, enable rtt monitoring
patrickfreed Jul 26, 2022
ee8ef91
clear requests after check completes
patrickfreed Jul 28, 2022
bf3df4c
cleanup
patrickfreed Aug 3, 2022
210f54d
sync sdam tests
patrickfreed Aug 3, 2022
99be868
fix clippy various cleanup
patrickfreed Aug 3, 2022
5f480e6
return error if send_message when moreToCome set
patrickfreed Aug 3, 2022
5a0ba19
document rtt monitor
patrickfreed Aug 3, 2022
8bde2f8
wip test changes
patrickfreed Aug 8, 2022
11e0de9
skip heartbeat_events test on 4.4+
patrickfreed Aug 8, 2022
5e9f7d3
reject invalid set names, only update RTT in monitor
patrickfreed Aug 8, 2022
2f699d4
pull in unified spec tests
patrickfreed Aug 8, 2022
88b477a
wip monitor timeout
patrickfreed Aug 8, 2022
f4a1ca9
idk
patrickfreed Aug 8, 2022
4281d67
wip
patrickfreed Aug 9, 2022
80fee99
idk tests not working
patrickfreed Aug 9, 2022
28fd38b
wip fix cancellation, resource leaks
patrickfreed Aug 10, 2022
077c8ac
various cleanup
patrickfreed Aug 10, 2022
8b3470e
fix clippy
patrickfreed Aug 11, 2022
a0a6c25
simplify reset
patrickfreed Aug 11, 2022
091368e
remove unneeded ord/partialord impl for ServerAddress
patrickfreed Aug 11, 2022
aa9a8d4
set command_executing = true before streaming responses
patrickfreed Aug 11, 2022
c0e6163
various more cleanup
patrickfreed Aug 11, 2022
b69fadb
ensure monitors are closed before closing topology
patrickfreed Aug 11, 2022
74d9737
delete old integration tests
patrickfreed Aug 11, 2022
d2e20e3
check_requester -> monitors_manager
patrickfreed Aug 11, 2022
1af37d4
skip sdam prose tests on load balanced topology
patrickfreed Aug 11, 2022
04c8172
wip monitor check request changes
patrickfreed Aug 12, 2022
45f5fe7
add acknowledgment back to poolready
patrickfreed Aug 16, 2022
33cda59
fix some races in the test
patrickfreed Aug 17, 2022
30a59c5
fix hang
patrickfreed Aug 17, 2022
b5ecdd1
skip sdam test that relies on socketTimeoutMS
patrickfreed Aug 18, 2022
fd74070
don't start SDAM monitors on load balanced topologies
patrickfreed Aug 18, 2022
30fa783
revert setup_client change in legacy retryable writes runner
patrickfreed Aug 18, 2022
cde696c
ensure failpoints are always disabled after retryable_writes legacy
patrickfreed Aug 18, 2022
983a5d6
ensure monitors aren't started for load balanced topologies, fix load
patrickfreed Aug 18, 2022
b7b655f
attempt to reduce flakiness of sdam_pool_management, wip
patrickfreed Aug 18, 2022
0b63cff
properly use useMultipleMongoses
patrickfreed Aug 18, 2022
857bacd
fix races in sdam tests, panic in async-std
patrickfreed Aug 19, 2022
8aef7a0
fix lint
patrickfreed Aug 19, 2022
a098cfc
fix uri merging
patrickfreed Aug 22, 2022
05648ab
fix clippy
patrickfreed Aug 22, 2022
84bff34
fix monitor crash on async-std with infinite connectTimeoutMS
patrickfreed Aug 22, 2022
6f37cc8
add debug print
patrickfreed Aug 22, 2022
dce4fb3
use correct hosts in v2 runner
patrickfreed Aug 22, 2022
f8e24a8
fix discarding of buffered data
patrickfreed Aug 22, 2022
a5e6eff
remove debug print
patrickfreed Aug 22, 2022
77a57f8
fix sdam pool management test
patrickfreed Aug 22, 2022
cd5141a
fix heartbeat_events test on auth/tls variants
patrickfreed Aug 22, 2022
7945fbe
remove unneeded mark-as-seen, possibly fix race in sdam
patrickfreed Aug 22, 2022
ee41d36
improve handling of RTT, factor in initial handshake in avg
patrickfreed Aug 23, 2022
d73f1ef
improve asserteventcount failure message
patrickfreed Aug 24, 2022
082ebc8
dont try to remove hosts from srv test uris
patrickfreed Aug 24, 2022
8325cb9
use BufStream<T> instead of BufReader<BufWriter<T>>
patrickfreed Aug 24, 2022
79bd04e
break correctly from monitor on topology close
patrickfreed Aug 24, 2022
2d0d9bb
drop os_info from ClientMetadata
patrickfreed Aug 24, 2022
2b34813
reuse tls connectors, refactor all establishment to go through Connec…
patrickfreed Aug 24, 2022
bfab549
fix compilation on openssl
patrickfreed Aug 25, 2022
946064a
ensure load balanced generation is set even when auth handshake fails
patrickfreed Aug 25, 2022
446afd8
add debug print to monitors closing
patrickfreed Aug 25, 2022
5fc4389
properly handle loadBalanced=true against non-LB
patrickfreed Aug 25, 2022
6a91a89
remove unneeded assert
patrickfreed Aug 25, 2022
c1ab640
remove print debug
patrickfreed Aug 25, 2022
5304a1e
fix lint
patrickfreed Aug 25, 2022
cae749c
Merge branch 'main' into RUST-360/streamable-hello-driver-parts
patrickfreed Aug 25, 2022
de8fdc3
fix compilation error
patrickfreed Aug 25, 2022
94753a9
fix clippy compilation
patrickfreed Aug 25, 2022
9175ea1
delete commented out test
patrickfreed Aug 25, 2022
7daa603
handle handshake errors in LB topology properly
patrickfreed Aug 25, 2022
38cba5f
fix clippy
patrickfreed Aug 25, 2022
cb57d92
fix csfle compilation, allow for standalone AsyncStream connection
patrickfreed Aug 25, 2022
34751a0
fix clippy, add a docstring
patrickfreed Aug 26, 2022
35cb043
add pool generation assertions to SDAM error tests
patrickfreed Aug 29, 2022
21504b6
consolidate verify_max_staleness
patrickfreed Sep 2, 2022
796797c
update docstring
patrickfreed Sep 2, 2022
33f5ab0
pass server_api directly
patrickfreed Sep 2, 2022
0c485d9
revert to using hashset/hashmap for topologydescriptiondiff
patrickfreed Sep 2, 2022
a542eef
set = to None instead of calling take()
patrickfreed Sep 2, 2022
a643820
remove commented out code
patrickfreed Sep 2, 2022
b987ff4
remove unneeded workerhandlelistener creation
patrickfreed Sep 2, 2022
90eed60
add TestClient method for checking for streaming protocol support
patrickfreed Sep 2, 2022
11a27d7
clarify test comment
patrickfreed Sep 2, 2022
c92da56
assert multiple heartbeats are received
patrickfreed Sep 2, 2022
0cdaf21
fix outdated comment
patrickfreed Sep 2, 2022
23da1b1
add note about closeConnection
patrickfreed Sep 2, 2022
2c1bd80
remove needless pin
patrickfreed Sep 7, 2022
d7e7bc1
drop unused timeout argument, use async establishment for async-std
patrickfreed Sep 7, 2022
712d460
use Connection::new instead of direct struct construction
patrickfreed Sep 7, 2022
24018ad
fix allow_invalid_hostnames behavior on OpenSSL
patrickfreed Sep 8, 2022
cd2eabf
fix outdated comment
patrickfreed Sep 8, 2022
2fabe8d
Merge remote-tracking branch 'origin/main' into RUST-360/streamable-h…
patrickfreed Sep 8, 2022
1 change: 1 addition & 0 deletions src/client/executor.rs
@@ -601,6 +601,7 @@ impl Client {
let raw_cmd = RawCommand {
name: cmd_name.clone(),
target_db,
exhaust_allowed: false,
bytes: serialized,
};

21 changes: 11 additions & 10 deletions src/client/mod.rs
@@ -409,7 +409,7 @@ impl Client {
if let Some(server) = server_selection::attempt_to_select_server(
criteria,
&state.description,
&state.servers,
&state.servers(),
)? {
return Ok(server);
}
@@ -432,13 +432,14 @@
}
}

#[cfg(all(test, not(feature = "sync"), not(feature = "tokio-sync")))]
pub(crate) async fn get_hosts(&self) -> Vec<String> {
#[cfg(test)]
pub(crate) fn get_hosts(&self) -> Vec<String> {
let watcher = self.inner.topology.watch();
let state = watcher.peek_latest();

let servers = state.servers.keys();
servers
state
.servers()
.keys()
.map(|stream_address| format!("{}", stream_address))
.collect()
}
@@ -458,6 +459,11 @@ impl Client {
.clone()
}

#[cfg(test)]
pub(crate) fn topology(&self) -> &crate::sdam::Topology {
&self.inner.topology
}

#[cfg(feature = "csfle")]
pub(crate) fn weak(&self) -> WeakClient {
WeakClient {
@@ -475,11 +481,6 @@
})
.ok()
}

#[cfg(test)]
pub(crate) fn topology(&self) -> &Topology {
&self.inner.topology
}
}

#[cfg(feature = "csfle")]
9 changes: 8 additions & 1 deletion src/cmap/conn/command.rs
@@ -20,6 +20,8 @@ use crate::{
pub(crate) struct RawCommand {
pub(crate) name: String,
pub(crate) target_db: String,
/// Whether or not the server may respond to this command multiple times via the moreToCome bit.
pub(crate) exhaust_allowed: bool,
pub(crate) bytes: Vec<u8>,
}

@@ -38,6 +40,9 @@ pub(crate) struct Command<T = Document> {
#[serde(skip)]
pub(crate) name: String,

#[serde(skip)]
pub(crate) exhaust_allowed: bool,

#[serde(flatten)]
pub(crate) body: T,

@@ -71,6 +76,7 @@ impl<T> Command<T> {
Self {
name,
target_db,
exhaust_allowed: false,
body,
lsid: None,
cluster_time: None,
@@ -93,6 +99,7 @@ impl<T> Command<T> {
Self {
name,
target_db,
exhaust_allowed: false,
body,
lsid: None,
cluster_time: None,
@@ -244,7 +251,7 @@ impl RawCommandResponse {
.map_err(|_| Error::invalid_authentication_response(mechanism_name))
}

pub(crate) fn into_hello_reply(self, round_trip_time: Duration) -> Result<HelloReply> {
pub(crate) fn into_hello_reply(self, round_trip_time: Option<Duration>) -> Result<HelloReply> {
match self.body::<CommandResponse<HelloCommandResponse>>() {
Ok(response) if response.is_success() => {
let server_address = self.source_address().clone();
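
For context on why `RawCommand` and `Command` gain an `exhaust_allowed` field: under the streaming protocol the monitor sends an "awaitable" hello that echoes the `topologyVersion` from the previous reply and includes `maxAwaitTimeMS`, and marks the OP_MSG as exhaust-allowed so the server may answer it repeatedly. A minimal sketch of such a command body (field names follow the MongoDB hello specification; the helper function itself is illustrative, not part of this PR):

```rust
use bson::{doc, Document};

/// Illustrative helper: body of an awaitable ("streaming") hello command.
/// `topology_version` is echoed from the previous hello reply, and
/// `max_await_time_ms` is derived from heartbeatFrequencyMS.
fn awaitable_hello_body(topology_version: Document, max_await_time_ms: i64) -> Document {
    doc! {
        "hello": 1,
        // The server replies once its topology changes or this timeout elapses;
        // with exhaustAllowed set on the message it keeps replying (moreToCome).
        "topologyVersion": topology_version,
        "maxAwaitTimeMS": max_await_time_ms,
    }
}
```

Such a body would be wrapped in a `Command` with `exhaust_allowed: true`, which `Message::with_raw_command` (in the wire module changes below) translates into the EXHAUST_ALLOWED flag on the outgoing OP_MSG.
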
59 changes: 55 additions & 4 deletions src/cmap/conn/mod.rs
@@ -11,7 +11,7 @@ use derivative::Derivative;
use serde::Serialize;
use tokio::sync::{mpsc, Mutex};

use self::wire::Message;
use self::wire::{Message, MessageFlags};
use super::manager::PoolManager;
use crate::{
bson::oid::ObjectId,
@@ -86,6 +86,12 @@ pub(crate) struct Connection {
/// into a pool.
error: bool,

/// Whether the most recently received message included the moreToCome flag, indicating the
/// server may send more responses without any additional requests. Attempting to send new
/// messages on this connection while this value is true will return an error. This value
/// will remain true until a server response does not include the moreToCome flag.
more_to_come: bool,

stream: AsyncStream,

/// Compressor that the client will use before sending messages.
@@ -131,6 +137,7 @@ impl Connection {
error: false,
pinned_sender: None,
compressor: None,
more_to_come: false,
};

Ok(conn)
@@ -156,15 +163,14 @@ impl Connection {
/// Construct and connect a new connection used for monitoring.
pub(crate) async fn connect_monitoring(
address: ServerAddress,
connect_timeout: Option<Duration>,
tls_options: Option<TlsOptions>,
) -> Result<Self> {
Self::new(
0,
address,
0,
Some(ConnectionOptions {
connect_timeout,
connect_timeout: None, // handled by the monitor
tls_options,
event_handler: None,
}),
@@ -272,6 +278,13 @@ impl Connection {
message: Message,
to_compress: bool,
) -> Result<RawCommandResponse> {
if self.more_to_come {
return Err(Error::internal(format!(
"attempted to send a new message to {} but moreToCome bit was set",
self.address()
)));
}

self.command_executing = true;

// If the client has agreed on a compressor with the server, and the command
@@ -298,7 +311,10 @@
self.command_executing = false;
self.error = response_message_result.is_err();

RawCommandResponse::new(self.address.clone(), response_message_result?)
let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);

RawCommandResponse::new(self.address.clone(), response_message)
}

/// Executes a `Command` and returns a `CommandResponse` containing the result from the server.
@@ -332,6 +348,34 @@ impl Connection {
self.send_message(message, to_compress).await
}

/// Receive the next message from the connection.
/// This will return an error if the previous response on this connection did not include the
/// moreToCome flag.
pub(crate) async fn receive_message(&mut self) -> Result<RawCommandResponse> {
if !self.more_to_come {
return Err(Error::internal(format!(
"attempted to stream response from connection to {} but moreToCome bit was not set",
self.address()
)));
}

self.command_executing = true;
let response_message_result = Message::read_from(
&mut self.stream,
self.stream_description
.as_ref()
.map(|d| d.max_message_size_bytes),
)
.await;
self.command_executing = false;
self.error = response_message_result.is_err();

let response_message = response_message_result?;
self.more_to_come = response_message.flags.contains(MessageFlags::MORE_TO_COME);

RawCommandResponse::new(self.address.clone(), response_message)
}

/// Gets the connection's StreamDescription.
pub(crate) fn stream_description(&self) -> Result<&StreamDescription> {
self.stream_description.as_ref().ok_or_else(|| {
@@ -394,8 +438,15 @@ impl Connection {
ready_and_available_time: None,
pinned_sender: self.pinned_sender.clone(),
compressor: self.compressor.clone(),
more_to_come: false,
}
}

/// Whether or not the previous command response indicated that the server may send
/// more responses without another request.
pub(crate) fn is_streaming(&self) -> bool {
self.more_to_come
}
}

impl Drop for Connection {
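
Taken together, the `more_to_come` tracking, the guard in `send_message`, and the new `receive_message`/`is_streaming` methods let the monitor alternate between a normal request/response exchange and a purely read-driven loop. A rough sketch of that flow using the methods added above (crate-internal API with approximate signatures; `build_streaming_hello` and `update_topology` are illustrative stand-ins, and error/timeout handling is elided):

```rust
// Sketch only: not compilable outside the driver crate.
async fn monitor_check_loop(conn: &mut Connection) -> Result<()> {
    // The first hello is an ordinary round trip, but sent with
    // exhaust_allowed = true so the server may keep the "cursor" open.
    let mut reply = conn.send_command(build_streaming_hello(), None).await?;

    loop {
        update_topology(&reply)?;

        if conn.is_streaming() {
            // The last reply had moreToCome set: the server will push the next
            // hello response on its own, so just read it off the socket.
            reply = conn.receive_message().await?;
        } else {
            // The server is not streaming (e.g. pre-4.4): fall back to sending
            // a fresh hello after heartbeatFrequencyMS.
            break;
        }
    }
    Ok(())
}
```
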
8 changes: 7 additions & 1 deletion src/cmap/conn/wire/message.rs
@@ -37,6 +37,7 @@ impl Message {
bytes,
target_db: command.target_db,
name: command.name,
exhaust_allowed: command.exhaust_allowed,
},
request_id,
))
@@ -46,9 +47,14 @@
///
/// Note that `response_to` will need to be set manually.
pub(crate) fn with_raw_command(command: RawCommand, request_id: Option<i32>) -> Self {
let mut flags = MessageFlags::empty();
if command.exhaust_allowed {
flags |= MessageFlags::EXHAUST_ALLOWED;
}

Self {
response_to: 0,
flags: MessageFlags::empty(),
flags,
sections: vec![MessageSection::Document(command.bytes)],
checksum: None,
request_id,
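
For reference, `EXHAUST_ALLOWED` and `MORE_TO_COME` map to flag bits defined by the OP_MSG wire protocol. The driver's actual `MessageFlags` type already exists in the wire module and is only re-exported by this PR; a minimal equivalent using the bitflags crate would look roughly like this (bit positions per the MongoDB wire protocol documentation):

```rust
use bitflags::bitflags;

bitflags! {
    /// OP_MSG flag bits (illustrative re-creation, not the driver's definition).
    struct MessageFlags: u32 {
        const CHECKSUM_PRESENT = 1 << 0;  // a CRC-32C checksum follows the sections
        const MORE_TO_COME     = 1 << 1;  // sender will send another message without waiting
        const EXHAUST_ALLOWED  = 1 << 16; // receiver may reply multiple times with MORE_TO_COME
    }
}
```
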
5 changes: 4 additions & 1 deletion src/cmap/conn/wire/mod.rs
@@ -2,4 +2,7 @@ mod header;
mod message;
mod util;

pub(crate) use self::{message::Message, util::next_request_id};
pub(crate) use self::{
message::{Message, MessageFlags},
util::next_request_id,
};
14 changes: 3 additions & 11 deletions src/cmap/establish/handshake/mod.rs
@@ -1,8 +1,6 @@
#[cfg(test)]
mod test;

use std::sync::Arc;

use lazy_static::lazy_static;
use os_info::{Type, Version};

@@ -12,10 +10,8 @@ use crate::{
cmap::{options::ConnectionPoolOptions, Command, Connection, StreamDescription},
compression::Compressor,
error::{ErrorKind, Result},
event::sdam::SdamEventHandler,
hello::{hello_command, run_hello, HelloReply},
options::{AuthMechanism, ClientOptions, Credential, DriverInfo, ServerApi},
sdam::Topology,
};

#[cfg(all(feature = "tokio-runtime", not(feature = "tokio-sync")))]
@@ -158,6 +154,7 @@ impl Handshaker {
options.as_ref().and_then(|opts| opts.server_api.as_ref()),
options.as_ref().and_then(|opts| opts.load_balanced.into()),
None,
None,
);

if let Some(options) = options {
@@ -214,17 +211,12 @@ impl Handshaker {
}

/// Handshakes a connection.
pub(crate) async fn handshake(
&self,
conn: &mut Connection,
topology: Option<&Topology>,
handler: &Option<Arc<dyn SdamEventHandler>>,

patrickfreed (Contributor, Author) commented on the removed `topology` and `handler` parameters:

The monitors now handle emitting these events, so we don't need to pipe these event handlers down into the handshaker anymore.

A contributor replied:

nice, this may make things easier for SDAM logging too 😇

) -> Result<HandshakeResult> {
pub(crate) async fn handshake(&self, conn: &mut Connection) -> Result<HandshakeResult> {
let mut command = self.command.clone();

let client_first = set_speculative_auth_info(&mut command.body, self.credential.as_ref())?;

let mut hello_reply = run_hello(conn, command, topology, handler).await?;
let mut hello_reply = run_hello(conn, command).await?;

if self.command.body.contains_key("loadBalanced")
&& hello_reply.command_response.service_id.is_none()
2 changes: 1 addition & 1 deletion src/cmap/establish/mod.rs
@@ -52,7 +52,7 @@ impl ConnectionEstablisher {

let handshake = self
.handshaker
.handshake(&mut connection, None, &None)
.handshake(&mut connection)
.await
.map_err(|e| EstablishError::pre_hello(e, pool_gen.clone()))?;
let service_id = handshake.hello_reply.command_response.service_id;
6 changes: 1 addition & 5 deletions src/cmap/establish/test.rs
@@ -62,11 +62,7 @@ async fn speculative_auth_test(
.await
.unwrap();

let first_round = handshaker
.handshake(&mut conn, None, &None)
.await
.unwrap()
.first_round;
let first_round = handshaker.handshake(&mut conn).await.unwrap().first_round;

// We expect that the server will return a response with the `speculativeAuthenticate` field if
// and only if it's new enough to support it.
20 changes: 6 additions & 14 deletions src/cmap/manager.rs
@@ -38,17 +38,11 @@ impl PoolManager {
}

/// Mark the pool as "ready" as per the CMAP specification.
pub(super) async fn mark_as_ready(&self) {
let (message, acknowledgment_receiver) = AcknowledgedMessage::package(());
if self
.sender
.send(PoolManagementRequest::MarkAsReady {
_completion_handler: message,
})
.is_ok()
{
acknowledgment_receiver.wait_for_acknowledgment().await;
}
///
/// Since management requests are treated with the highest priority by the pool and will be
/// processed before all others, there's no need to wait for acknowledgment.
pub(super) fn mark_as_ready(&self) {
let _ = self.sender.send(PoolManagementRequest::MarkAsReady);
}

patrickfreed (Contributor, Author) commented on this change:

I updated the pool worker task to use a biased select!, which means that it will poll in a certain order. Specifically for this, I updated it to check for management requests (clearing, marking as ready, etc) before checking for check-out requests. This means that we don't actually have to wait for acknowledgment that the pool is ready before returning here, since we know that no check-out requests will be processed before this MarkAsReady is.

(A sketch of the biased select! pattern described here follows this file's diff.)

patrickfreed (Contributor, Author) commented:

As it turns out, this wasn't completely race-proof, so I reverted back to waiting for acknowledgment.

/// Check in the given connection to the pool.
@@ -108,9 +102,7 @@ pub(super) enum PoolManagementRequest {
},

/// Mark the pool as Ready, allowing connections to be created and checked out.
MarkAsReady {
_completion_handler: AcknowledgedMessage<()>,
},
MarkAsReady,

/// Check in the given connection.
CheckIn(Box<Connection>),
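
As described in the review comments above, the pool worker was changed (and later partially reverted) to poll management requests ahead of check-out requests via a biased `select!`. A self-contained sketch of that pattern, using illustrative channel and message types rather than the pool's real ones:

```rust
use tokio::sync::mpsc;

enum Management {
    MarkAsReady,
    Clear,
}
struct CheckOutRequest;

async fn pool_worker(
    mut management_rx: mpsc::UnboundedReceiver<Management>,
    mut checkout_rx: mpsc::UnboundedReceiver<CheckOutRequest>,
) {
    loop {
        tokio::select! {
            // `biased` makes select! poll the branches top to bottom instead of
            // randomly, so management requests always win over check-outs.
            biased;

            Some(request) = management_rx.recv() => {
                match request {
                    Management::MarkAsReady => { /* transition the pool to Ready */ }
                    Management::Clear => { /* invalidate the current generation */ }
                }
            }
            Some(_request) = checkout_rx.recv() => {
                // hand out an idle connection or establish a new one
            }
            else => break, // both channels closed; shut the worker down
        }
    }
}
```
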
11 changes: 6 additions & 5 deletions src/cmap/mod.rs
@@ -46,8 +46,9 @@ use crate::runtime::WorkerHandle;

const DEFAULT_MAX_POOL_SIZE: u32 = 10;

/// A pool of connections implementing the CMAP spec. All state is kept internally in an `Arc`, and
/// internal state that is mutable is additionally wrapped by a lock.
/// A pool of connections implementing the CMAP spec.
/// This type is actually a handle to a task that manages the connections and is cheap to clone and
/// pass around.
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub(crate) struct ConnectionPool {
@@ -166,9 +167,9 @@ impl ConnectionPool {
self.manager.clear(cause, service_id).await
}

/// Mark the pool as "ready", allowing connections to be created and checked out.
pub(crate) async fn mark_as_ready(&self) {
self.manager.mark_as_ready().await;
/// Mark the pool as "ready" as per the CMAP specification.
pub(crate) fn mark_as_ready(&self) {
self.manager.mark_as_ready()
}

pub(crate) fn generation(&self) -> PoolGeneration {
2 changes: 1 addition & 1 deletion src/cmap/test/mod.rs
@@ -297,7 +297,7 @@ impl Operation {
}
Operation::Ready => {
if let Some(pool) = state.pool.read().await.deref() {
pool.mark_as_ready().await;
pool.mark_as_ready();
}
}
Operation::Close => {