Skip to content

Commit bb1c8b4

Browse files
committed
change queueSizeLimit, buffSizeLimit to limit
1 parent 1b19af6 commit bb1c8b4

File tree

4 files changed

+32
-26
lines changed

4 files changed

+32
-26
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,7 @@ The constructor takes a `QueueConfig` object as an argument.
2424
### QueueConfig<T, R>
2525

2626
- `worker` (optional): Worker function that processes tasks of type `T` and returns a Promise of type `R`.
27-
- `queueSizeLimit` (optional): Limits the maximum allowed size of the task queue.
28-
- `buffSizeLimit` (optional): Limits the maximum allowed size of the results buffer that have not yet been retrieved.
27+
- `limit` (optional): Limits the maximum allowed size of the tasks in queue.
2928
- `workPolicy`: Configures how tasks are processed in the queue.
3029
- `after-add` (default): The worker is started immediately after calling the `push()` method.
3130
- `async-cycle-one`: The worker is started in a separate asynchronous call with an optional `interval`.

src/Queue.spec.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { describe, expect, test } from '@jest/globals'
22
import { Queue } from './Queue'
33
import { QueueConfig } from './QueueConfig'
44
import { WorkerFn } from './WorkerFn'
5-
5+
const wait = async (timeout: number) =>
6+
new Promise((done) => setTimeout(done, timeout))
67
describe('Queue', () => {
78
let queue: Queue<any, any>
89

@@ -41,16 +42,31 @@ describe('Queue', () => {
4142
expect(result).toBe('Custom: 1')
4243
})
4344

44-
test('should block push when queue size limit reached', async () => {
45-
// TODO
46-
})
45+
test('should block push when limit reached', async () => {
46+
const tmp: any = {}
4747

48-
test('should block push when buffer size limit reached', async () => {
49-
// TODO
50-
})
48+
const worker: WorkerFn<number, string> = async (item) => {
49+
tmp[item] = true
50+
return `Processed: ${item}`
51+
}
52+
queue = new Queue<number, string>({ worker, limit: 3 })
53+
54+
const p = queue.push([1, 2, 3, 4, 5])
55+
56+
await wait(10)
57+
58+
expect(queue.buff).toBe(3)
59+
expect(tmp[1]).toBe(true)
60+
expect(tmp[2]).toBe(true)
61+
expect(tmp[3]).toBe(true)
62+
expect(tmp[4]).toBe(undefined)
63+
await queue.get()
64+
65+
await wait(10)
5166

52-
test('should push successfully after getting an item when limit reached', async () => {
53-
// TODO
67+
expect(queue.buff).toBe(3)
68+
expect(tmp[4]).toBe(true)
69+
expect(tmp[5]).toBe(undefined)
5470
})
5571

5672
test('should work with after-add policy', async () => {

src/Queue.ts

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ import { Task } from './Task'
44
import { WorkerFn } from './WorkerFn'
55
import { Response } from './Respose'
66
export class Queue<T, R> {
7-
public readonly queueSizeLimit?: number
8-
public readonly buffSizeLimit?: number
7+
public readonly limit?: number
98
public readonly workPolicy: string
109

1110
private readonly _worker?: WorkerFn<T, R>
@@ -33,11 +32,10 @@ export class Queue<T, R> {
3332
...config,
3433
}
3534

36-
const { queueSizeLimit, buffSizeLimit, worker, workPolicy } = config
35+
const { limit, worker, workPolicy } = config
3736

3837
this.workPolicy = workPolicy
39-
this.queueSizeLimit = queueSizeLimit
40-
this.buffSizeLimit = buffSizeLimit
38+
this.limit = limit
4139
this._worker = worker
4240

4341
if (config.workPolicy === 'async-cycle-one') {
@@ -157,15 +155,9 @@ export class Queue<T, R> {
157155

158156
const release = await this._mu.acquire()
159157

160-
if (this._musl.isLocked()) {
161-
await this._musl.waitForUnlock()
162-
}
158+
await this._musl.waitForUnlock()
163159

164-
if (
165-
(this.queueSizeLimit &&
166-
this._queue.length >= this.queueSizeLimit) ||
167-
(this.buffSizeLimit && this._buff.length >= this.buffSizeLimit)
168-
) {
160+
if (this.limit && this.length >= this.limit) {
169161
await this._musl.acquire()
170162
await this._musl.waitForUnlock()
171163
}

src/QueueConfig.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,7 @@ import { WorkerFn } from './WorkerFn'
22

33
export type QueueConfig<T, R> = {
44
worker?: WorkerFn<T, R>
5-
queueSizeLimit?: number
6-
buffSizeLimit?: number
5+
limit?: number
76
} & (
87
| {
98
workPolicy: 'after-add'

0 commit comments

Comments
 (0)