Skip to content

RUST-521 Implement naive streaming and resume token caching for change streams #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 32 additions & 3 deletions src/change_stream/event.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
//! Contains the types related to a `ChangeStream` event.
use crate::coll::Namespace;
use bson::{Bson, Document};
use std::convert::TryInto;

use crate::{
coll::Namespace,
cursor::CursorSpecification,
error::Result,
options::ChangeStreamOptions,
};

use bson::{Bson, Document, RawBson, RawDocument, RawDocumentBuf};
use serde::{Deserialize, Serialize};

/// An opaque token used for resuming an interrupted
Expand All @@ -15,7 +23,28 @@ use serde::{Deserialize, Serialize};
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on resume tokens.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ResumeToken(pub(crate) Bson);
pub struct ResumeToken(pub(crate) RawBson);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using RawBson here means the token can be essentially handled as a (nearly) uninterpreted byte blob.


impl ResumeToken {
pub(crate) fn initial(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we update this so that both values don't always need to be created? e.g.

match spec.post_batch_resume_token {
    Some(token) if spec.initial_buffer.is_empty() => token,
    None => // token from options
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's much nicer, thank you!

options: &Option<ChangeStreamOptions>,
spec: &CursorSpecification,
) -> Option<ResumeToken> {
match &spec.post_batch_resume_token {
// Token from initial response from `aggregate`
Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
// Token from options passed to `watch`
_ => options
.as_ref()
.and_then(|o| o.start_after.as_ref().or_else(|| o.resume_after.as_ref()))
.cloned(),
}
}

pub(crate) fn from_raw(doc: Option<RawDocumentBuf>) -> Option<ResumeToken> {
doc.map(|doc| ResumeToken(RawBson::Document(doc)))
}
}

/// A `ChangeStreamEvent` represents a
/// [change event](https://docs.mongodb.com/manual/reference/change-events/) in the associated change stream.
Expand Down
96 changes: 87 additions & 9 deletions src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub(crate) mod options;
pub mod session;

use std::{
future::Future,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
Expand All @@ -18,6 +19,7 @@ use crate::{
event::{ChangeStreamEvent, ResumeToken},
options::ChangeStreamOptions,
},
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
error::Result,
operation::AggregateTarget,
options::AggregateOptions,
Expand Down Expand Up @@ -83,14 +85,25 @@ where

/// The information associate with this change stream.
data: ChangeStreamData,

/// The cached resume token.
resume_token: Option<ResumeToken>,
}

impl<T> ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(cursor: Cursor<T>, data: ChangeStreamData) -> Self {
Self { cursor, data }
pub(crate) fn new(
cursor: Cursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
) -> Self {
Self {
cursor,
data,
resume_token,
}
}

/// Returns the cached resume token that can be used to resume after the most recently returned
Expand All @@ -100,16 +113,53 @@ where
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on change stream resume tokens.
pub fn resume_token(&self) -> Option<&ResumeToken> {
todo!()
self.resume_token.as_ref()
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
ChangeStream {
cursor: self.cursor.with_type(),
data: self.data,
resume_token: self.resume_token,
}
}

/// Returns whether the change stream will continue to receive events.
pub fn is_alive(&self) -> bool {
!self.cursor.is_exhausted()
}

/// Retrieves the next result from the change stream, if any.
///
/// Where calling `Stream::next` will internally loop until a change document is received,
/// this will make at most one request and return `None` if the returned document batch is
/// empty. This method should be used when storing the resume token in order to ensure the
/// most up to date token is received, e.g.
///
/// ```ignore
/// # use mongodb::{Client, error::Result};
/// # async fn func() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection("bar");
/// let mut change_stream = coll.watch(None, None).await?;
/// let mut resume_token = None;
/// while change_stream.is_alive() {
/// if let Some(event) = change_stream.next_if_any() {
/// // process event
/// }
/// resume_token = change_stream.resume_token().cloned();
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub async fn next_if_any(&mut self) -> Result<Option<T>> {
Ok(match NextInBatchFuture::new(self).await? {
BatchValue::Some { doc, .. } => Some(bson::from_slice(doc.as_bytes())?),
BatchValue::Empty | BatchValue::Exhausted => None,
})
}
}

#[derive(Debug)]
Expand All @@ -125,9 +175,6 @@ pub(crate) struct ChangeStreamData {
/// an automatic resume.
target: AggregateTarget,

/// The cached resume token.
resume_token: Option<ResumeToken>,

/// The options provided to the initial `$changeStream` stage.
options: Option<ChangeStreamOptions>,

Expand All @@ -151,21 +198,52 @@ impl ChangeStreamData {
pipeline,
client,
target,
resume_token: None,
options,
resume_attempted: false,
document_returned: false,
}
}
}

fn get_resume_token(
batch_value: &BatchValue,
batch_token: Option<&ResumeToken>,
) -> Result<Option<ResumeToken>> {
Ok(match batch_value {
BatchValue::Some { doc, is_last } => {
if *is_last && batch_token.is_some() {
batch_token.cloned()
} else {
doc.get("_id")?.map(|val| ResumeToken(val.to_raw_bson()))
}
}
BatchValue::Empty => batch_token.cloned(),
_ => None,
})
}

