Skip to content

feat: add rejectLate option #12

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,24 @@ const promiseCallLimit = require('promise-call-limit')
const things = getLongListOfThingsToFrobulate()

// frobulate no more than 4 things in parallel
promiseCallLimit(things.map(thing => () => frobulateThing(thing)), 4)
promiseCallLimit(things.map(thing => () => frobulateThing(thing)), {
limit: 4 })
.then(results => console.log('frobulated 4 at a time', results))
```

## API

### promiseCallLimit(queue Array<() => Promise>, limit = defaultLimit)

The default limit is the number of CPUs on the system - 1, or 1.

The reason for subtracting one is that presumably the main thread is taking
up a CPU as well, so let's not be greedy.
### promiseCallLimit(queue Array<() => Promise>, opts<Object>)

opts can contain:
- limit: specified concurrency limit. Defaults to the number of
CPUs on the system minus one. Presumably the main thread is taking
up a CPU as well, so let's not be greedy. In the case where there
is only one cpu the limit will default to 1.
- rejectLate: if true, then any rejection will not prevent the rest of
the queue from running. Any subsequent rejections will be ignored,
and the first rejection will be what the function finally rejects
with.

Note that the array should be a list of Promise-_returning_ functions, not
Promises themselves. If you have a bunch of Promises already, you're best
Expand Down
29 changes: 19 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ const os = require('os')

/* istanbul ignore next - version-specific workaround */
const defLimit = 'availableParallelism' in os
? os.availableParallelism()
: Math.max(1, os.cpus().length)
? Math.max(1, os.availableParallelism() - 1)
: Math.max(1, os.cpus().length - 1)

const callLimit = (queue, limit = defLimit) => new Promise((res, rej) => {
const callLimit = (queue, { limit = defLimit, rejectLate } = {}) => new Promise((res, rej) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Breaking change

let active = 0
let current = 0
const results = []

// Whether or not we rejected, distinct from the rejection just in case the rejection itself is falsey
let rejected = false
let rejection
const reject = er => {
if (rejected)
return
rejected = true
rej(er)
rejection = er
if (!rejectLate)
rej(rejection)
}

let resolved = false
Expand All @@ -31,22 +35,27 @@ const callLimit = (queue, limit = defLimit) => new Promise((res, rej) => {

const run = () => {
const c = current++
if (c >= queue.length) {
return resolve()
}
if (c >= queue.length)
return rejected ? reject() : resolve()

active ++
results[c] = queue[c]().then(result => {
active --
results[c] = result
return result
}, (er) => {
active --
reject(er)
}).then(result => {
if (rejected && active === 0)
return rej(rejection)
run()
return result
}, reject)
})
}

for (let i = 0; i < limit; i++) {
for (let i = 0; i < limit; i++)
run()
}
})

module.exports = callLimit
27 changes: 26 additions & 1 deletion test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ t.test('two by two', t => {
calledTre = true
res(3)
}, 50)),
], 2).then(res => t.strictSame(res, [1, 2, 3]))
], { limit: 2 }).then(res => t.strictSame(res, [1, 2, 3]))
})

t.test('rejection', t => t.rejects(callLimit([
Expand All @@ -39,3 +39,28 @@ t.test('triple rejection', t => t.rejects(callLimit([
() => Promise.reject(new Error('poop')),
() => Promise.reject(new Error('poop')),
]), { message: 'poop' }))

t.test('late rejection', async t => {
const results = []
await t.rejects(callLimit([
() => new Promise(resolve => setTimeout(() => {
results.push('first success')
resolve('ok')
}, 50)),
() => new Promise((_, reject) => {
setTimeout(() => {
results.push('slow rejection')
reject(new Error('slow rejection'))
}, 100)
}),
() => {
results.push('fast rejection')
return Promise.reject(new Error('fast rejection'))
},
() => new Promise(resolve => setTimeout(() => {
results.push('second success')
resolve('ok 2')
}, 50)),
], { limit: 2, rejectLate: true }), { message: 'fast rejection' })
t.match(results, ['first success', 'fast rejection', 'slow rejection', 'second success'])
})