Skip to content

Commit 67b4887

Browse files
author
Daniel Mewes
committed
Merge branch 'v2.3.x' into windows
2 parents 952e9a1 + 239c958 commit 67b4887

File tree

3 files changed

+282
-100
lines changed

3 files changed

+282
-100
lines changed

cursor.coffee

Lines changed: 95 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ class IterableResult
3838
@_closeCbPromise = null
3939

4040
@next = @_next
41-
@each = @_each
42-
@eachAsync = @_eachAsync
4341

4442
_addResponse: (response) ->
4543
if response.t is @_type or response.t is protoResponseType.SUCCESS_SEQUENCE
@@ -101,6 +99,9 @@ class IterableResult
10199
@_responses.length is 0 or @_responses[0].r.length <= @_responseIndex
102100

103101
_promptNext: ->
102+
if @_closeCbPromise?
103+
cb = @_getCallback()
104+
cb new error.ReqlDriverError "Cursor is closed."
104105
# If there are no more waiting callbacks, just wait until the next event
105106
while @_cbQueue[0]?
106107
if @bufferEmpty() is true
@@ -188,11 +189,19 @@ class IterableResult
188189
# called. Just return a promise that resolves
189190
# immediately.
190191
@_closeCbPromise = Promise.resolve().nodeify(cb)
192+
# Also clear any buffered results, so future calls to
193+
# `next` fail.
194+
@_responses = []
195+
@_responseIndex = 0
191196
else
192197
# We aren't ended, and we need to. Create a promise
193198
# that's resolved when the END query is acknowledged.
194199
@_closeCbPromise = new Promise((resolve, reject) =>
195200
@_closeCb = (err) =>
201+
# Clear any buffered results, so future calls to
202+
# `next` fail.
203+
@_responses = []
204+
@_responseIndex = 0
196205
# Clear all callbacks for outstanding requests
197206
while @_cbQueue.length > 0
198207
@_cbQueue.shift()
@@ -211,8 +220,7 @@ class IterableResult
211220
).nodeify(cb)
212221
return @_closeCbPromise
213222

