Skip to content

RUST-1986 Use Collection type parameter for change streams #1162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions src/action/watch.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;
use std::{marker::PhantomData, time::Duration};

use bson::{Bson, Document, Timestamp};
use serde::de::DeserializeOwned;

use super::{action_impl, deeplink, option_setters, ExplicitSession, ImplicitSession};
use crate::{
Expand Down Expand Up @@ -96,11 +97,11 @@ where
/// Change streams require either a "majority" read concern or no read concern. Anything else
/// will cause a server error.
///
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<Document>>>`] or
/// d[`Result<SessionChangeStream<ChangeStreamEvent<Document>>>`] if a
/// `await` will return d[`Result<ChangeStream<ChangeStreamEvent<T>>>`] or
/// d[`Result<SessionChangeStream<ChangeStreamEvent<T>>>`] if a
/// [`ClientSession`] has been provided.
#[deeplink]
pub fn watch(&self) -> Watch {
pub fn watch(&self) -> Watch<T> {
Watch::new(self.client(), self.namespace().into())
}
}
Expand Down Expand Up @@ -153,24 +154,25 @@ where
///
/// Change streams require either a "majority" read concern or no read concern. Anything else
/// will cause a server error.
pub fn watch(&self) -> Watch {
pub fn watch(&self) -> Watch<T> {
self.async_collection.watch()
}
}

/// Starts a new [`ChangeStream`] that receives events for all changes in a given scope. Create by
/// calling [`Client::watch`], [`Database::watch`], or [`Collection::watch`].
#[must_use]
pub struct Watch<'a, S = ImplicitSession> {
pub struct Watch<'a, T = Document, S = ImplicitSession> {
client: &'a Client,
target: AggregateTarget,
pipeline: Vec<Document>,
options: Option<ChangeStreamOptions>,
session: S,
cluster: bool,
phantom: PhantomData<fn() -> T>,
}

impl<'a> Watch<'a, ImplicitSession> {
impl<'a, T> Watch<'a, T, ImplicitSession> {
fn new(client: &'a Client, target: AggregateTarget) -> Self {
Self {
client,
Expand All @@ -179,6 +181,7 @@ impl<'a> Watch<'a, ImplicitSession> {
options: None,
session: ImplicitSession,
cluster: false,
phantom: PhantomData,
}
}

Expand All @@ -190,6 +193,7 @@ impl<'a> Watch<'a, ImplicitSession> {
options: None,
session: ImplicitSession,
cluster: true,
phantom: PhantomData,
}
}
}
Expand Down Expand Up @@ -235,28 +239,29 @@ impl<'a, S> Watch<'a, S> {
);
}

impl<'a> Watch<'a, ImplicitSession> {
impl<'a, T> Watch<'a, T, ImplicitSession> {
/// Use the provided ['ClientSession'].
pub fn session<'s>(
self,
session: impl Into<&'s mut ClientSession>,
) -> Watch<'a, ExplicitSession<'s>> {
) -> Watch<'a, T, ExplicitSession<'s>> {
Watch {
client: self.client,
target: self.target,
pipeline: self.pipeline,
options: self.options,
session: ExplicitSession(session.into()),
cluster: self.cluster,
phantom: PhantomData,
}
}
}

#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<Document>>)]
impl<'a> Action for Watch<'a, ImplicitSession> {
#[action_impl(sync = crate::sync::ChangeStream<ChangeStreamEvent<T>>)]
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ImplicitSession> {
type Future = WatchFuture;

async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
async fn execute(mut self) -> Result<ChangeStream<ChangeStreamEvent<T>>> {
resolve_options!(
self.client,
self.options,
Expand All @@ -273,11 +278,11 @@ impl<'a> Action for Watch<'a, ImplicitSession> {
}
}

#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<Document>>)]
impl<'a> Action for Watch<'a, ExplicitSession<'a>> {
#[action_impl(sync = crate::sync::SessionChangeStream<ChangeStreamEvent<T>>)]
impl<'a, T: DeserializeOwned + Unpin + Send + Sync> Action for Watch<'a, T, ExplicitSession<'a>> {
type Future = WatchSessionFuture;

async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
async fn execute(mut self) -> Result<SessionChangeStream<ChangeStreamEvent<T>>> {
resolve_read_concern_with_session!(self.client, self.options, Some(&mut *self.session.0))?;
resolve_selection_criteria_with_session!(
self.client,
Expand Down
10 changes: 10 additions & 0 deletions src/test/change_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -662,3 +662,13 @@ async fn split_large_event() -> Result<()> {

Ok(())
}

// Regression test: `Collection::watch` uses the type parameter. This is not flagged as a test to
// run because it's just asserting that this compiles.
#[allow(unreachable_code, unused_variables, clippy::diverging_sub_expression)]
async fn _collection_watch_typed() {
let coll: Collection<bson::RawDocumentBuf> = unimplemented!();
let mut stream = coll.watch().await.unwrap();
let _: Option<crate::error::Result<ChangeStreamEvent<bson::RawDocumentBuf>>> =
stream.next().await;
}