impl<T> CursorStream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
let out = self.cursor.poll_next_in_batch(cx);
if let Poll::Ready(Ok(bv)) = &out {
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
self.resume_token = Some(token);
}
}
out
}
}

impl<T> Stream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
todo!()
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream_poll_next(Pin::into_inner(self), cx)
}
}
111 changes: 102 additions & 9 deletions src/change_stream/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,18 @@ use std::{

use bson::Document;
use futures_core::Stream;
use futures_util::StreamExt;
use serde::de::DeserializeOwned;

use crate::{error::Result, ClientSession, SessionCursor, SessionCursorStream};
use crate::{
cursor::{BatchValue, CursorStream, NextInBatchFuture},
error::Result,
ClientSession,
SessionCursor,
SessionCursorStream,
};

use super::{event::ResumeToken, ChangeStreamData};
use super::{event::ResumeToken, get_resume_token, stream_poll_next, ChangeStreamData};

/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
/// be iterated using one. To iterate, use [`SessionChangeStream::next`] or retrieve a
Expand Down Expand Up @@ -45,14 +52,23 @@ where
{
cursor: SessionCursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
}

impl<T> SessionChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(cursor: SessionCursor<T>, data: ChangeStreamData) -> Self {
Self { cursor, data }
pub(crate) fn new(
cursor: SessionCursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
) -> Self {
Self {
cursor,
data,
resume_token,
}
}

/// Returns the cached resume token that can be used to resume after the most recently returned
Expand All @@ -62,12 +78,16 @@ where
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on change stream resume tokens.
pub fn resume_token(&self) -> Option<&ResumeToken> {
todo!()
self.resume_token.as_ref()
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
todo!()
SessionChangeStream {
cursor: self.cursor.with_type(),
data: self.data,
resume_token: self.resume_token,
}
}

/// Retrieves a [`SessionCursorStream`] to iterate this change stream. The session provided must
Expand Down Expand Up @@ -117,7 +137,10 @@ where
&mut self,
session: &'session mut ClientSession,
) -> SessionChangeStreamValues<'_, 'session, T> {
todo!()
SessionChangeStreamValues {
stream: self.cursor.stream(session),
resume_token: &mut self.resume_token,
}
}

/// Retrieve the next result from the change stream.
Expand Down Expand Up @@ -145,7 +168,41 @@ where
/// # }
/// ```
pub async fn next(&mut self, session: &mut ClientSession) -> Option<Result<T>> {
todo!()
self.values(session).next().await
}

/// Returns whether the change stream will continue to receive events.
pub fn is_alive(&self) -> bool {
!self.cursor.is_exhausted()
}

/// Retrieve the next result from the change stream, if any.
///
/// Where calling `next` will internally loop until a change document is received,
/// this will make at most one request and return `None` if the returned document batch is
/// empty. This method should be used when storing the resume token in order to ensure the
/// most up to date token is received, e.g.
///
/// ```ignore
/// # use mongodb::{Client, error::Result};
/// # async fn func() -> Result<()> {
/// # let client = Client::with_uri_str("mongodb://example.com").await?;
/// # let coll = client.database("foo").collection("bar");
/// # let mut session = client.start_session(None).await?;
/// let mut change_stream = coll.watch_with_session(None, None, &mut session).await?;
/// let mut resume_token = None;
/// while change_stream.is_alive() {
/// if let Some(event) = change_stream.next_if_any(&mut session) {
/// // process event
/// }
/// resume_token = change_stream.resume_token().cloned();
/// }
/// #
/// # Ok(())
/// # }
/// ```
pub async fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
self.values(session).next_if_any().await
}
}

Expand All @@ -160,6 +217,42 @@ where
T: DeserializeOwned + Unpin + Send + Sync,
{
stream: SessionCursorStream<'cursor, 'session, T>,
resume_token: &'cursor mut Option<ResumeToken>,
}

impl<'cursor, 'session, T> SessionChangeStreamValues<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub fn resume_token(&self) -> Option<&ResumeToken> {
self.resume_token.as_ref()
}

pub fn is_alive(&self) -> bool {
!self.stream.is_exhausted()
}

pub async fn next_if_any<'a>(&'a mut self) -> Result<Option<T>> {
Ok(match NextInBatchFuture::new(self).await? {
BatchValue::Some { doc, .. } => Some(bson::from_slice(doc.as_bytes())?),
BatchValue::Empty | BatchValue::Exhausted => None,
})
}
}

impl<'cursor, 'session, T> CursorStream for SessionChangeStreamValues<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
let out = self.stream.poll_next_in_batch(cx);
if let Poll::Ready(Ok(bv)) = &out {
if let Some(token) = get_resume_token(bv, self.stream.post_batch_resume_token())? {
*self.resume_token = Some(token);
}
}
out
}
}

impl<'cursor, 'session, T> Stream for SessionChangeStreamValues<'cursor, 'session, T>
Expand All @@ -169,6 +262,6 @@ where
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
todo!()
stream_poll_next(Pin::into_inner(self), cx)
}
}
Loading