Skip to content

Commit

Permalink
Simplify the BufferedAsyncIterator and runner fetch driver
Browse files Browse the repository at this point in the history
The goal of that iterator is to take a number of promises in unknown states, and generate
their results in the order of these promises resolving/rejecting.

This commit changes the implementation in such a way that we do not need an event emitter *and*
a result queue and a (fairly complex) logic of synchronizing these. Instead the iterator
builds two parallel queues, one containing unresolved promises and one containing the resolve and reject
functions for these promises. The incoming promises then are configured to pop the next {resolve, reject}
structure, and call that with the respective result. The iterator at the same time picks the next unresolved
promise from its own queue, and `awaits` the result.

On the runner side this requires to `await` the iterator.next(), which actually simplifies more code
by removing the need for a separate queue of "enqueued tasks".

Note that this code also removes the configurable "handleError" logic without replacement: No caller
ever provided that, so the default of "throw the error further" is good enough.
  • Loading branch information
ankon committed Mar 19, 2020
1 parent 897715f commit 3986628
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 120 deletions.
59 changes: 26 additions & 33 deletions src/consumer/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -312,56 +312,49 @@ module.exports = class Runner {
return
}

const enqueuedTasks = []
let expectedNumberOfExecutions = 0
let numberOfExecutions = 0
const { lock, unlock, unlockWithError } = barrier()

while (true) {
const result = iterator.next()
const result = await iterator.next()
if (result.done) {
break
}

enqueuedTasks.push(async () => {
if (!this.running) {
return
}
const batches = result.value
expectedNumberOfExecutions += batches.length

const batches = await result.value
expectedNumberOfExecutions += batches.length

batches.map(batch =>
concurrently(async () => {
try {
if (!this.running) {
return
}

if (batch.isEmpty()) {
return
}

await onBatch(batch)
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
unlockWithError(e)
} finally {
numberOfExecutions++
if (numberOfExecutions === expectedNumberOfExecutions) {
unlock()
}
batches.forEach(batch =>
concurrently(async () => {
try {
if (!this.running) {
return
}
}).catch(unlockWithError)
)
})

if (batch.isEmpty()) {
return
}

await onBatch(batch)
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
} catch (e) {
unlockWithError(e)
} finally {
numberOfExecutions++
if (numberOfExecutions === expectedNumberOfExecutions) {
unlock()
}
}
}).catch(unlockWithError)
)
}

if (!this.running) {
return
}

await Promise.all(enqueuedTasks.map(fn => fn()))
// FIXME: See https://github.com/tulios/kafkajs/pull/670
await lock
await this.autoCommitOffsets()
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
Expand Down
126 changes: 39 additions & 87 deletions src/utils/bufferedAsyncIterator.js
Original file line number Diff line number Diff line change
@@ -1,92 +1,44 @@
const EventEmitter = require('events')

const createPromiseNotifier = (emitter, results, handleError) => async (promise, i) => {
try {
const result = await promise
if (emitter.listenerCount('data') > 0) {
emitter.emit('data', { result, id: i })
} else {
results.push(result)
}
} catch (e) {
try {
await handleError(e)
emitter.emit('error', e)
} catch (newError) {
emitter.emit('error', newError)
}
}
}

const createGetResolvedPromise = (emitter, results) => {
let runningPromises = 0
const fulfilledPromises = []

return () => {
runningPromises++
return new Promise((resolve, reject) => {
if (results.length > 0) {
return resolve(results.shift())
}

const handler = ({ result, id }) => {
/**
* Since we have a single emitter for all running promises we have to skip
* already delivered results as we will have one listener per promise
* running, so once one promise resolves all of the listeners will receive
* the same value
*/
if (fulfilledPromises.includes(id)) {
return
}

/**
* When there is a single promise running, we can safely deliver the result
* of the emitter since we won't have the risk of getting results from
* other promises
*/
if (runningPromises <= 1) {
runningPromises--
emitter.off('data', handler)

return resolve(result)
}

/**
* When multiple promises are running the emitter will receive data from all
* running promises, thus the results can get mixed up.
*
* To avoid that and always unblock the first promises with the fastest results,
* we need to keep track of the id, so we don't accidentally resolve the same
* value multiple times.
*/
runningPromises--
emitter.off('data', handler)
fulfilledPromises.push(id)

/**
* Generator that processes the given promises, and yields their result in the order of them resolving.
*/
async function* BufferedAsyncIterator(promises) {
/** Queue of promises in order of resolution */
const promisesQueue = []
/** Queue of {resolve, reject} in the same order as `promisesQueue` */
const resolveRejectQueue = []

promises.forEach(promise => {
// Create a new promise into the promises queue, and keep the {resolve,reject}
// in the resolveRejectQueue
let resolvePromise
let rejectPromise
promisesQueue.push(
new Promise((resolve, reject) => {
resolvePromise = resolve
rejectPromise = reject
})
)
resolveRejectQueue.push({ resolve: resolvePromise, reject: rejectPromise })

// When the promise resolves pick the next available {resolve, reject}, and
// through that resolve the next promise in the queue
promise.then(
result => {
const { resolve } = resolveRejectQueue.pop()
resolve(result)
},
err => {
const { reject } = resolveRejectQueue.pop()
reject(err)
}

emitter.on('data', handler)
emitter.on('error', reject)
})
}
}
const defaultErrorHandler = e => {
throw e
}

function* BufferedAsyncIterator(promises, handleError = defaultErrorHandler) {
const results = []
const emitter = new EventEmitter()
const wrap = createPromiseNotifier(emitter, results, handleError)
const getResolvedPromise = createGetResolvedPromise(emitter, results)
const wrappedPromises = promises.map(wrap)

emitter.setMaxListeners(wrappedPromises.length)

for (let i = 0; i < wrappedPromises.length; i++) {
yield getResolvedPromise()
)
})

// While there are promises left pick the next one, wait for it, and yield the result
while (promisesQueue.length > 0) {
const nextPromise = promisesQueue.pop()
const result = await nextPromise
yield result
}
}

Expand Down

0 comments on commit 3986628

Please sign in to comment.