Skip to content

events: add addDisposableListener method to EventEmitter #58453

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions doc/api/events.md
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Experimental?

Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,31 @@ changes:

The `'removeListener'` event is emitted _after_ the `listener` is removed.

### `emitter.addDisposableListener(eventName, listener[, options])`

<!-- YAML
added: REPLACEME
-->

* `eventName` {string|symbol} The name of the event.
* `listener` {Function} The callback function
* `options` {Object}
* `once` {boolean} If `true`, the listener will be removed after being called
once.
* Returns: {Function} A function that, when called, will remove the listener.
The function will also have a `Symbol.dispose` method so the function can
be used with the `using` keyword.

```mjs
import { EventEmitter } from 'node:events';
const myEmitter = new EventEmitter();
{
using disposer = myEmitter.addDisposableListener('event', console.log);
console.log(myEmitter.listenerCount('event')); // Prints: 1
}
console.log(myEmitter.listenerCount('event')); // Prints: 0
```

### `emitter.addListener(eventName, listener)`

<!-- YAML
Expand Down
34 changes: 34 additions & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -1210,3 +1210,37 @@ function listenersController() {
},
};
}

/**
* A variation on `addListener` that returns a function that can be called
* to remove the listener. The function includes a Symbol.dispose property
* that allows the function to be used with `using` statements.
* @param {string|symbol} type
* @param {Function} listener
* @param {{
* once?: boolean;
* }} [options]
* @returns {Function}
*/
function addDisposableListener(type, listener, options = kEmptyObject) {
validateObject(options, 'options');
const {
once = false,
} = options;
validateBoolean(once, 'options.once');
if (once) {
this.once(type, listener);
} else {
this.on(type, listener);
}

let disposed = false;
const dispose = () => {
if (disposed) return;
disposed = true;
this.removeListener(type, listener);
};
dispose[SymbolDispose] = dispose;
return dispose;
};
EventEmitter.prototype.addDisposableListener = addDisposableListener;
38 changes: 16 additions & 22 deletions lib/internal/streams/end-of-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,36 +177,38 @@ function eos(stream, options, callback) {
callback.call(stream);
};

const disposableStack = new DisposableStack(); // eslint-disable-line no-undef

