Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions docs/docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,11 @@ This message is published after the client has successfully connected to a serve
```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:open').subscribe(({ address, protocol, extensions }) => {
diagnosticsChannel.channel('undici:websocket:open').subscribe(({ address, protocol, extensions, websocket }) => {
console.log(address) // address, family, and port
console.log(protocol) // negotiated subprotocols
console.log(extensions) // negotiated extensions
console.log(websocket) // the WebSocket instance
})
```

Expand All @@ -184,7 +185,7 @@ This message is published after the connection has closed.
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:close').subscribe(({ websocket, code, reason }) => {
console.log(websocket) // the WebSocket object
console.log(websocket) // the WebSocket instance
console.log(code) // the closing status code
console.log(reason) // the closing reason
})
Expand All @@ -209,9 +210,10 @@ This message is published after the client receives a ping frame, if the connect
```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload }) => {
diagnosticsChannel.channel('undici:websocket:ping').subscribe(({ payload, websocket }) => {
// a Buffer or undefined, containing the optional application data of the frame
console.log(payload)
console.log(websocket) // the WebSocket instance
})
```

Expand All @@ -222,8 +224,9 @@ This message is published after the client receives a pong frame.
```js
import diagnosticsChannel from 'diagnostics_channel'

diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload, websocket }) => {
// a Buffer or undefined, containing the optional application data of the frame
console.log(payload)
console.log(websocket) // the WebSocket instance
})
```
9 changes: 0 additions & 9 deletions lib/web/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const { uid, states, sentCloseFrameState, emptyBuffer, opcodes } = require('./constants')
const { parseExtensions, isClosed, isClosing, isEstablished, validateCloseCodeAndReason } = require('./util')
const { channels } = require('../../core/diagnostics')
const { makeRequest } = require('../fetch/request')
const { fetching } = require('../fetch/index')
const { Headers, getHeadersList } = require('../fetch/headers')
Expand Down Expand Up @@ -200,14 +199,6 @@ function establishWebSocketConnection (url, protocols, client, handler, options)
response.socket.on('close', handler.onSocketClose)
response.socket.on('error', handler.onSocketError)

if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol: secProtocol,
extensions: secExtension
})
}

handler.wasEverConnected = true
handler.onConnectionEstablished(response, extensions)
}
Expand Down
14 changes: 2 additions & 12 deletions lib/web/websocket/receiver.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const { Writable } = require('node:stream')
const assert = require('node:assert')
const { parserStates, opcodes, states, emptyBuffer, sentCloseFrameState } = require('./constants')
const { channels } = require('../../core/diagnostics')
const {
isValidStatusCode,
isValidOpcode,
Expand Down Expand Up @@ -423,22 +422,13 @@ class ByteParser extends Writable {

this.#handler.socket.write(frame.createFrame(opcodes.PONG))

if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body
})
}
this.#handler.onPing(body)
}
} else if (opcode === opcodes.PONG) {
// A Pong frame MAY be sent unsolicited. This serves as a
// unidirectional heartbeat. A response to an unsolicited Pong frame is
// not expected.

if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body
})
}
this.#handler.onPong(body)
}

return true
Expand Down
2 changes: 2 additions & 0 deletions lib/web/websocket/stream/websocketstream.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class WebSocketStream {
this.#handler.socket.destroy()
},
onSocketClose: () => this.#onSocketClose(),
onPing: () => {},
onPong: () => {},

readyState: states.CONNECTING,
socket: null,
Expand Down
27 changes: 27 additions & 0 deletions lib/web/websocket/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const { channels } = require('../../core/diagnostics')
* @property {(chunk: Buffer) => void} onSocketData
* @property {(err: Error) => void} onSocketError
* @property {() => void} onSocketClose
* @property {(body: Buffer) => void} onPing
* @property {(body: Buffer) => void} onPong
*
* @property {number} readyState
* @property {import('stream').Duplex} socket
Expand Down Expand Up @@ -79,6 +81,22 @@ class WebSocket extends EventTarget {
this.#handler.socket.destroy()
},
onSocketClose: () => this.#onSocketClose(),
onPing: (body) => {
if (channels.ping.hasSubscribers) {
channels.ping.publish({
payload: body,
websocket: this
})
}
},
onPong: (body) => {
if (channels.pong.hasSubscribers) {
channels.pong.publish({
payload: body,
websocket: this
})
}
},

readyState: states.CONNECTING,
socket: null,
Expand Down Expand Up @@ -460,6 +478,15 @@ class WebSocket extends EventTarget {

// 4. Fire an event named open at the WebSocket object.
fireEvent('open', this)

if (channels.open.hasSubscribers) {
channels.open.publish({
address: response.socket.address(),
protocol: this.#protocol,
extensions: this.#extensions,
websocket: this
})
}
}

#onFail (code, reason, cause) {
Expand Down
42 changes: 42 additions & 0 deletions test/websocket/diagnostics-channel-open-close.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

const { test } = require('node:test')
const dc = require('node:diagnostics_channel')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const { tspl } = require('@matteo.collina/tspl')

test('diagnostics channel - undici:websocket:[open/close]', async (t) => {
const { equal, completed } = tspl(t, { plan: 6 })

const server = new WebSocketServer({ port: 0 })
const { port } = server.address()
const ws = new WebSocket(`ws://localhost:${port}`, 'chat')

server.on('connection', (ws) => {
ws.close(1000, 'goodbye')
})

const openListener = ({ extensions, protocol, websocket }) => {
equal(extensions, '')
equal(protocol, 'chat')
equal(websocket, ws)
}

const closeListener = ({ websocket, code, reason }) => {
equal(code, 1000)
equal(reason, 'goodbye')
equal(websocket, ws)
}

dc.channel('undici:websocket:open').subscribe(openListener)
dc.channel('undici:websocket:close').subscribe(closeListener)

t.after(() => {
server.close()
dc.channel('undici:websocket:open').unsubscribe(openListener)
dc.channel('undici:websocket:close').unsubscribe(closeListener)
})

await completed
})
42 changes: 42 additions & 0 deletions test/websocket/diagnostics-channel-ping-pong.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict'

const { test } = require('node:test')
const dc = require('node:diagnostics_channel')
const { WebSocketServer } = require('ws')
const { WebSocket } = require('../..')
const { tspl } = require('@matteo.collina/tspl')

test('diagnostics channel - undici:websocket:[ping/pong]', async (t) => {
const { deepStrictEqual, equal, completed } = tspl(t, { plan: 4 })

const server = new WebSocketServer({ port: 0 })
const { port } = server.address()
const ws = new WebSocket(`ws://localhost:${port}`, 'chat')

server.on('connection', (ws) => {
ws.ping('Ping')
ws.pong('Pong')
})

const pingListener = ({ websocket, payload }) => {
equal(websocket, ws)
deepStrictEqual(payload, Buffer.from('Ping'))
}

const pongListener = ({ websocket, payload }) => {
equal(websocket, ws)
deepStrictEqual(payload, Buffer.from('Pong'))
}

dc.channel('undici:websocket:ping').subscribe(pingListener)
dc.channel('undici:websocket:pong').subscribe(pongListener)

t.after(() => {
server.close()
ws.close()
dc.channel('undici:websocket:ping').unsubscribe(pingListener)
dc.channel('undici:websocket:pong').unsubscribe(pongListener)
})

await completed
})
64 changes: 0 additions & 64 deletions test/websocket/diagnostics-channel.js

This file was deleted.

Loading