Skip to content

Commit 2466ede

Browse files
author
Eetay Natan
committed
refactored - less closure; added pool.poolState
1 parent 43f760a commit 2466ede

File tree

3 files changed

+58
-62
lines changed

3 files changed

+58
-62
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,6 @@ pool.then(function(result) {
113113
```
114114
115115
## TODO list:
116-
- refactor to use the master promise itself as the state holder rather than a separate closure
117116
- API similar to Promise.all(): "Promise.pool([promise, ...], threads)"
118117
- options for behavior on reject:
119118
- reject as soon as one is rejected (same as Promise.all)

promise-pool.js

Lines changed: 46 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,52 @@
1-
function promisePool(options) {
2-
function PromisePoolResolver({max_parallel, next_promise, next_promise_data, threads}) {
3-
const self = this
4-
this.resolver = function(resolve, _reject) {
5-
self.max = max_parallel || threads
6-
self.started = 0
7-
self.ended = 0
8-
self.next_promise = Array.isArray(next_promise) ? [...next_promise] : next_promise
9-
self.next_promise_data = next_promise_data
10-
self.results = []
11-
function startNext(self, thread) {
12-
const context = {
13-
index: self.started,
14-
thread,
15-
data: self.next_promise_data,
16-
ended: false
17-
}
18-
const next = Array.isArray(self.next_promise) ? self.next_promise.shift() : self.next_promise({ index: self.started, data: self.next_promise_data })
19-
self.started += 1
20-
if (next && next.then) {
21-
//console.log('promise ' + JSON.stringify(context))
22-
next.then(function(result) {
23-
context.ended = self.ended
24-
self.ended += 1
25-
//console.log(`promise ${context.index} resolved`)
26-
self.results[context.index] = { context, promise: next, result: result }
27-
startNext(self, thread)
28-
}).catch(function(err) {
29-
context.ended = self.ended
30-
self.ended += 1
31-
//console.log(`promise ${context.index} rejected`)
32-
self.results[context.index] = { context, promise: next, error: err }
33-
startNext(self, thread)
34-
})
35-
} else {
36-
self.live -= 1
37-
if (self.live <= 0) {
38-
resolve(self.results)
39-
}
40-
}
1+
function promisePool({max_parallel, next_promise, next_promise_data, threads, promises, user_data}) {
2+
var promises_generator = promises || next_promise
3+
var self = {
4+
threads: max_parallel || threads,
5+
started: 0,
6+
ended: 0,
7+
promises_generator: Array.isArray(promises_generator) ? [...promises_generator] : promises_generator,
8+
next_promise_data: next_promise_data || user_data,
9+
results: []
10+
}
11+
self.live = self.threads
12+
var promise = new Promise(function(resolve, _reject) {
13+
function startNext(self, thread) {
14+
const context = {
15+
index: self.started,
16+
thread,
17+
data: self.next_promise_data,
18+
ended: false
4119
}
42-
self.live = self.max
43-
for (var i=0; i<self.max; i+=1) {
44-
startNext(self, i)
20+
const next = Array.isArray(self.promises_generator) ? self.promises_generator.shift() : self.promises_generator({ index: self.started, data: self.next_promise_data })
21+
self.started += 1
22+
if (next && next.then) {
23+
//console.log('promise ' + JSON.stringify(context))
24+
next.then(function(result) {
25+
context.ended = self.ended
26+
self.ended += 1
27+
//console.log(`promise ${context.index} resolved`)
28+
self.results[context.index] = { context, promise: next, result: result }
29+
startNext(self, thread)
30+
}).catch(function(err) {
31+
context.ended = self.ended
32+
self.ended += 1
33+
//console.log(`promise ${context.index} rejected`)
34+
self.results[context.index] = { context, promise: next, error: err }
35+
startNext(self, thread)
36+
})
37+
} else {
38+
self.live -= 1
39+
if (self.live <= 0) {
40+
resolve(self.results)
41+
}
4542
}
4643
}
47-
}
48-
return new Promise(new PromisePoolResolver(options).resolver)
44+
for (var i=0; i<self.threads; i+=1) {
45+
startNext(self, i)
46+
}
47+
})
48+
promise.poolState = self
49+
return promise
4950
}
5051

5152
module.exports = promisePool

spec/promise-pool.spec.js

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,19 +6,16 @@ function randomTimeout() {
66
}
77

88
function makePromise(i, timeout = 0) {
9-
const closure = function () {
10-
var context = {}
11-
var promise = new Promise(function (resolve, _reject) {
12-
setTimeout(() => {
13-
context.promise.done = true
14-
resolve(i)
15-
}, timeout)
16-
})
17-
context.promise = promise
18-
context.promise.done = false
19-
return promise
20-
}
21-
return closure()
9+
var context = {}
10+
var promise = new Promise(function (resolve, _reject) {
11+
setTimeout(() => {
12+
context.promise.done = true
13+
resolve(i)
14+
}, timeout)
15+
})
16+
context.promise = promise
17+
promise.done = false
18+
return promise
2219
}
2320

2421
test('Nested promise pools', (done) => {
@@ -45,7 +42,6 @@ test('Nested promise pools', (done) => {
4542
next_promise_data: 'primary promise pool'
4643
})
4744
pool.then(function(result) {
48-
console.log(JSON.stringify(result))
4945
expect(result.length).toBe(primaryPromiseList.length)
5046
const secondaryResult = result[2].result
5147
expect(secondaryResult.length).toBe(secondaryPromiseList.length)
@@ -75,7 +71,7 @@ test('Array of promises', (done) => {
7571
})
7672
})
7773

78-
test('Array of promises; one is not resolved', (done) => {
74+
test('Array of promises; one is not done yet', (done) => {
7975
expect.assertions(6)
8076
var resolved=false
8177
const promiseList = [
@@ -105,7 +101,7 @@ test('Array of promises; one is not resolved', (done) => {
105101
})
106102
})
107103

108-
test('20 promises; max parallel 3', (done) => {
104+
test('20 promises; threads parallel 3', (done) => {
109105
expect.assertions(2)
110106
const numPromises = 20
111107
const pool = promisePool({

0 commit comments

Comments
 (0)