Commit 7da9195

RUST-1104 sync wrapper for the change stream API (#566)
1 parent 724a227

7 files changed, 402 insertions(+), 16 deletions(-)

src/change_stream/mod.rs

Lines changed: 2 additions & 2 deletions
@@ -34,8 +34,8 @@ use crate::{
 };
 
 /// A `ChangeStream` streams the ongoing changes of its associated collection, database or
-/// deployment. `ChangeStream` instances should be created with method `watch` or
-/// `watch_with_pipeline` against the relevant target.
+/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
+/// target.
 ///
 /// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
 /// stream of events. This is done automatically when the `ChangeStream` encounters certain
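The doc comment above now points users at `watch` as the single entry point for creating a `ChangeStream`. As a minimal illustrative sketch (not part of the diff), this is roughly what that looks like with the async API at this commit; the URI, database, and collection names are placeholders, and the call shapes mirror the `ignore`d doc examples elsewhere in the commit.

```rust
use futures_util::stream::StreamExt;
use mongodb::{bson::Document, error::Result, Client};

// Illustrative only: the URI, database, and collection names are placeholders.
async fn tail_changes() -> Result<()> {
    let client = Client::with_uri_str("mongodb://example.com").await?;
    let coll = client.database("foo").collection::<Document>("bar");

    // `watch` with no extra pipeline stages and default options creates the
    // resumable `ChangeStream` described in the doc comment above.
    let mut change_stream = coll.watch(None, None).await?;

    // The async `ChangeStream` implements `futures::Stream`, so `StreamExt::next`
    // yields `Option<Result<ChangeStreamEvent<Document>>>`.
    while let Some(event) = change_stream.next().await {
        println!("change event: {:?}", event?);
    }
    Ok(())
}
```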

src/change_stream/session.rs

Lines changed: 3 additions & 11 deletions
@@ -26,28 +26,20 @@ use super::{
 };
 
 /// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
-/// be iterated using one. To iterate, use [`SessionChangeStream::next`] or retrieve a
-/// [`SessionCursorStream`] using [`SessionChangeStream::stream`]:
+/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
 ///
 /// ```ignore
-/// # use mongodb::{bson::Document, Client, error::Result, ClientSession, SessionCursor};
+/// # use mongodb::{bson::Document, Client, error::Result};
 /// #
 /// # async fn do_stuff() -> Result<()> {
 /// # let client = Client::with_uri_str("mongodb://example.com").await?;
 /// # let mut session = client.start_session(None).await?;
 /// # let coll = client.database("foo").collection::<Document>("bar");
 /// #
-/// // iterate using next()
 /// let mut cs = coll.watch_with_session(None, None, &mut session).await?;
-/// while let Some(event) = cs.next(&mut session).await.transpose()? {
+/// while let Some(event) = cs.next(&mut session).await? {
 ///     println!("{:?}", event)
 /// }
-///
-/// // iterate using `Stream`:
-/// use futures::stream::TryStreamExt;
-///
-/// let mut cs = coll.watch_with_session(None, None, &mut session).await?;
-/// let results: Vec<_> = cs.values(&mut session).try_collect().await?;
 /// #
 /// # Ok(())
 /// # }

src/sync/change_stream.rs

Lines changed: 242 additions & 0 deletions
@@ -0,0 +1,242 @@
+use futures_util::stream::StreamExt;
+use serde::de::DeserializeOwned;
+
+use crate::{
+    change_stream::{
+        event::ResumeToken,
+        session::SessionChangeStream as AsyncSessionChangeStream,
+        ChangeStream as AsyncChangeStream,
+    },
+    error::Result,
+    RUNTIME,
+};
+
+use super::ClientSession;
+
+/// A `ChangeStream` streams the ongoing changes of its associated collection, database or
+/// deployment. `ChangeStream` instances should be created with method `watch` against the relevant
+/// target.
+///
+/// `ChangeStream`s are "resumable", meaning that they can be restarted at a given place in the
+/// stream of events. This is done automatically when the `ChangeStream` encounters certain
+/// ["resumable"](https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resumable-error)
+/// errors, such as transient network failures. It can also be done manually by passing
+/// a [`ResumeToken`] retrieved from a past event into either the
+/// [`resume_after`](ChangeStreamOptions::resume_after) or
+/// [`start_after`](ChangeStreamOptions::start_after) (4.2+) options used to create the
+/// `ChangeStream`. Issuing a raw change stream aggregation is discouraged unless users wish to
+/// explicitly opt out of resumability.
+///
+/// A `ChangeStream` can be iterated like any other [`Iterator`]:
+///
+/// ```ignore
+/// # use mongodb::{sync::Client, error::Result, bson::doc,
+/// #     change_stream::event::ChangeStreamEvent};
+/// #
+/// # fn func() -> Result<()> {
+/// # let client = Client::with_uri_str("mongodb://example.com")?;
+/// # let coll = client.database("foo").collection("bar");
+/// let mut change_stream = coll.watch(None, None)?;
+/// coll.insert_one(doc! { "x": 1 }, None)?;
+/// for event in change_stream {
+///     let event = event?;
+///     println!("operation performed: {:?}, document: {:?}", event.operation_type, event.full_document);
+///     // operation performed: Insert, document: Some(Document({"x": Int32(1)}))
+/// }
+/// #
+/// # Ok(())
+/// # }
+/// ```
+///
+/// See the documentation [here](https://docs.mongodb.com/manual/changeStreams) for more
+/// details. Also see the documentation on [usage recommendations](https://docs.mongodb.com/manual/administration/change-streams-production-recommendations/).
+pub struct ChangeStream<T>
+where
+    T: DeserializeOwned + Unpin + Send + Sync,
+{
+    async_stream: AsyncChangeStream<T>,
+}
+
+impl<T> ChangeStream<T>
+where
+    T: DeserializeOwned + Unpin + Send + Sync,
+{
+    pub(crate) fn new(async_stream: AsyncChangeStream<T>) -> Self {
+        Self { async_stream }
+    }
+
+    /// Returns the cached resume token that can be used to resume after the most recently returned
+    /// change.
+    ///
+    /// See the documentation
+    /// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
+    /// information on change stream resume tokens.
+    pub fn resume_token(&self) -> Option<ResumeToken> {
+        self.async_stream.resume_token()
+    }
+
+    /// Update the type streamed values will be parsed as.
+    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
+        ChangeStream {
+            async_stream: self.async_stream.with_type(),
+        }
+    }
+
+    /// Returns whether the change stream will continue to receive events.
+    pub fn is_alive(&self) -> bool {
+        self.async_stream.is_alive()
+    }
+
+    /// Retrieves the next result from the change stream, if any.
+    ///
+    /// Where calling `Iterator::next` will internally loop until a change document is received,
+    /// this will make at most one request and return `None` if the returned document batch is
+    /// empty. This method should be used when storing the resume token in order to ensure the
+    /// most up to date token is received, e.g.
+    ///
+    /// ```ignore
+    /// # use mongodb::{sync::Client, error::Result};
+    /// # fn func() -> Result<()> {
+    /// # let client = Client::with_uri_str("mongodb://example.com")?;
+    /// # let coll = client.database("foo").collection("bar");
+    /// let mut change_stream = coll.watch(None, None)?;
+    /// let mut resume_token = None;
+    /// while change_stream.is_alive() {
+    ///     if let Some(event) = change_stream.next_if_any() {
+    ///         // process event
+    ///     }
+    ///     resume_token = change_stream.resume_token().cloned();
+    /// }
+    /// #
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn next_if_any(&mut self) -> Result<Option<T>> {
+        RUNTIME.block_on(self.async_stream.next_if_any())
+    }
+}
+
+impl<T> Iterator for ChangeStream<T>
+where
+    T: DeserializeOwned + Unpin + Send + Sync,
+{
+    type Item = Result<T>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        RUNTIME.block_on(self.async_stream.next())
+    }
+}
+
+/// A [`SessionChangeStream`] is a change stream that was created with a [`ClientSession`] that must
+/// be iterated using one. To iterate, use [`SessionChangeStream::next`]:
+///
+/// ```ignore
+/// # use mongodb::{bson::Document, sync::Client, error::Result};
+/// #
+/// # async fn do_stuff() -> Result<()> {
+/// # let client = Client::with_uri_str("mongodb://example.com")?;
+/// # let mut session = client.start_session(None)?;
+/// # let coll = client.database("foo").collection::<Document>("bar");
+/// #
+/// let mut cs = coll.watch_with_session(None, None, &mut session)?;
+/// while let Some(event) = cs.next(&mut session)? {
+///     println!("{:?}", event)
+/// }
+/// #
+/// # Ok(())
+/// # }
+/// ```
+pub struct SessionChangeStream<T>
+where
+    T: DeserializeOwned + Unpin,
+{
+    async_stream: AsyncSessionChangeStream<T>,
+}
+
+impl<T> SessionChangeStream<T>
+where
+    T: DeserializeOwned + Unpin + Send + Sync,
+{
+    pub(crate) fn new(async_stream: AsyncSessionChangeStream<T>) -> Self {
+        Self { async_stream }
+    }
+
+    /// Returns the cached resume token that can be used to resume after the most recently returned
+    /// change.
+    ///
+    /// See the documentation
+    /// [here](https://docs.mongodb.com/manual/changeStreams/#change-stream-resume-token) for more
+    /// information on change stream resume tokens.
+    pub fn resume_token(&self) -> Option<ResumeToken> {
+        self.async_stream.resume_token()
+    }
+
+    /// Update the type streamed values will be parsed as.
+    pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> SessionChangeStream<D> {
+        SessionChangeStream {
+            async_stream: self.async_stream.with_type(),
+        }
+    }
+
+    /// Retrieve the next result from the change stream.
+    /// The session provided must be the same session used to create the change stream.
+    ///
+    /// ```ignore
+    /// # use bson::{doc, Document};
+    /// # use mongodb::sync::Client;
+    /// # fn main() {
+    /// # async {
+    /// # let client = Client::with_uri_str("foo")?;
+    /// # let coll = client.database("foo").collection::<Document>("bar");
+    /// # let other_coll = coll.clone();
+    /// # let mut session = client.start_session(None)?;
+    /// let mut cs = coll.watch_with_session(None, None, &mut session)?;
+    /// while let Some(event) = cs.next(&mut session)? {
+    ///     let id = bson::to_bson(&event.id)?;
+    ///     other_coll.insert_one_with_session(doc! { "id": id }, None, &mut session)?;
+    /// }
+    /// # Ok::<(), mongodb::error::Error>(())
+    /// # };
+    /// # }
+    /// ```
+    pub fn next(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
+        RUNTIME.block_on(self.async_stream.next(&mut session.async_client_session))
+    }
+
+    /// Returns whether the change stream will continue to receive events.
+    pub fn is_alive(&self) -> bool {
+        self.async_stream.is_alive()
+    }
+
+    /// Retrieve the next result from the change stream, if any.
+    ///
+    /// Where calling `next` will internally loop until a change document is received,
+    /// this will make at most one request and return `None` if the returned document batch is
+    /// empty. This method should be used when storing the resume token in order to ensure the
+    /// most up to date token is received, e.g.
+    ///
+    /// ```ignore
+    /// # use mongodb::{sync::Client, error::Result};
+    /// # async fn func() -> Result<()> {
+    /// # let client = Client::with_uri_str("mongodb://example.com")?;
+    /// # let coll = client.database("foo").collection("bar");
+    /// # let mut session = client.start_session(None)?;
+    /// let mut change_stream = coll.watch_with_session(None, None, &mut session)?;
+    /// let mut resume_token = None;
+    /// while change_stream.is_alive() {
+    ///     if let Some(event) = change_stream.next_if_any(&mut session) {
+    ///         // process event
+    ///     }
+    ///     resume_token = change_stream.resume_token();
+    /// }
+    /// #
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn next_if_any(&mut self, session: &mut ClientSession) -> Result<Option<T>> {
+        RUNTIME.block_on(
+            self.async_stream
+                .next_if_any(&mut session.async_client_session),
+        )
+    }
+}
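The resumability section of the doc comment above mentions feeding a stored [`ResumeToken`] back in through the `resume_after` or `start_after` options, but none of the examples show that round trip. The following sketch is not part of the diff: it assumes the sync `Collection::watch` used in the `ignore`d examples, that `ChangeStreamOptions` is re-exported under `mongodb::options` with a `Default` impl and a public `resume_after` field (as other options structs in this driver have), and a reachable deployment at mongodb://example.com.

```rust
use mongodb::{
    bson::Document,
    change_stream::event::ResumeToken,
    error::Result,
    options::ChangeStreamOptions,
    sync::Client,
};

// Illustrative only: assumes `ChangeStreamOptions` exposes a public
// `resume_after` field and implements `Default`.
fn resume_from(saved_token: Option<ResumeToken>) -> Result<()> {
    let client = Client::with_uri_str("mongodb://example.com")?;
    let coll = client.database("foo").collection::<Document>("bar");

    // Resume from a previously persisted token, if one exists.
    let mut options = ChangeStreamOptions::default();
    options.resume_after = saved_token;

    let mut change_stream = coll.watch(None, options)?;
    let mut latest_token = None;
    while change_stream.is_alive() {
        if let Some(event) = change_stream.next_if_any()? {
            println!("change event: {:?}", event);
        }
        // `resume_token()` is refreshed even when a batch carries no events,
        // so this is the value worth persisting between runs.
        latest_token = change_stream.resume_token();
    }
    // `latest_token` would be written to durable storage here and passed back
    // in as `saved_token` on the next invocation.
    let _ = latest_token;
    Ok(())
}
```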

src/sync/client/mod.rs

Lines changed: 49 additions & 1 deletion
@@ -1,8 +1,9 @@
 pub mod session;
 
-use super::{ClientSession, Database};
+use super::{ChangeStream, ClientSession, Database, SessionChangeStream};
 use crate::{
     bson::Document,
+    change_stream::{event::ChangeStreamEvent, options::ChangeStreamOptions},
     concern::{ReadConcern, WriteConcern},
     error::Result,
     options::{
@@ -157,4 +158,51 @@ impl Client {
             .block_on(self.async_client.start_session(options))
             .map(Into::into)
     }
+
+    /// Starts a new [`ChangeStream`] that receives events for all changes in the cluster. The
+    /// stream does not observe changes from system collections or the "config", "local" or
+    /// "admin" databases. Note that this method (`watch` on a cluster) is only supported in
+    /// MongoDB 4.0 or greater.
+    ///
+    /// See the documentation [here](https://docs.mongodb.com/manual/changeStreams/) on change
+    /// streams.
+    ///
+    /// Change streams require either a "majority" read concern or no read
+    /// concern. Anything else will cause a server error.
+    ///
+    /// Note that using a `$project` stage to remove any of the `_id` `operationType` or `ns` fields
+    /// will cause an error. The driver requires these fields to support resumability. For
+    /// more information on resumability, see the documentation for
+    /// [`ChangeStream`](change_stream/struct.ChangeStream.html)
+    ///
+    /// If the pipeline alters the structure of the returned events, the parsed type will need to be
+    /// changed via [`ChangeStream::with_type`].
+    #[allow(unused)]
+    pub(crate) fn watch(
+        &self,
+        pipeline: impl IntoIterator<Item = Document>,
+        options: impl Into<Option<ChangeStreamOptions>>,
+    ) -> Result<ChangeStream<ChangeStreamEvent<Document>>> {
+        RUNTIME
+            .block_on(self.async_client.watch(pipeline, options))
+            .map(ChangeStream::new)
+    }
+
+    /// Starts a new [`SessionChangeStream`] that receives events for all changes in the cluster
+    /// using the provided [`ClientSession`]. See [`Client::watch`] for more information.
+    #[allow(unused)]
+    pub(crate) fn watch_with_session(
+        &self,
+        pipeline: impl IntoIterator<Item = Document>,
+        options: impl Into<Option<ChangeStreamOptions>>,
+        session: &mut ClientSession,
+    ) -> Result<SessionChangeStream<ChangeStreamEvent<Document>>> {
+        RUNTIME
+            .block_on(self.async_client.watch_with_session(
+                pipeline,
+                options,
+                &mut session.async_client_session,
+            ))
+            .map(SessionChangeStream::new)
+    }
 }
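For reference, a sketch of what the cluster-wide watch documented above could look like from the sync API. It is not part of the diff: in this commit `Client::watch` and `Client::watch_with_session` are still `pub(crate)` and marked `#[allow(unused)]`, so the sketch assumes either crate-internal code or a later change that exposes them publicly, plus a MongoDB 4.0+ deployment at mongodb://example.com.

```rust
use mongodb::{error::Result, sync::Client};

// Illustrative only: `Client::watch` is pub(crate) at this commit, so calling
// it from outside the crate relies on it becoming public later.
fn watch_cluster() -> Result<()> {
    let client = Client::with_uri_str("mongodb://example.com")?;

    // No extra pipeline stages, default options: the stream reports changes
    // across the whole deployment, excluding system collections and the
    // "config", "local" and "admin" databases.
    let mut change_stream = client.watch(None, None)?;

    while change_stream.is_alive() {
        if let Some(event) = change_stream.next_if_any()? {
            println!("{:?}: {:?}", event.operation_type, event.full_document);
        }
    }
    Ok(())
}
```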
