diff --git a/src/consumer/runner.js b/src/consumer/runner.js index c0fc5abf6..66e995e03 100644 --- a/src/consumer/runner.js +++ b/src/consumer/runner.js @@ -312,56 +312,49 @@ module.exports = class Runner { return } - const enqueuedTasks = [] let expectedNumberOfExecutions = 0 let numberOfExecutions = 0 const { lock, unlock, unlockWithError } = barrier() while (true) { - const result = iterator.next() + const result = await iterator.next() if (result.done) { break } - enqueuedTasks.push(async () => { - if (!this.running) { - return - } + const batches = result.value + expectedNumberOfExecutions += batches.length - const batches = await result.value - expectedNumberOfExecutions += batches.length - - batches.map(batch => - concurrently(async () => { - try { - if (!this.running) { - return - } - - if (batch.isEmpty()) { - return - } - - await onBatch(batch) - await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) - } catch (e) { - unlockWithError(e) - } finally { - numberOfExecutions++ - if (numberOfExecutions === expectedNumberOfExecutions) { - unlock() - } + batches.forEach(batch => + concurrently(async () => { + try { + if (!this.running) { + return } - }).catch(unlockWithError) - ) - }) + + if (batch.isEmpty()) { + return + } + + await onBatch(batch) + await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) + } catch (e) { + unlockWithError(e) + } finally { + numberOfExecutions++ + if (numberOfExecutions === expectedNumberOfExecutions) { + unlock() + } + } + }).catch(unlockWithError) + ) } if (!this.running) { return } - await Promise.all(enqueuedTasks.map(fn => fn())) + // FIXME: See https://github.com/tulios/kafkajs/pull/670 await lock await this.autoCommitOffsets() await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval }) diff --git a/src/utils/bufferedAsyncIterator.js b/src/utils/bufferedAsyncIterator.js index f5e14d509..545219341 100644 --- a/src/utils/bufferedAsyncIterator.js +++ b/src/utils/bufferedAsyncIterator.js @@ -1,92 +1,44 @@ -const EventEmitter = require('events') - -const createPromiseNotifier = (emitter, results, handleError) => async (promise, i) => { - try { - const result = await promise - if (emitter.listenerCount('data') > 0) { - emitter.emit('data', { result, id: i }) - } else { - results.push(result) - } - } catch (e) { - try { - await handleError(e) - emitter.emit('error', e) - } catch (newError) { - emitter.emit('error', newError) - } - } -} - -const createGetResolvedPromise = (emitter, results) => { - let runningPromises = 0 - const fulfilledPromises = [] - - return () => { - runningPromises++ - return new Promise((resolve, reject) => { - if (results.length > 0) { - return resolve(results.shift()) - } - - const handler = ({ result, id }) => { - /** - * Since we have a single emitter for all running promises we have to skip - * already delivered results as we will have one listener per promise - * running, so once one promise resolves all of the listeners will receive - * the same value - */ - if (fulfilledPromises.includes(id)) { - return - } - - /** - * When there is a single promise running, we can safely deliver the result - * of the emitter since we won't have the risk of getting results from - * other promises - */ - if (runningPromises <= 1) { - runningPromises-- - emitter.off('data', handler) - - return resolve(result) - } - - /** - * When multiple promises are running the emitter will receive data from all - * running promises, thus the results can get mixed up. - * - * To avoid that and always unblock the first promises with the fastest results, - * we need to keep track of the id, so we don't accidentally resolve the same - * value multiple times. - */ - runningPromises-- - emitter.off('data', handler) - fulfilledPromises.push(id) - +/** + * Generator that processes the given promises, and yields their result in the order of them resolving. + */ +async function* BufferedAsyncIterator(promises) { + /** Queue of promises in order of resolution */ + const promisesQueue = [] + /** Queue of {resolve, reject} in the same order as `promisesQueue` */ + const resolveRejectQueue = [] + + promises.forEach(promise => { + // Create a new promise into the promises queue, and keep the {resolve,reject} + // in the resolveRejectQueue + let resolvePromise + let rejectPromise + promisesQueue.push( + new Promise((resolve, reject) => { + resolvePromise = resolve + rejectPromise = reject + }) + ) + resolveRejectQueue.push({ resolve: resolvePromise, reject: rejectPromise }) + + // When the promise resolves pick the next available {resolve, reject}, and + // through that resolve the next promise in the queue + promise.then( + result => { + const { resolve } = resolveRejectQueue.pop() resolve(result) + }, + err => { + const { reject } = resolveRejectQueue.pop() + reject(err) } - - emitter.on('data', handler) - emitter.on('error', reject) - }) - } -} -const defaultErrorHandler = e => { - throw e -} - -function* BufferedAsyncIterator(promises, handleError = defaultErrorHandler) { - const results = [] - const emitter = new EventEmitter() - const wrap = createPromiseNotifier(emitter, results, handleError) - const getResolvedPromise = createGetResolvedPromise(emitter, results) - const wrappedPromises = promises.map(wrap) - - emitter.setMaxListeners(wrappedPromises.length) - - for (let i = 0; i < wrappedPromises.length; i++) { - yield getResolvedPromise() + ) + }) + + // While there are promises left pick the next one, wait for it, and yield the result + while (promisesQueue.length > 0) { + const nextPromise = promisesQueue.pop() + const result = await nextPromise + yield result } }