Skip to content

Commit 3c1ed86

Browse files
Stephen Belangertargos
Stephen Belanger
authored andcommitted
lib: improved diagnostics_channel subscribe/unsubscribe
Adds a new top-level subscribe/unsubscribe which will ref/unref the channel WeakReference to prevent subscriptions from getting garbage collected. PR-URL: #42714 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Chengzhong Wu <legendecas@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com> Reviewed-By: Vladimir de Turckheim <vlad2t@hotmail.com>
1 parent 44291af commit 3c1ed86

File tree

3 files changed

+150
-7
lines changed

3 files changed

+150
-7
lines changed

doc/api/diagnostics_channel.md

+88-7
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ import diagnostics_channel from 'node:diagnostics_channel';
4343
// Get a reusable channel object
4444
const channel = diagnostics_channel.channel('my-channel');
4545

46-
// Subscribe to the channel
47-
channel.subscribe((message, name) => {
46+
function onMessage(message, name) {
4847
// Received data
49-
});
48+
}
49+
50+
// Subscribe to the channel
51+
diagnostics_channel.subscribe('my-channel', onMessage);
5052

5153
// Check if the channel has an active subscriber
5254
if (channel.hasSubscribers) {
@@ -55,6 +57,9 @@ if (channel.hasSubscribers) {
5557
some: 'data'
5658
});
5759
}
60+
61+
// Unsubscribe from the channel
62+
diagnostics_channel.unsubscribe('my-channel', onMessage);
5863
```
5964

6065
```cjs
@@ -63,10 +68,12 @@ const diagnostics_channel = require('node:diagnostics_channel');
6368
// Get a reusable channel object
6469
const channel = diagnostics_channel.channel('my-channel');
6570

66-
// Subscribe to the channel
67-
channel.subscribe((message, name) => {
71+
function onMessage(message, name) {
6872
// Received data
69-
});
73+
}
74+
75+
// Subscribe to the channel
76+
diagnostics_channel.subscribe('my-channel', onMessage);
7077

7178
// Check if the channel has an active subscriber
7279
if (channel.hasSubscribers) {
@@ -75,6 +82,9 @@ if (channel.hasSubscribers) {
7582
some: 'data'
7683
});
7784
}
85+
86+
// Unsubscribe from the channel
87+
diagnostics_channel.unsubscribe('my-channel', onMessage);
7888
```
7989

8090
#### `diagnostics_channel.hasSubscribers(name)`
@@ -121,7 +131,7 @@ added:
121131
* `name` {string|symbol} The channel name
122132
* Returns: {Channel} The named channel object
123133

124-
This is the primary entry-point for anyone wanting to interact with a named
134+
This is the primary entry-point for anyone wanting to publish to a named
125135
channel. It produces a channel object which is optimized to reduce overhead at
126136
publish time as much as possible.
127137

@@ -137,6 +147,76 @@ const diagnostics_channel = require('node:diagnostics_channel');
137147
const channel = diagnostics_channel.channel('my-channel');
138148
```
139149

