Skip to content

Commit e9074f0

Browse files
authored
Fix: Remove abort listener when operation completes (#235)
1 parent 5e40017 commit e9074f0

File tree

2 files changed

+36
-9
lines changed

2 files changed

+36
-9
lines changed

source/index.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -287,14 +287,6 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
287287
this.#processQueue();
288288
}
289289

290-
async #throwOnAbort(signal: AbortSignal): Promise<never> {
291-
return new Promise((_resolve, reject) => {
292-
signal.addEventListener('abort', () => {
293-
reject(signal.reason);
294-
}, {once: true});
295-
});
296-
}
297-
298290
/**
299291
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.
300292
@@ -367,6 +359,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
367359
timeout: options.timeout,
368360
});
369361

362+
let eventListener: (() => void) | undefined;
363+
370364
try {
371365
// Check abort signal - if aborted, need to decrement the counter
372366
// that was incremented in tryToStartAnother
@@ -394,7 +388,13 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
394388
}
395389

396390
if (options.signal) {
397-
operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
391+
operation = Promise.race([operation, new Promise<never>((_resolve, reject) => {
392+
eventListener = () => {
393+
reject(options.signal!.reason);
394+
};
395+
396+
options.signal!.addEventListener('abort', eventListener);
397+
})]);
398398
}
399399

400400
const result = await operation;
@@ -404,6 +404,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
404404
reject(error);
405405
this.emit('error', error);
406406
} finally {
407+
// Clean up abort event listener
408+
if (eventListener) {
409+
options.signal!.removeEventListener('abort', eventListener);
410+
}
411+
407412
// Remove from running tasks
408413
this.#runningTasks.delete(taskSymbol);
409414

test/advanced.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,28 @@ test('aborting multiple jobs at the same time', async () => {
342342
assert.equal(queue.pending, 0);
343343
});
344344

345+
test('abort listener is removed when job is completed', async () => {
346+
const queue = new PQueue();
347+
const mockSignal = {
348+
aborted: false,
349+
numberOfListeners: 0,
350+
throwIfAborted() {
351+
if (this.aborted) {
352+
throw new Error('Aborted');
353+
}
354+
},
355+
addEventListener() {
356+
this.numberOfListeners++;
357+
},
358+
removeEventListener() {
359+
this.numberOfListeners--;
360+
},
361+
};
362+
await queue.add(async () => delay(10), {signal: mockSignal as unknown as AbortSignal});
363+
assert.equal(queue.size, 0);
364+
assert.equal(mockSignal.numberOfListeners, 0);
365+
});
366+
345367
test('pending promises counted fast enough', async () => {
346368
const queue = new PQueue({autoStart: false, concurrency: 2});
347369

0 commit comments

Comments
 (0)