Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions source/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,14 +287,6 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
this.#processQueue();
}

async #throwOnAbort(signal: AbortSignal): Promise<never> {
return new Promise((_resolve, reject) => {
signal.addEventListener('abort', () => {
reject(signal.reason);
}, {once: true});
});
}

/**
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

Expand Down Expand Up @@ -367,6 +359,8 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
timeout: options.timeout,
});

let eventListener: (() => void) | undefined;

try {
// Check abort signal - if aborted, need to decrement the counter
// that was incremented in tryToStartAnother
Expand Down Expand Up @@ -394,7 +388,13 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
}

if (options.signal) {
operation = Promise.race([operation, this.#throwOnAbort(options.signal)]);
operation = Promise.race([operation, new Promise<never>((_resolve, reject) => {
eventListener = () => {
reject(options.signal!.reason);
};

options.signal!.addEventListener('abort', eventListener);
})]);
}

const result = await operation;
Expand All @@ -404,6 +404,11 @@ export default class PQueue<QueueType extends Queue<RunFunction, EnqueueOptionsT
reject(error);
this.emit('error', error);
} finally {
// Clean up abort event listener
if (eventListener) {
options.signal!.removeEventListener('abort', eventListener);
}

// Remove from running tasks
this.#runningTasks.delete(taskSymbol);

Expand Down
22 changes: 22 additions & 0 deletions test/advanced.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,28 @@ test('aborting multiple jobs at the same time', async () => {
assert.equal(queue.pending, 0);
});

test('abort listener is removed when job is completed', async () => {
const queue = new PQueue();
const mockSignal = {
aborted: false,
numberOfListeners: 0,
throwIfAborted() {
if (this.aborted) {
throw new Error('Aborted');
}
},
addEventListener() {
this.numberOfListeners++;
},
removeEventListener() {
this.numberOfListeners--;
},
};
await queue.add(async () => delay(10), {signal: mockSignal as unknown as AbortSignal});
assert.equal(queue.size, 0);
assert.equal(mockSignal.numberOfListeners, 0);
});

test('pending promises counted fast enough', async () => {
const queue = new PQueue({autoStart: false, concurrency: 2});

Expand Down