Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

diagnostics_channel: improved subscribe/unsubscribe #42714

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions doc/api/deprecations.md
Original file line number Diff line number Diff line change
Expand Up @@ -3145,6 +3145,24 @@ parameter in [`fs.write()`][], [`fs.writeFile()`][], [`fs.appendFile()`][],
[`fs.writeFileSync()`][], and [`fs.appendFileSync()`][] is deprecated.
Convert them to primitive strings.

### DEP0163: `channel.subscribe(onMessage)`, `channel.unsubscribe(onMessage)`

<!-- YAML
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/42714
description: Documentation-only deprecation.
-->

Type: Documentation-only

These methods were deprecated because they can be used in a way which does not
hold the channel reference alive long enough to receive the events.

Use [`diagnostics_channel.subscribe(name, onMessage)`][] or
[`diagnostics_channel.unsubscribe(name, onMessage)`][] which does the same
thing instead.

[Legacy URL API]: url.md#legacy-url-api
[NIST SP 800-38D]: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-38d.pdf
[RFC 6066]: https://tools.ietf.org/html/rfc6066#section-3
Expand Down Expand Up @@ -3185,6 +3203,8 @@ Convert them to primitive strings.
[`crypto.scrypt()`]: crypto.md#cryptoscryptpassword-salt-keylen-options-callback
[`decipher.final()`]: crypto.md#decipherfinaloutputencoding
[`decipher.setAuthTag()`]: crypto.md#deciphersetauthtagbuffer-encoding
[`diagnostics_channel.subscribe(name, onMessage)`]: diagnostics_channel.md#diagnostics_channelsubscribename-onmessage
[`diagnostics_channel.unsubscribe(name, onMessage)`]: diagnostics_channel.md#diagnostics_channelunsubscribename-onmessage
[`dns.lookup()`]: dns.md#dnslookuphostname-options-callback
[`dnsPromises.lookup()`]: dns.md#dnspromiseslookuphostname-options
[`domain`]: domain.md
Expand Down
102 changes: 95 additions & 7 deletions doc/api/diagnostics_channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ import diagnostics_channel from 'node:diagnostics_channel';
// Get a reusable channel object
const channel = diagnostics_channel.channel('my-channel');

// Subscribe to the channel
channel.subscribe((message, name) => {
function onMessage(message, name) {
// Received data
});
}

// Subscribe to the channel
diagnostics_channel.subscribe('my-channel', onMessage);

// Check if the channel has an active subscriber
if (channel.hasSubscribers) {
Expand All @@ -55,6 +57,9 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Unsubscribe from the channel
diagnostics_channel.unsubscribe('my-channel', onMessage);
```

```cjs
Expand All @@ -63,10 +68,12 @@ const diagnostics_channel = require('node:diagnostics_channel');
// Get a reusable channel object
const channel = diagnostics_channel.channel('my-channel');

// Subscribe to the channel
channel.subscribe((message, name) => {
function onMessage(message, name) {
// Received data
});
}

// Subscribe to the channel
diagnostics_channel.subscribe('my-channel', onMessage);

// Check if the channel has an active subscriber
if (channel.hasSubscribers) {
Expand All @@ -75,6 +82,9 @@ if (channel.hasSubscribers) {
some: 'data'
});
}

// Unsubscribe from the channel
diagnostics_channel.unsubscribe('my-channel', onMessage);
```

#### `diagnostics_channel.hasSubscribers(name)`
Expand Down Expand Up @@ -121,7 +131,7 @@ added:
* `name` {string|symbol} The channel name
* Returns: {Channel} The named channel object

This is the primary entry-point for anyone wanting to interact with a named
This is the primary entry-point for anyone wanting to publish to a named
Qard marked this conversation as resolved.
Show resolved Hide resolved
channel. It produces a channel object which is optimized to reduce overhead at
publish time as much as possible.

Expand All @@ -137,6 +147,76 @@ const diagnostics_channel = require('node:diagnostics_channel');
const channel = diagnostics_channel.channel('my-channel');
```

#### `diagnostics_channel.subscribe(name, onMessage)`

<!-- YAML
added:
- REPLACEME
-->

* `name` {string|symbol} The channel name
* `onMessage` {Function} The handler to receive channel messages
* `message` {any} The message data
* `name` {string|symbol} The name of the channel

Register a message handler to subscribe to this channel. This message handler
will be run synchronously whenever a message is published to the channel. Any
errors thrown in the message handler will trigger an [`'uncaughtException'`][].

```mjs
import diagnostics_channel from 'diagnostics_channel';

diagnostics_channel.subscribe('my-channel', (message, name) => {
// Received data
});
```

