Skip to content

RUST-522 Implement resume functionality for change streams #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/change_stream/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ pub struct ResumeToken(pub(crate) RawBson);

impl ResumeToken {
pub(crate) fn initial(
options: &Option<ChangeStreamOptions>,
options: Option<&ChangeStreamOptions>,
spec: &CursorSpecification,
) -> Option<ResumeToken> {
match &spec.post_batch_resume_token {
// Token from initial response from `aggregate`
Some(token) if spec.initial_buffer.is_empty() => Some(token.clone()),
// Token from options passed to `watch`
_ => options
.as_ref()
.and_then(|o| o.start_after.as_ref().or_else(|| o.resume_after.as_ref()))
.cloned(),
}
Expand Down
134 changes: 90 additions & 44 deletions src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use std::{
task::{Context, Poll},
};

use bson::Document;
use futures_core::Stream;
use bson::{Document, Timestamp};
use derivative::Derivative;
use futures_core::{future::BoxFuture, Stream};
use serde::{de::DeserializeOwned, Deserialize};

use crate::{
Expand All @@ -20,11 +21,12 @@ use crate::{
options::ChangeStreamOptions,
},
cursor::{stream_poll_next, BatchValue, CursorStream, NextInBatchFuture},
error::Result,
error::{Error, Result},
operation::AggregateTarget,
options::AggregateOptions,
selection_criteria::{ReadPreference, SelectionCriteria},
Client,
ClientSession,
Collection,
Cursor,
Database,
Expand Down Expand Up @@ -75,34 +77,37 @@ use crate::{
///
/// See the documentation [here](https://docs.mongodb.com/manual/changeStreams) for more
/// details. Also see the documentation on [usage recommendations](https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/).
#[derive(Debug)]
#[derive(Derivative)]
#[derivative(Debug)]
pub struct ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
/// The cursor to iterate over event instances.
cursor: Cursor<T>,

/// The information associate with this change stream.
/// Arguments to `watch` that created this change stream.
args: WatchArgs,

/// Dynamic information associated with this change stream.
data: ChangeStreamData,

/// The cached resume token.
resume_token: Option<ResumeToken>,
/// A pending future for a resume.
#[derivative(Debug = "ignore")]
pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>>,
}

impl<T> ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
{
pub(crate) fn new(
cursor: Cursor<T>,
data: ChangeStreamData,
resume_token: Option<ResumeToken>,
) -> Self {
pub(crate) fn new(cursor: Cursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
let pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>> = None;
Self {
cursor,
args,
data,
resume_token,
pending_resume,
}
}

Expand All @@ -112,16 +117,17 @@ where
/// See the documentation
/// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
/// information on change stream resume tokens.
pub fn resume_token(&self) -> Option<&ResumeToken> {
self.resume_token.as_ref()
pub fn resume_token(&self) -> Option<ResumeToken> {
self.data.resume_token.clone()
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
ChangeStream {
cursor: self.cursor.with_type(),
args: self.args,
data: self.data,
resume_token: self.resume_token,
pending_resume: None,
}
}

Expand Down Expand Up @@ -162,45 +168,48 @@ where
}
}

#[derive(Debug)]
pub(crate) struct ChangeStreamData {
/// Arguments passed to a `watch` method, captured to allow resume.
#[derive(Debug, Clone)]
pub(crate) struct WatchArgs {
/// The pipeline of stages to append to an initial `$changeStream` stage.
pipeline: Vec<Document>,

/// The client that was used for the initial `$changeStream` aggregation, used for server
/// selection during an automatic resume.
client: Client,
pub(crate) pipeline: Vec<Document>,

/// The original target of the change stream, used for re-issuing the aggregation during
/// an automatic resume.
target: AggregateTarget,
/// The original target of the change stream.
pub(crate) target: AggregateTarget,

/// The options provided to the initial `$changeStream` stage.
options: Option<ChangeStreamOptions>,
pub(crate) options: Option<ChangeStreamOptions>,
}

/// Dynamic change stream data needed for resume.
#[derive(Debug, Default)]
pub(crate) struct ChangeStreamData {
/// The `operationTime` returned by the initial `aggregate` command.
pub(crate) initial_operation_time: Option<Timestamp>,

/// The cached resume token.
pub(crate) resume_token: Option<ResumeToken>,

/// Whether or not the change stream has attempted a resume, used to attempt a resume only
/// once.
resume_attempted: bool,
pub(crate) resume_attempted: bool,

/// Whether or not the change stream has returned a document, used to update resume token
/// during an automatic resume.
document_returned: bool,
pub(crate) document_returned: bool,

/// The implicit session used to create the original cursor.
pub(crate) implicit_session: Option<ClientSession>,
}

impl ChangeStreamData {
pub(crate) fn new(
pipeline: Vec<Document>,
client: Client,
target: AggregateTarget,
options: Option<ChangeStreamOptions>,
) -> Self {
fn take(&mut self) -> Self {
Self {
pipeline,
client,
target,
options,
resume_attempted: false,
document_returned: false,
initial_operation_time: self.initial_operation_time,
resume_token: self.resume_token.clone(),
resume_attempted: self.resume_attempted,
document_returned: self.document_returned,
implicit_session: self.implicit_session.take(),
}
}
}
Expand All @@ -227,11 +236,48 @@ where
T: DeserializeOwned + Unpin + Send + Sync,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is essentially the manual state machine version of SessionChangeStream::next_if_any. Hopefully at some point Rust gets reasonable first-class syntax for streams.

I considered trying to write this by wrapping an async function in a stream adaptor for clarity, but that ended up needing to be self-referential.

if let Some(mut pending) = self.pending_resume.take() {
match Pin::new(&mut pending).poll(cx) {
Poll::Pending => {
self.pending_resume = Some(pending);
return Poll::Pending;
}
Poll::Ready(Ok(new_stream)) => {
// Ensure that the old cursor is killed on the server selected for the new one.
self.cursor
.set_drop_address(new_stream.cursor.address().clone());
self.cursor = new_stream.cursor;
self.args = new_stream.args;
return Poll::Pending;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
}
}
let out = self.cursor.poll_next_in_batch(cx);
if let Poll::Ready(Ok(bv)) = &out {
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
self.resume_token = Some(token);
match &out {
Poll::Ready(Ok(bv)) => {
if let Some(token) = get_resume_token(bv, self.cursor.post_batch_resume_token())? {
self.data.resume_token = Some(token);
}
if matches!(bv, BatchValue::Some { .. }) {
self.data.document_returned = true;
}
}
Poll::Ready(Err(e)) if e.is_resumable() && !self.data.resume_attempted => {
self.data.resume_attempted = true;
let client = self.cursor.client().clone();
let args = self.args.clone();
let mut data = self.data.take();
data.implicit_session = self.cursor.take_implicit_session();
self.pending_resume = Some(Box::pin(async move {
let new_stream: Result<ChangeStream<ChangeStreamEvent<()>>> = client
.execute_watch(args.pipeline, args.options, args.target, Some(data))
.await;
new_stream.map(|cs| cs.with_type::<T>())
}));
return Poll::Pending;
}
_ => {}
}
out
}
Expand Down
Loading