150+
#### `diagnostics_channel.subscribe(name, onMessage)`
151+
152+
<!-- YAML
153+
added:
154+
- REPLACEME
155+
-->
156+
157+
* `name` {string|symbol} The channel name
158+
* `onMessage` {Function} The handler to receive channel messages
159+
* `message` {any} The message data
160+
* `name` {string|symbol} The name of the channel
161+
162+
Register a message handler to subscribe to this channel. This message handler
163+
will be run synchronously whenever a message is published to the channel. Any
164+
errors thrown in the message handler will trigger an [`'uncaughtException'`][].
165+
166+
```mjs
167+
import diagnostics_channel from 'diagnostics_channel';
168+
169+
diagnostics_channel.subscribe('my-channel', (message, name) => {
170+
// Received data
171+
});
172+
```
173+
174+
```cjs
175+
const diagnostics_channel = require('diagnostics_channel');
176+
177+
diagnostics_channel.subscribe('my-channel', (message, name) => {
178+
// Received data
179+
});
180+
```
181+
182+
#### `diagnostics_channel.unsubscribe(name, onMessage)`
183+
184+
<!-- YAML
185+
added:
186+
- REPLACEME
187+
-->
188+
189+
* `name` {string|symbol} The channel name
190+
* `onMessage` {Function} The previous subscribed handler to remove
191+
* Returns: {boolean} `true` if the handler was found, `false` otherwise.
192+
193+
Remove a message handler previously registered to this channel with
194+
[`diagnostics_channel.subscribe(name, onMessage)`][].
195+
196+
```mjs
197+
import diagnostics_channel from 'diagnostics_channel';
198+
199+
function onMessage(message, name) {
200+
// Received data
201+
}
202+
203+
diagnostics_channel.subscribe('my-channel', onMessage);
204+
205+
diagnostics_channel.unsubscribe('my-channel', onMessage);
206+
```
207+
208+
```cjs
209+
const diagnostics_channel = require('diagnostics_channel');
210+
211+
function onMessage(message, name) {
212+
// Received data
213+
}
214+
215+
diagnostics_channel.subscribe('my-channel', onMessage);
216+
217+
diagnostics_channel.unsubscribe('my-channel', onMessage);
218+
```
219+
140220
### Class: `Channel`
141221

142222
<!-- YAML
@@ -341,4 +421,5 @@ Emitted when server sends a response.
341421

342422
[`'uncaughtException'`]: process.md#event-uncaughtexception
343423
[`channel.subscribe(onMessage)`]: #channelsubscribeonmessage
424+
[`diagnostics_channel.subscribe(name, onMessage)`]: #diagnostics_channelunsubscribename_onmessage
344425
[`diagnostics_channel.channel(name)`]: #diagnostics_channelchannelname

lib/diagnostics_channel.js

+18
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,22 @@ function channel(name) {
109109
return channel;
110110
}
111111

112+
function subscribe(name, subscription) {
113+
const chan = channel(name);
114+
channels[name].incRef();
115+
chan.subscribe(subscription);
116+
}
117+
118+
function unsubscribe(name, subscription) {
119+
const chan = channel(name);
120+
if (!chan.unsubscribe(subscription)) {
121+
return false;
122+
}
123+
124+
channels[name].decRef();
125+
return true;
126+
}
127+
112128
function hasSubscribers(name) {
113129
let channel;
114130
const ref = channels[name];
@@ -123,5 +139,7 @@ function hasSubscribers(name) {
123139
module.exports = {
124140
channel,
125141
hasSubscribers,
142+
subscribe,
143+
unsubscribe,
126144
Channel
127145
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const dc = require('diagnostics_channel');
5+
const assert = require('assert');
6+
const { Channel } = dc;
7+
8+
const name = 'test';
9+
const input = {
10+
foo: 'bar'
11+
};
12+
13+
// Individual channel objects can be created to avoid future lookups
14+
const channel = dc.channel(name);
15+
assert.ok(channel instanceof Channel);
16+
17+
// No subscribers yet, should not publish
18+
assert.ok(!channel.hasSubscribers);
19+
20+
const subscriber = common.mustCall((message, name) => {
21+
assert.strictEqual(name, channel.name);
22+
assert.deepStrictEqual(message, input);
23+
});
24+
25+
// Now there's a subscriber, should publish
26+
dc.subscribe(name, subscriber);
27+
assert.ok(channel.hasSubscribers);
28+
29+
// The ActiveChannel prototype swap should not fail instanceof
30+
assert.ok(channel instanceof Channel);
31+
32+
// Should trigger the subscriber once
33+
channel.publish(input);
34+
35+
// Should not publish after subscriber is unsubscribed
36+
assert.ok(dc.unsubscribe(name, subscriber));
37+
assert.ok(!channel.hasSubscribers);
38+
39+
// unsubscribe() should return false when subscriber is not found
40+
assert.ok(!dc.unsubscribe(name, subscriber));
41+
42+
assert.throws(() => {
43+
dc.subscribe(name, null);
44+
}, { code: 'ERR_INVALID_ARG_TYPE' });

0 commit comments

Comments
 (0)