214-
215-
_each: varar(1, 2, (cb, onFinished) ->
223+
each: varar(1, 2, (cb, onFinished) ->
216224
unless typeof cb is 'function'
217225
throw new error.ReqlDriverError "First argument to each must be a function."
218226
if onFinished? and typeof onFinished isnt 'function'
@@ -232,35 +240,88 @@ class IterableResult
232240
@_next nextCb
233241
)
234242

235-
_eachAsync: varar(1, 2, (cb, errCb) ->
243+
eachAsync: varar(1, 3, (cb, errCb, options = { concurrency: 1 }) ->
236244
unless typeof cb is 'function'
237245
throw new error.ReqlDriverError 'First argument to eachAsync must be a function.'
238246

239-
if errCb? and typeof errCb isnt 'function'
240-
throw new error.ReqlDriverError "Optional second argument to eachAsync must be a function"
247+
if errCb?
248+
if typeof errCb is 'object'
249+
options = errCb
250+
errCb = undefined
251+
else if typeof errCb isnt 'function'
252+
throw new error.ReqlDriverError "Optional second argument to eachAsync must be a function or `options` object"
253+
254+
unless options and typeof options.concurrency is 'number' and options.concurrency > 0
255+
throw new error.ReqlDriverError "Optional `options.concurrency` argument to eachAsync must be a positive number"
241256

257+
pending = []
258+
259+
userCb = (data) ->
260+
if cb.length <= 1
261+
ret = Promise.resolve(cb(data)) # either synchronous or awaits promise
262+
else
263+
handlerCalled = false
264+
doneChecking = false
265+
handlerArg = undefined
266+
ret = Promise.fromNode (handler) ->
267+
asyncRet = cb(data, (err) ->
268+
handlerCalled = true
269+
if doneChecking
270+
handler(err)
271+
else
272+
handlerArg = err
273+
) # callback-style async
274+
unless asyncRet is undefined
275+
handler(new error.ReqlDriverError "A two-argument row handler for eachAsync may only return undefined.")
276+
else if handlerCalled
277+
handler(handlerArg)
278+
doneChecking = true
279+
return ret
280+
.then (data) ->
281+
return data if data is undefined or typeof data is Promise
282+
throw new error.ReqlDriverError "Row handler for eachAsync may only return a Promise or undefined."
242283
nextCb = =>
243284
if @_closeCbPromise?
244-
return Promise.reject(new error.ReqlDriverError("Cursor is closed."))
285+
return Promise.resolve().then (data) ->
286+
throw new error.ReqlDriverError "Cursor is closed."
245287
else
246-
@_next().then (data) ->
247-
return cb(data) if cb.length <= 1 # either synchronous or awaits promise
248-
return Promise.fromNode (handler) -> cb(data, handler) # callback-style async
249-
.then (result) ->
250-
return nextCb()
288+
return @_next().then (data) ->
289+
return data if pending.length < options.concurrency
290+
return Promise.any(pending)
291+
.catch Promise.AggregateError, (errs) -> throw errs[0]
292+
.return(data)
293+
.then (data) ->
294+
p = userCb(data).then ->
295+
pending.splice pending.indexOf(p), 1
296+
pending.push p
297+
.then nextCb
251298
.catch (err) ->
252-
return if err?.message is 'No more rows in the cursor.'
253-
throw err
254-
255-
return nextCb().nodeify(errCb)
299+
throw err if err?.message isnt 'No more rows in the cursor.'
300+
return Promise.all(pending) # await any queued promises before returning
301+
302+
resPromise = nextCb().then () ->
303+
errCb(null) if errCb?
304+
.catch (err) ->
305+
return errCb(err) if errCb?
306+
throw err
307+
return resPromise unless errCb?
308+
return null
256309
)
257310

311+
_each: @::each
312+
_eachAsync: @::eachAsync
313+
258314
toArray: varar 0, 1, (cb) ->
259315
if cb? and typeof cb isnt 'function'
260316
throw new error.ReqlDriverCompileError "First argument to `toArray` must be a function or undefined."
261317

262318
results = []
263-
return @eachAsync(results.push.bind(results)).return(results).nodeify(cb)
319+
wrapper = (res) =>
320+
results.push(res)
321+
return undefined
322+
return @eachAsync(wrapper).then(() =>
323+
return results
324+
).nodeify(cb)
264325

265326
_makeEmitter: ->
266327
@emitter = new EventEmitter
@@ -391,6 +452,8 @@ class ArrayResult extends IterableResult
391452

392453
_next: varar 0, 1, (cb) ->
393454
fn = (cb) =>
455+
if @_closeCbPromise?
456+
cb new error.ReqlDriverError "Cursor is closed."
394457
if @_hasNext() is true
395458
self = @
396459
if self.__index%@stackSize is @stackSize-1
@@ -402,45 +465,35 @@ class ArrayResult extends IterableResult
402465
else
403466
cb new error.ReqlDriverError "No more rows in the cursor."
404467

405-
new Promise( (resolve, reject) ->
406-
nextCb = (err, result) ->
407-
if (err)
408-
reject(err)
409-
else
410-
resolve(result)
411-
fn(nextCb)
412-
).nodeify cb
413-
468+
return Promise.fromNode(fn).nodeify(cb)
414469

415470
toArray: varar 0, 1, (cb) ->
416471
fn = (cb) =>
472+
if @_closeCbPromise?
473+
cb(new error.ReqlDriverError("Cursor is closed."))
474+
417475
# IterableResult.toArray would create a copy
418476
if @__index?
419477
cb(null, @.slice(@__index, @.length))
420478
else
421479
cb(null, @)
422480

423-
new Promise( (resolve, reject) ->
424-
toArrayCb = (err, result) ->
425-
if (err)
426-
reject(err)
427-
else
428-
resolve(result)
429-
fn(toArrayCb)
430-
).nodeify cb
431-
481+
return Promise.fromNode(fn).nodeify(cb)
432482

433-
close: ->
434-
return @
483+
close: varar 0, 1, (cb) ->
484+
# Clear the array
485+
@.length = 0
486+
@__index = 0
487+
# We set @_closeCbPromise so that functions such as `eachAsync`
488+
# know that we have been closed and can error accordingly.
489+
@_closeCbPromise = Promise.resolve().nodeify(cb)
490+
return @_closeCbPromise
435491

436492
makeIterable: (response) ->
437493
response.__proto__ = {}
438494
for name, method of ArrayResult.prototype
439495
if name isnt 'constructor'
440-
if name is '_each'
441-
response.__proto__['each'] = method
442-
response.__proto__['_each'] = method
443-
else if name is '_next'
496+
if name is '_next'
444497
response.__proto__['next'] = method
445498
response.__proto__['_next'] = method
446499
else

0 commit comments

Comments
 (0)