Skip to content

Commit

Permalink
Adding diagnostics channels to Fetch
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx committed Jun 12, 2024
1 parent 78a0c24 commit 56c36f7
Show file tree
Hide file tree
Showing 5 changed files with 277 additions and 2 deletions.
57 changes: 57 additions & 0 deletions docs/docs/api/DiagnosticsChannel.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,60 @@ diagnosticsChannel.channel('undici:websocket:pong').subscribe(({ payload }) => {
console.log(payload)
})
```
The below channels collectively act as [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args) on `fetch`. So all of them will publish the arguments passed to `fetch`.

## `tracing:undici:fetch:start`

This message is published when `fetch` is called, and will publish the arguments passed to `fetch`.

```js
import diagnosticsChannel from 'diagnostics_channel'
diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ req, input, init, }) => {
console.log('input', input)
console.log('init', init)
})
```

## `tracing:undici:fetch:end`

This message is published at the end of `fetch`'s execution, and will publish any `error` from the synchronous part of `fetch`. Since `fetch` is asynchronous, this should be empty. This channel will publish the same values as `undici:fetch:start`, but we are including it to track when `fetch` finishes execution and to be consistent with [`TracingChannel`](https://nodejs.org/api/diagnostics_channel.html#class-tracingchannel).
```js
import diagnosticsChannel from 'diagnostics_channel'
diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ req, input, init, error }) => {
console.log('input', input)
console.log('init', init)
console.log('error', error) // should be empty
})
```
## `tracing:undici:fetch:asyncStart`
This message is published after `fetch` resolves or rejects. If `fetch` resolves, it publishes the response in `result`. If it rejects, it publishes the error in `error`.
```js
import diagnosticsChannel from 'diagnostics_channel'
diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ req, input, init, result, error }) => {
console.log('input', input)
console.log('init', init)
console.log('response', result)
console.log('error', error)
})
```
## `tracing:undici:fetch:asyncEnd`
This channel gets published the same values as and at the same time as `tracing:undici:fetch:asyncStart` in the case of [`tracingChannel.tracePromise`](https://nodejs.org/api/diagnostics_channel.html#tracingchanneltracepromisefn-context-thisarg-args)
```js
import diagnosticsChannel from 'diagnostics_channel'
diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(({ req, input, init, result, error }) => {
console.log('input', input)
console.log('init', init)
console.log('response', result)
console.log('error', error)
})
```
## `tracing:undici:fetch:error`
This message is published when an error is thrown or promise rejects while calling `fetch`.
```js
import diagnosticsChannel from 'diagnostics_channel'
diagnosticsChannel.channel('tracing:undici:fetch:error').subscribe(({ req, input, init, error }) => {
console.log('input', input)
console.log('init', init)
console.log('error', error)
})
```
79 changes: 78 additions & 1 deletion lib/core/diagnostics.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ const undiciDebugLog = util.debuglog('undici')
const fetchDebuglog = util.debuglog('fetch')
const websocketDebuglog = util.debuglog('websocket')
let isClientSet = false
let tracingChannel

if (diagnosticsChannel.tracingChannel) {
tracingChannel = diagnosticsChannel.tracingChannel('undici:fetch')
}

const channels = {
// Client
beforeConnect: diagnosticsChannel.channel('undici:client:beforeConnect'),
Expand All @@ -23,7 +29,9 @@ const channels = {
close: diagnosticsChannel.channel('undici:websocket:close'),
socketError: diagnosticsChannel.channel('undici:websocket:socket_error'),
ping: diagnosticsChannel.channel('undici:websocket:ping'),
pong: diagnosticsChannel.channel('undici:websocket:pong')
pong: diagnosticsChannel.channel('undici:websocket:pong'),
// Fetch channels
tracingChannel
}

if (undiciDebugLog.enabled || fetchDebuglog.enabled) {
Expand Down Expand Up @@ -114,6 +122,75 @@ if (undiciDebugLog.enabled || fetchDebuglog.enabled) {
isClientSet = true
}

// Track fetch requests
if (fetchDebuglog.enabled && diagnosticsChannel.tracingChannel) {
const debuglog = fetchDebuglog

tracingChannel.start.subscribe(evt => {
const {
input
} = evt
debuglog(
'fetch has started request to %s',
input
)
})

tracingChannel.end.subscribe(evt => {
const {
input
} = evt
debuglog(
'fetch has received response from %s',
input
)
})

tracingChannel.asyncStart.subscribe(evt => {
const {
input,
result,
error
} = evt
if (result && error) {
debuglog(
'fetch has received response for %s - HTTP %d, error is %s',
input,
result.status,
error.message
)
} else if (result) {
debuglog(
'fetch has received response for %s - HTTP %d',
input,
result.status
)
} else if (error) {
debuglog(
'fetch has errored for %s - %s',
input,
error.message
)
} else {
debuglog(
'fetch has started request to %s',
input
)
}
})

tracingChannel.error.subscribe(evt => {
const {
error
} = evt
debuglog(
'fetch error event received response %s',
error.message
)
})
isClientSet = true
}

if (websocketDebuglog.enabled) {
if (!isClientSet) {
const debuglog = undiciDebugLog.enabled ? undiciDebugLog : websocketDebuglog
Expand Down
32 changes: 32 additions & 0 deletions lib/web/fetch/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esb
? 'node'
: 'undici'

const channels = require('../../core/diagnostics.js').channels.tracingChannel
/** @type {import('buffer').resolveObjectURL} */
let resolveObjectURL

Expand Down Expand Up @@ -124,6 +125,16 @@ function handleFetchDone (response) {
finalizeAndReportTiming(response, 'fetch')
}

// subscribersCheck will be called at the beginning of the fetch call
// and will check if we have subscribers
function subscribersCheck () {
return channels && (channels.start.hasSubscribers ||
channels.end.hasSubscribers ||
channels.asyncStart.hasSubscribers ||
channels.asyncEnd.hasSubscribers ||
channels.error.hasSubscribers)
}

// https://fetch.spec.whatwg.org/#fetch-method
function fetch (input, init = undefined) {
webidl.argumentLengthCheck(arguments, 1, 'globalThis.fetch')
Expand All @@ -143,6 +154,27 @@ function fetch (input, init = undefined) {
return p.promise
}

// This will publish all diagnostic events only when we have subscribers.
if (subscribersCheck()) {
const context = { req: requestObject, input, init, result: null, error: null }
const { resolve, reject } = p
p.resolve = function (result) {
context.result = result
channels.asyncStart.publish(context)
channels.asyncEnd.publish(context)
resolve(result)
}
p.reject = function (error) {
context.error = error
channels.error.publish(context)
channels.asyncStart.publish(context)
channels.asyncEnd.publish(context)
reject(error)
}
channels.start.publish(context)
queueMicrotask(() => { channels.end.publish(context) })
}

// 3. Let request be requestObject’s request.
const request = requestObject[kState]

Expand Down
5 changes: 4 additions & 1 deletion test/node-test/debug.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ test('debug#websocket', { skip: !process.versions.icu }, async t => {
})

test('debug#fetch', async t => {
const assert = tspl(t, { plan: 7 })
const assert = tspl(t, { plan: 10 })
const child = spawn(
process.execPath,
[join(__dirname, '../fixtures/fetch.js')],
Expand All @@ -55,11 +55,14 @@ test('debug#fetch', async t => {
)
const chunks = []
const assertions = [
/(FETCH [0-9]+:) (fetch has started)/,
/(FETCH [0-9]+:) (connecting to)/,
/(FETCH [0-9]+:) (fetch has received)/,
/(FETCH [0-9]+:) (connected to)/,
/(FETCH [0-9]+:) (sending request)/,
/(FETCH [0-9]+:) (received response)/,
/(FETCH [0-9]+:) (trailers received)/,
/(FETCH [0-9]+:) (fetch has received)/,
/^$/
]

Expand Down
106 changes: 106 additions & 0 deletions test/node-test/diagnostics-channel/fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict'

const { tspl } = require('@matteo.collina/tspl')
const { describe, test, before, after } = require('node:test')
const { fetch } = require('../../..')

let diagnosticsChannel
let skip = false
try {
diagnosticsChannel = require('node:diagnostics_channel')
} catch {
skip = true
}

const { createServer } = require('http')

describe('diagnosticsChannel for fetch', { skip }, () => {
let server
before(() => {
server = createServer((req, res) => {
res.setHeader('Content-Type', 'text/plain')
res.setHeader('trailer', 'foo')
res.write('hello')
res.addTrailers({
foo: 'oof'
})
res.end()
})
})

after(() => { server.close() })

test('fetch', async t => {
t = tspl(t, { plan: 17 })

let startCalled = 0
diagnosticsChannel.channel('tracing:undici:fetch:start').subscribe(({ req, input, init, result, error }) => {
startCalled += 1
if (input.redirect) {
t.strictEqual(input, 'badrequest')
t.deepStrictEqual(init, { redirect: 'error' })
} else {
t.strictEqual(input, `http://localhost:${server.address().port}`)
t.deepStrictEqual(init, undefined)
}
})

