Skip to content

Commit

Permalink
feat: add an internal tryNext method (#2638)
Browse files Browse the repository at this point in the history
The default cursor iteration behavior is to block until either a
cursor is dead (has a cursor id of 0), or a non-empty batch is
returned from the server for a `getMore` command. `tryNext` allows
users to iterate a cursor optionally returning `null` if the
`getMore` returns an empty batch (likely on a tailable + awaitData
cursor).

NODE-2917
  • Loading branch information
mbroadst authored Nov 30, 2020
1 parent ca0124d commit 43c94b6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 9 deletions.
40 changes: 32 additions & 8 deletions src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ export abstract class AbstractCursor extends EventEmitter {
return done(undefined, true);
}

next(this, (err, doc) => {
next(this, true, (err, doc) => {
if (err) return done(err);

if (doc) {
Expand All @@ -236,7 +236,23 @@ export abstract class AbstractCursor extends EventEmitter {
return done(new MongoError('Cursor is exhausted'));
}

next(this, done);
next(this, true, done);
});
}

/**
* Try to get the next available document from the cursor or `null` if an empty batch is returned
* @internal
*/
tryNext(): Promise<Document | null>;
tryNext(callback: Callback<Document | null>): void;
tryNext(callback?: Callback<Document | null>): Promise<Document | null> | void {
return maybePromise(callback, done => {
if (this[kId] === Long.ZERO) {
return done(new MongoError('Cursor is exhausted'));
}

next(this, false, done);
});
}

Expand All @@ -259,7 +275,7 @@ export abstract class AbstractCursor extends EventEmitter {
return maybePromise(callback, done => {
const transform = this[kTransform];
const fetchDocs = () => {
next(this, (err, doc) => {
next(this, true, (err, doc) => {
if (err || doc == null) return done(err);
if (doc == null) return done();

Expand Down Expand Up @@ -350,7 +366,7 @@ export abstract class AbstractCursor extends EventEmitter {
const transform = this[kTransform];
const fetchDocs = () => {
// NOTE: if we add a `nextBatch` then we should use it here
next(this, (err, doc) => {
next(this, true, (err, doc) => {
if (err) return done(err);
if (doc == null) return done(undefined, docs);

Expand Down Expand Up @@ -518,7 +534,11 @@ function nextDocument(cursor: AbstractCursor): Document | null | undefined {
return null;
}

function next(cursor: AbstractCursor, callback: Callback<Document | null>): void {
function next(
cursor: AbstractCursor,
blocking: boolean,
callback: Callback<Document | null>
): void {
const cursorId = cursor[kId];
if (cursor.closed) {
return callback(undefined, null);
Expand Down Expand Up @@ -577,7 +597,7 @@ function next(cursor: AbstractCursor, callback: Callback<Document | null>): void
return cleanupCursor(cursor, () => callback(err, nextDocument(cursor)));
}

next(cursor, callback);
next(cursor, blocking, callback);
});

return;
Expand All @@ -604,7 +624,11 @@ function next(cursor: AbstractCursor, callback: Callback<Document | null>): void
return cleanupCursor(cursor, () => callback(err, nextDocument(cursor)));
}

next(cursor, callback);
if (cursor[kDocuments].length === 0 && blocking === false) {
return callback(undefined, null);
}

next(cursor, blocking, callback);
});
}

Expand Down Expand Up @@ -666,7 +690,7 @@ function makeCursorStream(cursor: AbstractCursor) {

function readNext() {
needToClose = false;
next(cursor, (err, result) => {
next(cursor, true, (err, result) => {
needToClose = err ? !cursor.closed : result !== null;

if (err) {
Expand Down
37 changes: 36 additions & 1 deletion test/functional/abstract_cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ describe('AbstractCursor', function () {
withClientV2((client, done) => {
const docs = [{ a: 1 }, { a: 2 }, { a: 3 }, { a: 4 }, { a: 5 }, { a: 6 }];
const coll = client.db().collection('find_cursor');
coll.drop(() => coll.insertMany(docs, done));
const tryNextColl = client.db().collection('try_next');
coll.drop(() => tryNextColl.drop(() => coll.insertMany(docs, done)));
})
);

Expand Down Expand Up @@ -124,4 +125,38 @@ describe('AbstractCursor', function () {
})
);
});

context('#tryNext', function () {
it(
'should return control to the user if an empty batch is returned',
withClientV2(function (client, done) {
const db = client.db();
db.createCollection('try_next', { capped: true, size: 10000000 }, () => {
const coll = db.collection('try_next');
coll.insertMany([{}, {}], err => {
expect(err).to.not.exist;

const cursor = coll.find({}, { tailable: true, awaitData: true });
this.defer(() => cursor.close());

cursor.tryNext((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

cursor.tryNext((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.exist;

cursor.tryNext((err, doc) => {
expect(err).to.not.exist;
expect(doc).to.be.null;
done();
});
});
});
});
});
})
);
});
});

0 comments on commit 43c94b6

Please sign in to comment.