Skip to content

Commit

Permalink
Simplify the BufferedAsyncIterator
Browse files Browse the repository at this point in the history
The goal of that iterator is to take a number of promises in unknown states, and generate
their results in the order of these promises resolving/rejecting.

This commit changes the implementation in such a way that we do not need an event emitter *and*
a result queue and a (fairly complex) logic of synchronizing these. Instead the iterator
builds two parallel queues, one containing unresolved promises and one containing the resolve and reject
functions for these promises. The incoming promises then are configured to pop the next {resolve, reject}
structure, and call that with the respective result. The iterator at the same time picks the next unresolved
promise from its own queue and yields that.
  • Loading branch information
ankon committed Jun 2, 2020
1 parent 67c7e95 commit 6b38635
Showing 1 changed file with 50 additions and 84 deletions.
134 changes: 50 additions & 84 deletions src/utils/bufferedAsyncIterator.js
Original file line number Diff line number Diff line change
@@ -1,92 +1,58 @@
const { EventEmitter } = require('events')

const createPromiseNotifier = (emitter, results, handleError) => async (promise, i) => {
try {
const result = await promise
if (emitter.listenerCount('data') > 0) {
emitter.emit('data', { result, id: i })
} else {
results.push(result)
}
} catch (e) {
try {
await handleError(e)
emitter.emit('error', e)
} catch (newError) {
emitter.emit('error', newError)
}
}
}

const createGetResolvedPromise = (emitter, results) => {
let runningPromises = 0
const fulfilledPromises = []

return () => {
runningPromises++
return new Promise((resolve, reject) => {
if (results.length > 0) {
return resolve(results.shift())
}

const handler = ({ result, id }) => {
/**
* Since we have a single emitter for all running promises we have to skip
* already delivered results as we will have one listener per promise
* running, so once one promise resolves all of the listeners will receive
* the same value
*/
if (fulfilledPromises.includes(id)) {
return
}

/**
* When there is a single promise running, we can safely deliver the result
* of the emitter since we won't have the risk of getting results from
* other promises
*/
if (runningPromises <= 1) {
runningPromises--
emitter.off('data', handler)

return resolve(result)
}

/**
* When multiple promises are running the emitter will receive data from all
* running promises, thus the results can get mixed up.
*
* To avoid that and always unblock the first promises with the fastest results,
* we need to keep track of the id, so we don't accidentally resolve the same
* value multiple times.
*/
runningPromises--
emitter.off('data', handler)
fulfilledPromises.push(id)

resolve(result)
}

emitter.on('data', handler)
emitter.on('error', reject)
})
}
}
const defaultErrorHandler = e => {
throw e
}

/**
* Generator that processes the given promises, and yields their result in the order of them resolving.
*
* @template T
* @param {Promise<T>[]} promises promises to process
* @param {(err: Error) => any} [handleError] optional error handler
* @returns {Generator<Promise<T>>}
*/
function* BufferedAsyncIterator(promises, handleError = defaultErrorHandler) {
const results = []
const emitter = new EventEmitter()
const wrap = createPromiseNotifier(emitter, results, handleError)
const getResolvedPromise = createGetResolvedPromise(emitter, results)
const wrappedPromises = promises.map(wrap)

emitter.setMaxListeners(wrappedPromises.length)

for (let i = 0; i < wrappedPromises.length; i++) {
yield getResolvedPromise()
/** Queue of promises in order of resolution */
const promisesQueue = []
/** Queue of {resolve, reject} in the same order as `promisesQueue` */
const resolveRejectQueue = []

promises.forEach(promise => {
// Create a new promise into the promises queue, and keep the {resolve,reject}
// in the resolveRejectQueue
let resolvePromise
let rejectPromise
promisesQueue.push(
new Promise((resolve, reject) => {
resolvePromise = resolve
rejectPromise = reject
})
)
resolveRejectQueue.push({ resolve: resolvePromise, reject: rejectPromise })

// When the promise resolves pick the next available {resolve, reject}, and
// through that resolve the next promise in the queue
promise.then(
result => {
const { resolve } = resolveRejectQueue.pop()
resolve(result)
},
async err => {
const { reject } = resolveRejectQueue.pop()
try {
await handleError(err)
reject(err)
} catch (newError) {
reject(newError)
}
}
)
})

// While there are promises left pick the next one to yield
// The caller will then wait for the value to resolve.
while (promisesQueue.length > 0) {
const nextPromise = promisesQueue.pop()
yield nextPromise
}
}

Expand Down

0 comments on commit 6b38635

Please sign in to comment.