let endCalled = 0
diagnosticsChannel.channel('tracing:undici:fetch:end').subscribe(({ req, input, init, result, error }) => {
endCalled += 1
if (init && init.redirect) {
t.strictEqual(input, 'badrequest')
t.deepStrictEqual(init, { redirect: 'error' })
} else {
t.strictEqual(input, `http://localhost:${server.address().port}`)
t.deepStrictEqual(init, undefined)
}
t.strictEqual(result, null)
})

let asyncStartCalled = 0
diagnosticsChannel.channel('tracing:undici:fetch:asyncStart').subscribe(({ req, input, init, result, error }) => {
asyncStartCalled += 1
if (init && init.redirect) {
t.strictEqual(input, 'badrequest')
t.deepStrictEqual(init, { redirect: 'error' })
} else {
t.strictEqual(input, `http://localhost:${server.address().port}`)
t.deepStrictEqual(init, undefined)
t.ok(result)
}
})

let asyncEndCalled = 0
diagnosticsChannel.channel('tracing:undici:fetch:asyncEnd').subscribe(async ({ req, input, init, result, error }) => {
asyncEndCalled += 1
if (init && init.redirect) {
t.strictEqual(input, 'badrequest')
t.deepStrictEqual(init, { redirect: 'error' })
t.strictEqual(result, null)
t.ok(error)
t.strictEqual(error.cause.code, 'ERR_INVALID_URL')
} else {
t.strictEqual(input, `http://localhost:${server.address().port}`)
t.deepStrictEqual(init, undefined)
t.ok(result)
t.strictEqual(result.status, 200)
t.strictEqual(error, null)
}
})

server.listen(0, async () => {
await fetch(`http://localhost:${server.address().port}`)
try {
await fetch('badrequest', { redirect: 'error' })
} catch (e) { }
server.close()
t.strictEqual(startCalled, 1)
t.strictEqual(endCalled, 1)
t.strictEqual(asyncStartCalled, 1)
t.strictEqual(asyncEndCalled, 1)
})

await t.completed
})
})

0 comments on commit 56c36f7

Please sign in to comment.