Skip to content

RUST-1588: Add RunCursorCommand #912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Jul 19, 2023
47 changes: 46 additions & 1 deletion src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,23 @@ use crate::{
cursor::Cursor,
error::{Error, ErrorKind, Result},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
operation::{
Aggregate,
AggregateTarget,
Create,
DropDatabase,
ListCollections,
RunCommand,
RunCursorCommand,
},
options::{
AggregateOptions,
CollectionOptions,
CreateCollectionOptions,
DatabaseOptions,
DropDatabaseOptions,
ListCollectionsOptions,
RunCursorCommandOptions,
},
results::CollectionSpecification,
selection_criteria::SelectionCriteria,
Expand Down Expand Up @@ -469,6 +478,42 @@ impl Database {
.await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command(
&self,
command: Document,
options: impl Into<Option<RunCursorCommandOptions>>,
) -> Result<Cursor<Document>> {
let options: Option<RunCursorCommandOptions> = options.into();
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client.execute_cursor_operation(rc_command).await
}

/// Runs a database-level command and returns a cursor to the response.
pub async fn run_cursor_command_with_session(
&self,
command: Document,
options: impl Into<Option<RunCursorCommandOptions>>,
session: &mut ClientSession,
) -> Result<SessionCursor<Document>> {
let mut options: Option<RunCursorCommandOptions> = options.into();
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
let selection_criteria = options
.as_ref()
.and_then(|options| options.selection_criteria.clone());
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
let rc_command = RunCursorCommand::new(rcc, options)?;
let client = self.client();
client
.execute_session_cursor_operation(rc_command, session)
.await
}

/// Runs a database-level command using the provided `ClientSession`.
///
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a
Expand Down
22 changes: 21 additions & 1 deletion src/db/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use typed_builder::TypedBuilder;
use crate::{
bson::{Bson, Document},
concern::{ReadConcern, WriteConcern},
options::Collation,
options::{Collation, CursorType},
selection_criteria::SelectionCriteria,
serde_util,
};
Expand Down Expand Up @@ -312,3 +312,23 @@ pub struct ChangeStreamPreAndPostImages {
/// If `true`, change streams will be able to include pre- and post-images.
pub enabled: bool,
}

/// Specifies the options to a
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
#[derive(Clone, Debug, Default, TypedBuilder)]
#[builder(field_defaults(default, setter(into)))]
#[non_exhaustive]
pub struct RunCursorCommandOptions {
/// The default read preference for operations.
pub selection_criteria: Option<SelectionCriteria>,
/// The type of cursor to return.
pub cursor_type: Option<CursorType>,
/// Number of documents to return per batch.
pub batch_size: Option<u32>,
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
/// on subsequent getMore commands.
pub max_time: Option<Duration>,
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
/// getMore commands.
pub comment: Option<Bson>,
}
2 changes: 2 additions & 0 deletions src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod list_databases;
mod list_indexes;
mod raw_output;
mod run_command;
mod run_cursor_command;
mod update;

#[cfg(test)]
Expand Down Expand Up @@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes;
#[cfg(feature = "in-use-encryption-unstable")]
pub(crate) use raw_output::RawOutput;
pub(crate) use run_command::RunCommand;
pub(crate) use run_cursor_command::RunCursorCommand;
pub(crate) use update::Update;

const SERVER_4_2_0_WIRE_VERSION: i32 = 8;
Expand Down
121 changes: 121 additions & 0 deletions src/operation/run_cursor_command.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#[cfg(feature = "in-use-encryption-unstable")]
use bson::doc;
use bson::RawDocumentBuf;

use crate::{
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
concern::WriteConcern,
cursor::CursorSpecification,
error::{Error, Result},
operation::{Operation, RunCommand},
options::RunCursorCommandOptions,
selection_criteria::SelectionCriteria,
};

#[derive(Debug, Clone)]
pub(crate) struct RunCursorCommand<'conn> {
run_command: RunCommand<'conn>,
options: Option<RunCursorCommandOptions>,
}

impl<'conn> RunCursorCommand<'conn> {
pub(crate) fn new(
run_command: RunCommand<'conn>,
options: Option<RunCursorCommandOptions>,
) -> Result<Self> {
Ok(Self {
run_command,
options,
})
}
}

impl<'conn> Operation for RunCursorCommand<'conn> {
type O = CursorSpecification;
type Command = RawDocumentBuf;

const NAME: &'static str = "run_cursor_command";

fn build(&mut self, description: &StreamDescription) -> Result<Command<Self::Command>> {
self.run_command.build(description)
}

fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
self.run_command.serialize_command(cmd)
}

fn extract_at_cluster_time(
&self,
response: &bson::RawDocument,
) -> Result<Option<bson::Timestamp>> {
self.run_command.extract_at_cluster_time(response)
}

fn handle_error(&self, error: Error) -> Result<Self::O> {
Err(error)
}

fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.run_command.selection_criteria()
}

fn is_acknowledged(&self) -> bool {
self.run_command.is_acknowledged()
}

fn write_concern(&self) -> Option<&WriteConcern> {
self.run_command.write_concern()
}

fn supports_read_concern(&self, description: &StreamDescription) -> bool {
self.run_command.supports_read_concern(description)
}

fn supports_sessions(&self) -> bool {
self.run_command.supports_sessions()
}

fn retryability(&self) -> crate::operation::Retryability {
self.run_command.retryability()
}

fn update_for_retry(&mut self) {
self.run_command.update_for_retry()
}

fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
self.run_command.pinned_connection()
}

fn name(&self) -> &str {
self.run_command.name()
}

fn handle_response(
&self,
response: RawCommandResponse,
description: &StreamDescription,
) -> Result<Self::O> {
let doc = Operation::handle_response(&self.run_command, response, description)?;
let cursor_info = bson::from_document(doc)?;
let batch_size = match &self.options {
Some(options) => options.batch_size.clone(),
None => None,
};
let max_time = match &self.options {
Some(options) => options.max_time.clone(),
None => None,
};
let comment = match &self.options {
Some(options) => options.comment.clone(),
None => None,
};
Ok(CursorSpecification::new(
cursor_info,
description.server_address.clone(),
batch_size,
max_time,
comment,
))
}
}
2 changes: 1 addition & 1 deletion src/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use crate::{
bson::{serde_helpers, Bson, Document},
change_stream::event::ResumeToken,
db::options::CreateCollectionOptions,
Namespace,
serde_util,
Namespace,
};

use bson::{Binary, RawDocumentBuf};
Expand Down