Skip to content

Commit 4d78548

Browse files
authored
RUST-1046 Fix iteration of cursors when batchSize doesn't divide result size (#483)
1 parent 5a0192c commit 4d78548

File tree

2 files changed

+60
-6
lines changed

2 files changed

+60
-6
lines changed

src/cursor/session.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ pub struct SessionCursor<T>
6161
where
6262
T: DeserializeOwned + Unpin,
6363
{
64-
exhausted: bool,
6564
client: Client,
6665
info: CursorInformation,
6766
buffer: VecDeque<T>,
@@ -79,10 +78,7 @@ where
7978
spec: CursorSpecification<T>,
8079
pinned: Option<PinnedConnectionHandle>,
8180
) -> Self {
82-
let exhausted = spec.id() == 0;
83-
8481
Self {
85-
exhausted,
8682
client,
8783
info: spec.info,
8884
buffer: spec.initial_buffer,
@@ -198,12 +194,25 @@ where
198194
}
199195
}
200196

197+
impl<T> SessionCursor<T>
198+
where
199+
T: DeserializeOwned + Unpin,
200+
{
201+
fn mark_exhausted(&mut self) {
202+
self.info.id = 0;
203+
}
204+
205+
fn is_exhausted(&self) -> bool {
206+
self.info.id == 0
207+
}
208+
}
209+
201210
impl<T> Drop for SessionCursor<T>
202211
where
203212
T: DeserializeOwned + Unpin,
204213
{
205214
fn drop(&mut self) {
206-
if self.exhausted {
215+
if self.is_exhausted() {
207216
return;
208217
}
209218

@@ -254,7 +263,9 @@ where
254263
fn drop(&mut self) {
255264
// Update the parent cursor's state based on any iteration performed on this handle.
256265
self.session_cursor.buffer = self.generic_cursor.take_buffer();
257-
self.session_cursor.exhausted = self.generic_cursor.is_exhausted();
266+
if self.generic_cursor.is_exhausted() {
267+
self.session_cursor.mark_exhausted();
268+
}
258269
}
259270
}
260271

src/test/coll.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,3 +1049,46 @@ async fn collection_generic_bounds() {
10491049
.collection(function_name!());
10501050
let _result = coll.insert_one(Bar {}, None).await;
10511051
}
1052+
1053+
/// Verify that a cursor with multiple batches whose last batch isn't full
1054+
/// iterates without errors.
1055+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
1056+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
1057+
async fn cursor_batch_size() {
1058+
let _guard: RwLockReadGuard<()> = LOCK.run_concurrently().await;
1059+
1060+
let client = TestClient::new().await;
1061+
let coll = client
1062+
.init_db_and_coll("cursor_batch_size", "cursor_batch_size")
1063+
.await;
1064+
1065+
let doc = Document::new();
1066+
coll.insert_many(vec![&doc; 10], None).await.unwrap();
1067+
1068+
let opts = FindOptions::builder().batch_size(3).build();
1069+
let cursor_no_session = coll.find(doc! {}, opts.clone()).await.unwrap();
1070+
let docs: Vec<_> = cursor_no_session.try_collect().await.unwrap();
1071+
assert_eq!(docs.len(), 10);
1072+
1073+
// test session cursors
1074+
if client.is_standalone() {
1075+
return;
1076+
}
1077+
let mut session = client.start_session(None).await.unwrap();
1078+
let mut cursor = coll
1079+
.find_with_session(doc! {}, opts.clone(), &mut session)
1080+
.await
1081+
.unwrap();
1082+
let mut docs = Vec::new();
1083+
while let Some(doc) = cursor.next(&mut session).await {
1084+
docs.push(doc.unwrap());
1085+
}
1086+
assert_eq!(docs.len(), 10);
1087+
1088+
let mut cursor = coll
1089+
.find_with_session(doc! {}, opts, &mut session)
1090+
.await
1091+
.unwrap();
1092+
let docs: Vec<_> = cursor.stream(&mut session).try_collect().await.unwrap();
1093+
assert_eq!(docs.len(), 10);
1094+
}

0 commit comments

Comments
 (0)