-
Notifications
You must be signed in to change notification settings - Fork 181
RUST-1588: Add RunCursorCommand #912
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
4efc673
e6626f5
dcee793
9f9737f
9e17b35
5d74bfa
69c8d51
ee5492a
960c800
6fac64b
5eeafec
7afc8a5
54185f8
0836a45
9c690a6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,8 +2,8 @@ pub mod options; | |
|
||
use std::{fmt::Debug, sync::Arc}; | ||
|
||
use futures_util::TryStreamExt; | ||
#[cfg(feature = "in-use-encryption-unstable")] | ||
use bson::doc; | ||
use futures_util::stream::TryStreamExt; | ||
|
||
use crate::{ | ||
|
@@ -15,19 +15,28 @@ use crate::{ | |
ChangeStream, | ||
}, | ||
client::session::TransactionState, | ||
cmap::conn::PinnedConnectionHandle, | ||
cmap::{conn::PinnedConnectionHandle}, | ||
concern::{ReadConcern, WriteConcern}, | ||
cursor::Cursor, | ||
cursor::{Cursor}, | ||
error::{Error, ErrorKind, Result}, | ||
gridfs::{options::GridFsBucketOptions, GridFsBucket}, | ||
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand}, | ||
operation::{ | ||
Aggregate, | ||
AggregateTarget, | ||
Create, | ||
DropDatabase, | ||
ListCollections, | ||
RunCommand, | ||
RunCursorCommand, | ||
}, | ||
options::{ | ||
AggregateOptions, | ||
CollectionOptions, | ||
CreateCollectionOptions, | ||
DatabaseOptions, | ||
DropDatabaseOptions, | ||
ListCollectionsOptions, | ||
RunCursorCommandOptions, | ||
}, | ||
results::CollectionSpecification, | ||
selection_criteria::SelectionCriteria, | ||
|
@@ -469,6 +478,41 @@ impl Database { | |
.await | ||
} | ||
|
||
/// Runs a database-level command and returns a cursor to the response. | ||
pub async fn run_cursor_command( | ||
&self, | ||
command: Document, | ||
options: RunCursorCommandOptions, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The option type still needs to be updated here |
||
) -> Result<Cursor<Document>> { | ||
let rcc = RunCommand::new(self.name().to_string(), command, options.selection_criteria.clone(), None)?; | ||
let rc_command = RunCursorCommand::new(rcc, options)?; | ||
let client = self.client(); | ||
client.execute_cursor_operation(rc_command).await | ||
} | ||
|
||
/// Runs a database-level command and returns a cursor to the response. | ||
pub async fn run_cursor_command_with_session( | ||
&self, | ||
command: Document, | ||
options: impl Into<Option<RunCursorCommandOptions>>, | ||
session: &mut ClientSession, | ||
) -> Result<SessionCursor<Document>> { | ||
let mut options: Option<RunCursorCommandOptions> = options.into().clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cloning shouldn't be necessary here |
||
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?; | ||
let selection_criteria = match options.clone() { | ||
Some(options) => options.selection_criteria, | ||
None => None, | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than cloning the entire options struct, it would be more efficient only to clone the
or this:
(I slightly prefer the first one for no good reason; either is totally fine.) You can also use
We use All three of these options would work here, but it's generally considered best practice to use |
||
let option = match options.clone() { | ||
Some(options) => options, | ||
None => RunCursorCommandOptions::default(), | ||
}; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rather than creating an empty options struct when the user hasn't provided any options, we can just make the |
||
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?; | ||
let rc_command = RunCursorCommand::new(rcc, option)?; | ||
let client = self.client(); | ||
client.execute_session_cursor_operation(rc_command, session).await | ||
} | ||
|
||
/// Runs a database-level command using the provided `ClientSession`. | ||
/// | ||
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a | ||
|
@@ -606,4 +650,4 @@ impl Database { | |
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket { | ||
GridFsBucket::new(self.clone(), options.into().unwrap_or_default()) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
|
||
|
||
#[cfg(feature = "in-use-encryption-unstable")] | ||
use bson::doc; | ||
use bson::RawDocumentBuf; | ||
|
||
|
||
use crate::{ | ||
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription}, | ||
concern::{WriteConcern}, | ||
cursor::{CursorSpecification}, | ||
error::{Error, Result}, | ||
operation::{ | ||
Operation, | ||
RunCommand, | ||
}, | ||
options::{ | ||
RunCursorCommandOptions, | ||
}, | ||
selection_criteria::SelectionCriteria, | ||
}; | ||
|
||
#[derive(Debug, Clone)] | ||
pub(crate) struct RunCursorCommand<'conn> { | ||
run_command: RunCommand<'conn>, | ||
options: RunCursorCommandOptions, | ||
} | ||
|
||
impl<'conn> RunCursorCommand<'conn>{ | ||
pub(crate) fn new ( | ||
run_command: RunCommand<'conn>, | ||
options: RunCursorCommandOptions, | ||
) -> Result<Self> { | ||
Ok(Self{ | ||
run_command, | ||
options, | ||
}) | ||
} | ||
} | ||
|
||
impl<'conn> Operation for RunCursorCommand<'conn> { | ||
type O = CursorSpecification; | ||
type Command = RawDocumentBuf; | ||
|
||
const NAME: &'static str = "run_cursor_command"; | ||
|
||
fn build(&mut self, description: &StreamDescription) -> Result<Command<Self::Command>> { | ||
self.run_command.build(description) | ||
} | ||
|
||
fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> { | ||
self.run_command.serialize_command(cmd) | ||
} | ||
|
||
fn extract_at_cluster_time( | ||
&self, | ||
response: &bson::RawDocument, | ||
) -> Result<Option<bson::Timestamp>> { | ||
self.run_command.extract_at_cluster_time(response) | ||
} | ||
|
||
fn handle_error(&self, error: Error) -> Result<Self::O> { | ||
Err(error) | ||
} | ||
|
||
fn selection_criteria(&self) -> Option<&SelectionCriteria> { | ||
self.run_command.selection_criteria() | ||
} | ||
|
||
fn is_acknowledged(&self) -> bool { | ||
self.run_command.is_acknowledged() | ||
} | ||
|
||
fn write_concern(&self) -> Option<&WriteConcern> { | ||
self.run_command.write_concern() | ||
} | ||
|
||
fn supports_read_concern(&self, description: &StreamDescription) -> bool { | ||
self.run_command.supports_read_concern(description) | ||
} | ||
|
||
fn supports_sessions(&self) -> bool { | ||
self.run_command.supports_sessions() | ||
} | ||
|
||
fn retryability(&self) -> crate::operation::Retryability { | ||
self.run_command.retryability() | ||
} | ||
|
||
fn update_for_retry(&mut self) { | ||
self.run_command.update_for_retry() | ||
} | ||
|
||
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> { | ||
self.run_command.pinned_connection() | ||
} | ||
|
||
fn name(&self) -> &str { | ||
self.run_command.name() | ||
} | ||
|
||
fn handle_response( | ||
&self, | ||
response: RawCommandResponse, | ||
description: &StreamDescription, | ||
) -> Result<Self::O> { | ||
let doc = Operation::handle_response(&self.run_command, response, description)?; | ||
let cursor_info = bson::from_document(doc)?; | ||
Ok(CursorSpecification::new( | ||
cursor_info, | ||
description.server_address.clone(), | ||
self.options.batch_size, | ||
self.options.max_time, | ||
self.options.comment.clone(), | ||
)) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.