Useful for rate-limiting async (or sync) operations. For example, when interacting with a REST API or when doing CPU/memory intensive tasks.
Here we run only one promise at the time. For example, set concurrency
to 4 to run four promises at the same time.
import PQueue from "https://deno.land/x/p_queue@1.0.1/mod.ts"
const queue = new PQueue({
concurrency: 1,
})
async function one() {
await queue.add(() => fetch("https://sindresorhus.com"))
console.log("Done: sindresorhus.com")
}
async function two() {
await queue.add(() => fetch("https://avajs.dev"))
console.log("Done: avajs.dev")
}
async function three() {
const task = await getUnicornTask()
await queue.add(task)
console.log("Done: Unicorn task")
}
one()
two()
three()
See https://doc.deno.land/https/deno.land/x/p_queue@1.0.1/mod.ts
Emitted as each item is processed in the queue for the purpose of tracking progress.
import PQueue from "https://deno.land/x/p_queue@1.0.1/mod.ts"
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue({
concurrency: 2,
})
let count = 0
queue.on("active", () => {
console.log(
`Working on item #${++count}. Size: ${queue.size} Pending: ${queue.pending}`,
)
})
queue.add(() => Promise.resolve())
queue.add(() => delay(2000))
queue.add(() => Promise.resolve())
queue.add(() => Promise.resolve())
queue.add(() => delay(500))
Emitted every time the queue becomes empty and all promises have completed queue.size === 0 && queue.pending === 0
.
import PQueue from "https://deno.land/x/p_queue@1.0.1/mod.ts"
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue()
queue.on("idle", () => {
console.log(
`Queue is idle. Size: ${queue.size} Pending: ${queue.pending}`,
)
})
const job1 = queue.add(() => delay(2000))
const job2 = queue.add(() => delay(500))
await job1
await job2
// => 'Queue is idle. Size: 0 Pending: 0'
await queue.add(() => delay(600))
// => 'Queue is idle. Size: 0 Pending: 0'
The idle
event is emitted every time the queue reaches an idle state. On the other hand, the promise the onIdle()
function returns resolves once the queue becomes idle instead of every time the queue is idle.
Emitted every time the add method is called and the number of pending or queued tasks is increased.
Emitted every time a task is completed and the number of pending or queued tasks is decreased.
import PQueue from "https://deno.land/x/p_queue@1.0.1/mod.ts"
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue()
queue.on("add", () => {
console.log(
`Task is added. Size: ${queue.size} Pending: ${queue.pending}`,
)
})
queue.on("next", () => {
console.log(
`Task is completed. Size: ${queue.size} Pending: ${queue.pending}`,
)
})
const job1 = queue.add(() => delay(2000))
const job2 = queue.add(() => delay(500))
await job1
await job2
// => 'Task is added. Size: 0 Pending: 1'
// => 'Task is added. Size: 0 Pending: 2'
await queue.add(() => delay(600))
// => 'Task is completed. Size: 0 Pending: 1'
// => 'Task is completed. Size: 0 Pending: 0'
A more advanced example to help you understand the flow.
import PQueue from "https://deno.land/x/p_queue@1.0.1/mod.ts"
const delay = (ms: number) => new Promise(r => setTimeout(r, ms))
const queue = new PQueue({
concurrency: 1,
})
async function taskOne() {
await delay(200)
console.log(`8. Pending promises: ${queue.pending}`)(// => '8. Pending promises: 0'
async () => {
await queue.add(async () => "π")
console.log("11. Resolved")
})()
console.log("9. Added π")
console.log(`10. Pending promises: ${queue.pending}`)
// => '10. Pending promises: 1'
await queue.onIdle()
console.log("12. All work is done")
}
async function taskTwo() {
await queue.add(async () => "π¦")
console.log("5. Resolved")
}
console.log("1. Added π¦")
;(async () => {
await queue.add(async () => "π΄")
console.log("6. Resolved")
})()
console.log("2. Added π΄")
;(async () => {
await queue.onEmpty()
console.log("7. Queue is empty")
})()
console.log(`3. Queue size: ${queue.size}`)
// => '3. Queue size: 1`
console.log(`4. Pending promises: ${queue.pending}`)
// => '4. Pending promises: 1'
$ deno run -A test.ts
01. Added π¦
02. Added π΄
03. Queue size: 1
04. Pending promises: 1
05. Resolved π¦
06. Resolved π΄
07. Queue is empty
08. Pending promises: 0
09. Added π
10. Pending promises: 1
11. Resolved π
12. All work is done
For implementing more complex scheduling policies, you can provide a QueueClass in the options:
class QueueClass {
constructor() {
this._queue = []
}
enqueue(run, options) {
this._queue.push(run)
}
dequeue() {
return this._queue.shift()
}
get size() {
return this._queue.length
}
filter(options) {
return this._queue
}
}
const queue = new PQueue({
queueClass: QueueClass,
})
p_queue
will call corresponding methods to put and get operations from this queue.
They are just different constraints. The concurrency
option limits how many things run at the same time. The intervalCap
option limits how many things run in total during the interval (over time).
- HUGE thanks to @sindresorhus -- this repository is mostly his code, modified to work with Deno
- P(romise) Queue is licensed under the MIT license.
- Code is adapted from Sindre's p-queue for node (also under the MIT license)