```cjs
const diagnostics_channel = require('diagnostics_channel');

diagnostics_channel.subscribe('my-channel', (message, name) => {
// Received data
});
```

#### `diagnostics_channel.unsubscribe(name, onMessage)`

<!-- YAML
added:
- REPLACEME
-->

* `name` {string|symbol} The channel name
* `onMessage` {Function} The previous subscribed handler to remove
* Returns: {boolean} `true` if the handler was found, `false` otherwise.

Remove a message handler previously registered to this channel with
[`diagnostics_channel.subscribe(name, onMessage)`][].

```mjs
import diagnostics_channel from 'diagnostics_channel';

function onMessage(message, name) {
// Received data
}

diagnostics_channel.subscribe('my-channel', onMessage);

diagnostics_channel.unsubscribe('my-channel', onMessage);
```

```cjs
const diagnostics_channel = require('diagnostics_channel');

function onMessage(message, name) {
// Received data
}

diagnostics_channel.subscribe('my-channel', onMessage);

diagnostics_channel.unsubscribe('my-channel', onMessage);
```

### Class: `Channel`

<!-- YAML
Expand Down Expand Up @@ -228,8 +308,11 @@ channel.publish({
added:
- v15.1.0
- v14.17.0
deprecated: REPLACEME
Qard marked this conversation as resolved.
Show resolved Hide resolved
-->

> Stability: 0 - Deprecated: Use [`diagnostics_channel.subscribe(name, onMessage)`][]

* `onMessage` {Function} The handler to receive channel messages
* `message` {any} The message data
* `name` {string|symbol} The name of the channel
Expand Down Expand Up @@ -264,6 +347,7 @@ channel.subscribe((message, name) => {
added:
- v15.1.0
- v14.17.0
deprecated: REPLACEME
changes:
- version:
- v17.1.0
Expand All @@ -273,6 +357,8 @@ changes:
description: Added return value. Added to channels without subscribers.
-->

> Stability: 0 - Deprecated: Use [`diagnostics_channel.unsubscribe(name, onMessage)`][]

* `onMessage` {Function} The previous subscribed handler to remove
* Returns: {boolean} `true` if the handler was found, `false` otherwise.

Expand Down Expand Up @@ -345,3 +431,5 @@ Emitted when server sends a response.
[`'uncaughtException'`]: process.md#event-uncaughtexception
[`channel.subscribe(onMessage)`]: #channelsubscribeonmessage
[`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname
[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelsubscribename-onmessage
[`diagnostics_channel.unsubscribe(name, onMessage)`]: #diagnostics_channelunsubscribename-onmessage
18 changes: 18 additions & 0 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,22 @@ function channel(name) {
return channel;
}

function subscribe(name, subscription) {
const chan = channel(name);
channels[name].incRef();
chan.subscribe(subscription);
}

function unsubscribe(name, subscription) {
const chan = channel(name);
if (!chan.unsubscribe(subscription)) {
return false;
}

channels[name].decRef();
return true;
}

function hasSubscribers(name) {
let channel;
const ref = channels[name];
Expand All @@ -123,5 +139,7 @@ function hasSubscribers(name) {
module.exports = {
channel,
hasSubscribers,
subscribe,
unsubscribe,
Channel
};
44 changes: 44 additions & 0 deletions test/parallel/test-diagnostics-channel-pub-sub.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
'use strict';

const common = require('../common');
const dc = require('diagnostics_channel');
const assert = require('assert');
const { Channel } = dc;

const name = 'test';
const input = {
foo: 'bar'
};

// Individual channel objects can be created to avoid future lookups
const channel = dc.channel(name);
assert.ok(channel instanceof Channel);

// No subscribers yet, should not publish
assert.ok(!channel.hasSubscribers);

const subscriber = common.mustCall((message, name) => {
assert.strictEqual(name, channel.name);
assert.deepStrictEqual(message, input);
});

// Now there's a subscriber, should publish
dc.subscribe(name, subscriber);
assert.ok(channel.hasSubscribers);

// The ActiveChannel prototype swap should not fail instanceof
assert.ok(channel instanceof Channel);

// Should trigger the subscriber once
channel.publish(input);

// Should not publish after subscriber is unsubscribed
assert.ok(dc.unsubscribe(name, subscriber));
assert.ok(!channel.hasSubscribers);

// unsubscribe() should return false when subscriber is not found
assert.ok(!dc.unsubscribe(name, subscriber));

assert.throws(() => {
dc.subscribe(name, null);
}, { code: 'ERR_INVALID_ARG_TYPE' });