const onrequest = () => {
stream.req.on('finish', onfinish);
disposableStack.use(stream.req.addDisposableListener('finish', onfinish));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would need a stream benchmark run before landing

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The results are mixed and it's just not clear if the difference is worth being concerned about... some benchmarks are faster, others are slower.

streams/compose.js n=1000                                                                       ***    -19.41 %       ±1.17%  ±1.57%  ±2.06%
streams/creation.js kind='duplex' n=50000000                                                    ***      4.39 %       ±1.62%  ±2.16%  ±2.81%
streams/creation.js kind='readable' n=50000000                                                           2.46 %       ±4.12%  ±5.48%  ±7.14%
streams/creation.js kind='transform' n=50000000                                                   *      2.21 %       ±1.69%  ±2.25%  ±2.94%
streams/creation.js kind='writable' n=50000000                                                  ***      6.72 %       ±2.15%  ±2.85%  ±3.71%
streams/destroy.js kind='duplex' n=1000000                                                              -0.50 %       ±3.36%  ±4.48%  ±5.84%
streams/destroy.js kind='readable' n=1000000                                                             1.07 %       ±2.43%  ±3.24%  ±4.22%
streams/destroy.js kind='transform' n=1000000                                                           -0.55 %       ±2.33%  ±3.10%  ±4.03%
streams/destroy.js kind='writable' n=1000000                                                             0.16 %       ±2.75%  ±3.66%  ±4.77%
streams/pipe-object-mode.js n=5000000                                                           ***      4.68 %       ±2.00%  ±2.67%  ±3.50%
streams/pipe.js n=5000000                                                                       ***     10.30 %       ±1.19%  ±1.59%  ±2.07%
streams/readable-async-iterator.js sync='no' n=100000                                                    1.07 %       ±1.92%  ±2.55%  ±3.32%
streams/readable-async-iterator.js sync='yes' n=100000                                          ***      8.45 %       ±3.80%  ±5.06%  ±6.59%
streams/readable-bigread.js n=1000                                                                       0.55 %       ±2.27%  ±3.02%  ±3.94%
streams/readable-bigunevenread.js n=1000                                                        ***     -2.12 %       ±1.13%  ±1.52%  ±1.99%
streams/readable-boundaryread.js type='buffer' n=2000                                             *      1.03 %       ±0.84%  ±1.12%  ±1.46%
streams/readable-boundaryread.js type='string' n=2000                                             *      2.08 %       ±1.69%  ±2.26%  ±2.97%
streams/readable-from.js type='array' n=10000000                                                ***     -5.20 %       ±2.30%  ±3.06%  ±3.99%
streams/readable-from.js type='async-generator' n=10000000                                               1.73 %       ±2.22%  ±2.96%  ±3.86%
streams/readable-from.js type='sync-generator-with-async-values' n=10000000                              1.88 %       ±2.05%  ±2.73%  ±3.56%
streams/readable-from.js type='sync-generator-with-sync-values' n=10000000                              -1.75 %       ±2.41%  ±3.24%  ±4.28%
streams/readable-readall.js n=5000                                                               **     -4.53 %       ±3.18%  ±4.24%  ±5.51%
streams/readable-uint8array.js kind='encoding' n=1000000                                         **      2.40 %       ±1.52%  ±2.02%  ±2.63%
streams/readable-uint8array.js kind='read' n=1000000                                            ***     -2.98 %       ±1.59%  ±2.11%  ±2.75%
streams/readable-unevenread.js n=1000                                                             *     -1.81 %       ±1.45%  ±1.93%  ±2.51%
streams/writable-manywrites.js len=1024 callback='no' writev='no' sync='no' n=100000                     1.10 %       ±3.76%  ±5.00%  ±6.51%
streams/writable-manywrites.js len=1024 callback='no' writev='no' sync='yes' n=100000           ***     27.55 %       ±9.28% ±12.36% ±16.09%
streams/writable-manywrites.js len=1024 callback='no' writev='yes' sync='no' n=100000                    2.88 %       ±5.18%  ±6.90%  ±8.98%
streams/writable-manywrites.js len=1024 callback='no' writev='yes' sync='yes' n=100000            *     11.47 %       ±9.08% ±12.08% ±15.73%
streams/writable-manywrites.js len=1024 callback='yes' writev='no' sync='no' n=100000                   -1.45 %       ±3.28%  ±4.36%  ±5.68%
streams/writable-manywrites.js len=1024 callback='yes' writev='no' sync='yes' n=100000          ***     23.32 %       ±8.66% ±11.53% ±15.01%
streams/writable-manywrites.js len=1024 callback='yes' writev='yes' sync='no' n=100000                   3.83 %       ±4.48%  ±5.96%  ±7.76%
streams/writable-manywrites.js len=1024 callback='yes' writev='yes' sync='yes' n=100000                  1.32 %       ±6.89%  ±9.17% ±11.94%
streams/writable-manywrites.js len=32768 callback='no' writev='no' sync='no' n=100000                   -1.30 %       ±3.51%  ±4.68%  ±6.09%
streams/writable-manywrites.js len=32768 callback='no' writev='no' sync='yes' n=100000          ***     30.32 %      ±10.19% ±13.57% ±17.67%
streams/writable-manywrites.js len=32768 callback='no' writev='yes' sync='no' n=100000                   0.12 %       ±3.88%  ±5.17%  ±6.73%
streams/writable-manywrites.js len=32768 callback='no' writev='yes' sync='yes' n=100000          **     13.40 %       ±9.34% ±12.47% ±16.32%
streams/writable-manywrites.js len=32768 callback='yes' writev='no' sync='no' n=100000                  -1.46 %       ±4.38%  ±5.83%  ±7.61%
streams/writable-manywrites.js len=32768 callback='yes' writev='no' sync='yes' n=100000         ***     16.67 %       ±8.21% ±10.94% ±14.28%
streams/writable-manywrites.js len=32768 callback='yes' writev='yes' sync='no' n=100000                  1.19 %       ±4.05%  ±5.39%  ±7.01%
streams/writable-manywrites.js len=32768 callback='yes' writev='yes' sync='yes' n=100000        ***     21.43 %       ±9.84% ±13.14% ±17.21%
streams/writable-uint8array.js kind='object-mode' n=50000000                                             0.64 %       ±3.66%  ±4.87%  ±6.34%
streams/writable-uint8array.js kind='write' n=50000000                                            *      3.85 %       ±3.32%  ±4.42%  ±5.76%
streams/writable-uint8array.js kind='writev' n=50000000                                           *      3.15 %       ±2.99%  ±3.98%  ±5.18%

};

if (isRequest(stream)) {
stream.on('complete', onfinish);
disposableStack.use(stream.addDisposableListener('complete', onfinish));
if (!willEmitClose) {
stream.on('abort', onclose);
disposableStack.use(stream.addDisposableListener('abort', onclose));
}
if (stream.req) {
onrequest();
} else {
stream.on('request', onrequest);
disposableStack.use(stream.addDisposableListener('request', onrequest));
}
} else if (writable && !wState) { // legacy streams
stream.on('end', onlegacyfinish);
stream.on('close', onlegacyfinish);
disposableStack.use(stream.addDisposableListener('end', onlegacyfinish));
disposableStack.use(stream.addDisposableListener('close', onlegacyfinish));
}

// Not all streams will emit 'close' after 'aborted'.
if (!willEmitClose && typeof stream.aborted === 'boolean') {
stream.on('aborted', onclose);
disposableStack.use(stream.addDisposableListener('aborted', onclose));
}

