Skip to content

Commit a47ac98

Browse files
committed
fix(cursor): fix issue with long-running eachAsync() cursor
Fix #8235
1 parent ba18b9d commit a47ac98

File tree

2 files changed

+55
-51
lines changed

2 files changed

+55
-51
lines changed

lib/helpers/cursor/eachAsync.js

Lines changed: 54 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const utils = require('../../utils');
2222

2323
module.exports = function eachAsync(next, fn, options, callback) {
2424
const parallel = options.parallel || 1;
25+
const enqueue = asyncQueue();
2526

2627
const handleNextResult = function(doc, callback) {
2728
const promise = fn(doc);
@@ -37,71 +38,74 @@ module.exports = function eachAsync(next, fn, options, callback) {
3738
const iterate = function(callback) {
3839
let drained = false;
3940

40-
const getAndRun = function(cb) {
41-
_next(function(err, doc) {
42-
if (err) return cb(err);
43-
if (drained) {
44-
return;
45-
}
46-
if (doc == null) {
47-
drained = true;
48-
return callback(null);
49-
}
50-
handleNextResult(doc, function(err) {
51-
if (err) return cb(err);
52-
// Make sure to clear the stack re: gh-4697
53-
setTimeout(function() {
54-
getAndRun(cb);
55-
}, 0);
56-
});
57-
});
58-
};
59-
6041
let error = null;
6142
for (let i = 0; i < parallel; ++i) {
62-
getAndRun(err => {
63-
if (error != null) {
64-
return;
43+
enqueue(fetch);
44+
}
45+
46+
function fetch(done) {
47+
if (drained || error) {
48+
return done();
49+
}
50+
51+
next(function(err, doc) {
52+
if (drained || error) {
53+
return done();
6554
}
6655
if (err != null) {
6756
error = err;
68-
return callback(err);
57+
callback(err);
58+
return done();
59+
}
60+
if (doc == null) {
61+
drained = true;
62+
callback(null);
63+
return done();
6964
}
65+
66+
done();
67+
68+
handleNextResult(doc, function(err) {
69+
if (err != null) {
70+
error = err;
71+
return callback(err);
72+
}
73+
74+
setTimeout(() => enqueue(fetch), 0);
75+
});
7076
});
7177
}
7278
};
7379

74-
const _nextQueue = [];
7580
return utils.promiseOrCallback(callback, cb => {
7681
iterate(cb);
7782
});
83+
};
7884

79-
// `next()` can only execute one at a time, so make sure we always execute
80-
// `next()` in series, while still allowing multiple `fn()` instances to run
81-
// in parallel.
82-
function _next(cb) {
83-
if (_nextQueue.length === 0) {
84-
return next(_step(cb));
85-
}
86-
_nextQueue.push(cb);
87-
}
85+
// `next()` can only execute one at a time, so make sure we always execute
86+
// `next()` in series, while still allowing multiple `fn()` instances to run
87+
// in parallel.
88+
function asyncQueue() {
89+
const _queue = [];
90+
let inProgress = null;
91+
let id = 0;
8892

89-
function _step(cb) {
90-
return function(err, doc) {
91-
if (err != null) {
92-
return cb(err);
93-
}
94-
cb(null, doc);
93+
return function enqueue(fn) {
94+
if (_queue.length === 0 && inProgress == null) {
95+
inProgress = id++;
96+
return fn(_step);
97+
}
98+
_queue.push(fn);
99+
};
95100

96-
if (doc == null) {
97-
return;
101+
function _step() {
102+
setTimeout(() => {
103+
inProgress = null;
104+
if (_queue.length > 0) {
105+
inProgress = id++;
106+
const fn = _queue.shift();
107+
fn(_step);
98108
}
99-
100-
setTimeout(() => {
101-
if (_nextQueue.length > 0) {
102-
next(_step(_nextQueue.unshift()));
103-
}
104-
}, 0);
105-
};
109+
}, 0);
106110
}
107-
};
111+
}

test/helpers/cursor.eachAsync.test.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const eachAsync = require('../../lib/helpers/cursor/eachAsync');
66
describe('eachAsync()', function() {
77
it('exhausts large cursor without parallel calls (gh-8235)', function() {
88
this.timeout(10000);
9-
9+
1010
let numInProgress = 0;
1111
let num = 0;
1212
const max = 1000;

0 commit comments

Comments
 (0)