Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 23 additions & 72 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,84 +366,35 @@ map.series = fn => {
}
}

const mapPoolIndexedWorker = insert => (size, fn, resolve, reject, x, y, i) => {
let point
try {
point = fn(x[i])
} catch (err) {
return reject(err)
}
if (isPromise(point)) {
point.then(res => {
insert(y, res, i)
if (isDefined(x[i + size])) {
mapPoolIndexedWorker(insert)(size, fn, resolve, reject, x, y, i + size)
} else if (i === x.length - 1) {
resolve(y)
}
}).catch(reject)
} else {
insert(y, point, i)
if (isDefined(x[i + size])) {
mapPoolIndexedWorker(insert)(size, fn, resolve, reject, x, y, i + size)
} else if (i === x.length - 1) {
resolve(y)
}
}
}

const mapPoolArrayWorker = mapPoolIndexedWorker((y, xi, i) => { y[i] = xi })

const mapPoolArray = (size, fn, x) => new Promise((resolve, reject) => {
// https://stackoverflow.com/questions/62037349/rubicos-map-pool-array-implementation
// https://stackoverflow.com/questions/39195441/limited-parallelism-with-async-await-in-typescript-es7
const mapPoolConstructor = construct => async (size, fn, x) => {
const y = []
for (let i = 0; i < Math.min(x.length, size); i++) {
mapPoolArrayWorker(size, fn, resolve, reject, x, y, i, i) // start off the workers
}
})

const mapPoolUnorderedWorker = insert => (
size, fn, resolve, reject, iter, y, promises,
) => {
const { value, done } = iter.next()
if (done) {
if (resolve._called) return
resolve._called = true
return Promise.all(promises).then(() => resolve(y))
}
let point
try {
point = fn(value)
} catch (err) {
return reject(err)
}
if (isPromise(point)) {
promises.push(point.then(res => insert(y, res)).catch(reject))
mapPoolUnorderedWorker(insert)(size, fn, resolve, reject, iter, y, promises)
} else {
insert(y, point)
mapPoolUnorderedWorker(insert)(size, fn, resolve, reject, iter, y, promises)
const promises = new Set()
for (const xi of x) {
if (promises.size >= size) {
await Promise.race(promises)
}
const yi = fn(xi)
if (isPromise(yi)) {
const p = yi.then(res => {
promises.delete(p)
return res
})
promises.add(p)
y.push(p)
} else {
y.push(yi)
}
}
return construct(await Promise.all(y))
}

const mapPoolSetWorker = mapPoolUnorderedWorker((y, xi) => y.add(xi))
const mapPoolArray = mapPoolConstructor(y => y)

const mapPoolSet = (size, fn, x) => new Promise((resolve, reject) => {
const iter = x[Symbol.iterator].bind(x)()
const y = new Set()
for (let i = 0; i < Math.min(x.size, size); i++) {
mapPoolSetWorker(size, fn, resolve, reject, iter, y, []) // start off the workers
}
})

const mapPoolMapWorker = mapPoolUnorderedWorker((y, xi) => y.set(xi[0], xi[1]))
const mapPoolSet = mapPoolConstructor(y => new Set(y))

const mapPoolMap = (size, fn, x) => new Promise((resolve, reject) => {
const iter = x[Symbol.iterator].bind(x)()
const y = new Map()
for (let i = 0; i < Math.min(x.size, size); i++) {
mapPoolMapWorker(size, fn, resolve, reject, iter, y, []) // start off the workers
}
})
const mapPoolMap = mapPoolConstructor(y => new Map(y))

map.pool = (size, fn) => {
if (!isNumber(size)) {
Expand Down
34 changes: 27 additions & 7 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,15 @@ describe('rubico', () => {
ade(await r.map.pool(1, asyncSquare)([1, 2, 3, 4, 5]), [1, 4, 9, 16, 25])
ade(await r.map.pool(9, asyncSquare)([1, 2, 3, 4, 5]), [1, 4, 9, 16, 25])
})
it('=> [] for empty array', async () => {
aok(r.map.pool(1, square)([]) instanceof Promise)
ade(await r.map.pool(1, square)([]), [])
})
it('works for arrays of undefined values', async () => {
ade(await r.map.pool(1, x => x)([,,,,,]), Array(5).fill(undefined))
ade(await r.map.pool(1, x => x)(Array(5)), Array(5).fill(undefined))
ade(await r.map.pool(1, x => x)(Array(5).fill(null)), Array(5).fill(null))
})
it('maps with asynchronous limit for Sets', async () => {
const numbersSet = new Set([1, 2, 3, 4, 5])
const squaresSet = new Set([1, 4, 9, 16, 25])
Expand All @@ -795,20 +804,31 @@ describe('rubico', () => {
ade(await r.map.pool(1, asyncSquareEntry)(numbersMap), squaresMap)
ade(await r.map.pool(9, asyncSquareEntry)(numbersMap), squaresMap)
})
it('abides by asynchronous limit', async () => {
let i = 0, maxi = 0
it('abides by asynchronous limit for arrays and sets', async () => {
const numbers = [1, 2, 3, 4, 5, 6]
let i = 0, maxi = 0, period = 10
const plusSleepMinus = n => (async () => {
i += 1
maxi = Math.max(maxi, i)
})().then(() => sleep(10)).then(() => {
})().then(() => sleep(period)).then(() => {
i -=1
return n
})
await r.map.pool(2, plusSleepMinus)([1, 2, 3, 4, 5, 6])
ade(await r.map.pool(2, plusSleepMinus)(numbers), numbers)
assert.strictEqual(maxi, 2)
await r.map.pool(6, plusSleepMinus)([1, 2, 3, 4, 5, 6])
assert.strictEqual(maxi, 6)
})
assert.strictEqual(i, 0)
maxi = 0
ade(await r.map.pool(3, plusSleepMinus)([1, 2, 3, 4, 5, 6]), numbers)
assert.strictEqual(maxi, 3)
assert.strictEqual(i, 0)
maxi = 0
const x = await r.map.pool(2, plusSleepMinus)(new Set([1, 2, 3, 4, 5, 6]))
assert.strictEqual(maxi, 2)
assert.strictEqual(i, 0)
maxi = 0
await r.map.pool(3, plusSleepMinus)(new Set([1, 2, 3, 4, 5, 6]))
assert.strictEqual(maxi, 3)
}).timeout(20000)
it('throws TypeError on map.pool(NaN)', async () => {
assert.throws(
() => r.map.pool(NaN),
Expand Down