stream.on('end', onend);
stream.on('finish', onfinish);
disposableStack.use(stream.addDisposableListener('end', onend));
disposableStack.use(stream.addDisposableListener('finish', onfinish));
if (options.error !== false) {
stream.on('error', onerror);
disposableStack.use(stream.addDisposableListener('error', onerror));
}
stream.on('close', onclose);
disposableStack.use(stream.addDisposableListener('close', onclose));

if (closed) {
process.nextTick(onclose);
Expand All @@ -233,18 +235,10 @@ function eos(stream, options, callback) {

const cleanup = () => {
callback = nop;
stream.removeListener('aborted', onclose);
stream.removeListener('complete', onfinish);
stream.removeListener('abort', onclose);
stream.removeListener('request', onrequest);
if (stream.req) stream.req.removeListener('finish', onfinish);
stream.removeListener('end', onlegacyfinish);
stream.removeListener('close', onlegacyfinish);
stream.removeListener('finish', onfinish);
stream.removeListener('end', onend);
stream.removeListener('error', onerror);
stream.removeListener('close', onclose);
disposableStack.dispose();
};
// Arrange for the cleanup function to call itself when disposed.
cleanup[SymbolDispose] = cleanup;

if (options.signal && !closed) {
const abort = () => {
Expand Down
28 changes: 16 additions & 12 deletions lib/internal/test_runner/harness.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ const {
PromiseWithResolvers,
SafeMap,
SafePromiseAllReturnVoid,
globalThis: { DisposableStack },
} = primordials;

const { getCallerLocation } = internalBinding('util');
const {
createHook,
Expand Down Expand Up @@ -230,6 +232,9 @@ function setupProcessState(root, globalOptions) {
const rejectionHandler =
createProcessEventHandler('unhandledRejection', root);
const coverage = configureCoverage(root, globalOptions);

const disposableStack = new DisposableStack();

const exitHandler = async (kill) => {
if (root.subtests.length === 0 && (root.hooks.before.length > 0 || root.hooks.after.length > 0)) {
// Run global before/after hooks in case there are no tests
Expand All @@ -254,27 +259,26 @@ function setupProcessState(root, globalOptions) {
}

hook.disable();
process.removeListener('uncaughtException', exceptionHandler);
process.removeListener('unhandledRejection', rejectionHandler);
process.removeListener('beforeExit', exitHandler);
if (globalOptions.isTestRunner) {
process.removeListener('SIGINT', terminationHandler);
process.removeListener('SIGTERM', terminationHandler);
}
disposableStack.dispose();
};

const terminationHandler = async () => {
await exitHandler(true);
process.exit();
};

process.on('uncaughtException', exceptionHandler);
process.on('unhandledRejection', rejectionHandler);
process.on('beforeExit', exitHandler);
disposableStack.use(
process.addDisposableListener('uncaughtException', exceptionHandler));
disposableStack.use(
process.addDisposableListener('unhandledRejection', rejectionHandler));
disposableStack.use(
process.addDisposableListener('beforeExit', exitHandler));
// TODO(MoLow): Make it configurable to hook when isTestRunner === false.
if (globalOptions.isTestRunner) {
process.on('SIGINT', terminationHandler);
process.on('SIGTERM', terminationHandler);
disposableStack.use(
process.addDisposableListener('SIGINT', terminationHandler));
disposableStack.use(
process.addDisposableListener('SIGTERM', terminationHandler));
}

root.harness.coverage = FunctionPrototypeBind(collectCoverage, null, root, coverage);
Expand Down
45 changes: 45 additions & 0 deletions test/parallel/test-events-disposable.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
'use strict';

const common = require('../common');
const { strictEqual, throws } = require('assert');
const { EventEmitter } = require('events');

const emitter = new EventEmitter();

{
// Verify that the disposable stack removes the handlers
// when the stack is disposed.
using ds = new DisposableStack(); // eslint-disable-line no-undef
ds.use(emitter.addDisposableListener('foo', common.mustCall()));
ds.use(emitter.addDisposableListener('bar', common.mustCall()));
ds.use(emitter.addDisposableListener('baz', common.mustNotCall()),
{ once: true });
emitter.emit('foo');
emitter.emit('bar');
strictEqual(emitter.listenerCount('foo'), 1);
strictEqual(emitter.listenerCount('bar'), 1);

// The disposer returned by addDisposableListener is a function that
// can be used to remove the listener.
const disposer = emitter.addDisposableListener('foo', common.mustNotCall());
disposer();
}
emitter.emit('foo');
emitter.emit('bar');
emitter.emit('baz');
strictEqual(emitter.listenerCount('foo'), 0);
strictEqual(emitter.listenerCount('bar'), 0);

// ============================================================================
// Type checking on inputs
throws(() => emitter.addDisposableListener('foo', 'not a function'), {
code: 'ERR_INVALID_ARG_TYPE',
});

throws(() => emitter.addDisposableListener('foo', () => {}, ''), {
code: 'ERR_INVALID_ARG_TYPE',
});

throws(() => emitter.addDisposableListener('foo', () => {}, { once: '' }), {
code: 'ERR_INVALID_ARG_TYPE',
});
Loading