Skip to content

Commit c2c9f50

Browse files
authored
Support schema extraction in fastify response objects (#5894)
1 parent 020c925 commit c2c9f50

File tree

4 files changed

+184
-8
lines changed

4 files changed

+184
-8
lines changed

packages/datadog-instrumentations/src/fastify.js

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const handleChannel = channel('apm:fastify:request:handle')
88
const routeAddedChannel = channel('apm:fastify:route:added')
99
const bodyParserReadCh = channel('datadog:fastify:body-parser:finish')
1010
const queryParamsReadCh = channel('datadog:fastify:query-params:finish')
11+
const responsePayloadReadCh = channel('datadog:fastify:response:finish')
1112
const pathParamsReadCh = channel('datadog:fastify:path-params:finish')
1213

1314
const parsingResources = new WeakMap()
@@ -167,9 +168,12 @@ function preParsing (request, reply, payload, done) {
167168
}
168169

169170
function wrapSend (send, req) {
170-
return function sendWithTrace (error) {
171-
if (error instanceof Error) {
172-
errorChannel.publish({ req, error })
171+
return function sendWithTrace (payload) {
172+
if (payload instanceof Error) {
173+
errorChannel.publish({ req, error: payload })
174+
} else if (canPublishResponsePayload(payload)) {
175+
const res = getRes(this)
176+
responsePayloadReadCh.publish({ req, res, body: payload })
173177
}
174178

175179
return send.apply(this, arguments)
@@ -200,6 +204,18 @@ function onRoute (routeOptions) {
200204
routeAddedChannel.publish({ routeOptions, onRoute })
201205
}
202206

207+
// send() payload types: https://fastify.dev/docs/latest/Reference/Reply/#senddata
208+
function canPublishResponsePayload (payload) {
209+
return responsePayloadReadCh.hasSubscribers &&
210+
payload &&
211+
typeof payload === 'object' &&
212+
typeof payload.pipe !== 'function' && // Node streams
213+
typeof payload.body?.pipe !== 'function' && // Response with body stream
214+
!Buffer.isBuffer(payload) && // Buffer
215+
!(payload instanceof ArrayBuffer) && // ArrayBuffer
216+
!ArrayBuffer.isView(payload) // TypedArray
217+
}
218+
203219
addHook({ name: 'fastify', versions: ['>=3'] }, fastify => {
204220
const wrapped = shimmer.wrapFunction(fastify, fastify => wrapFastify(fastify, true))
205221

packages/dd-trace/src/appsec/channels.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ module.exports = {
1313
expressProcessParams: dc.channel('datadog:express:process_params:start'),
1414
expressSession: dc.channel('datadog:express-session:middleware:finish'),
1515
fastifyBodyParser: dc.channel('datadog:fastify:body-parser:finish'),
16+
fastifyResponseChannel: dc.channel('datadog:fastify:response:finish'),
1617
fastifyQueryParams: dc.channel('datadog:fastify:query-params:finish'),
1718
fastifyPathParams: dc.channel('datadog:fastify:path-params:finish'),
1819
fsOperationStart: dc.channel('apm:fs:operation:start'),

packages/dd-trace/src/appsec/index.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const {
2222
responseWriteHead,
2323
responseSetHeader,
2424
routerParam,
25+
fastifyResponseChannel,
2526
fastifyPathParams
2627
} = require('./channels')
2728
const waf = require('./waf')
@@ -85,6 +86,7 @@ function enable (_config) {
8586
fastifyPathParams.subscribe(onRequestProcessParams)
8687
routerParam.subscribe(onRequestProcessParams)
8788
responseBody.subscribe(onResponseBody)
89+
fastifyResponseChannel.subscribe(onResponseBody)
8890
responseWriteHead.subscribe(onResponseWriteHead)
8991
responseSetHeader.subscribe(onResponseSetHeader)
9092

@@ -378,6 +380,7 @@ function disable () {
378380
if (fastifyPathParams.hasSubscribers) fastifyPathParams.unsubscribe(onRequestProcessParams)
379381
if (routerParam.hasSubscribers) routerParam.unsubscribe(onRequestProcessParams)
380382
if (responseBody.hasSubscribers) responseBody.unsubscribe(onResponseBody)
383+
if (fastifyResponseChannel.hasSubscribers) fastifyResponseChannel.unsubscribe(onResponseBody)
381384
if (responseWriteHead.hasSubscribers) responseWriteHead.unsubscribe(onResponseWriteHead)
382385
if (responseSetHeader.hasSubscribers) responseSetHeader.unsubscribe(onResponseSetHeader)
383386
}

packages/dd-trace/test/appsec/index.fastify.plugin.spec.js

Lines changed: 161 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ const { assert } = require('chai')
55
const getPort = require('get-port')
66
const path = require('path')
77
const semver = require('semver')
8+
const zlib = require('zlib')
9+
const fs = require('node:fs')
810
const agent = require('../plugins/agent')
911
const appsec = require('../../src/appsec')
1012
const Config = require('../../src/config')
@@ -167,11 +169,12 @@ withVersions('fastify', 'fastify', version => {
167169
assert.deepEqual(e.response.data, JSON.parse(json))
168170
sinon.assert.notCalled(requestBody)
169171

170-
await agent.assertSomeTraces((traces) => {
171-
const span = traces[0][0]
172-
assert.equal(span.metrics['_dd.appsec.truncated.string_length'], 5000)
173-
assert.equal(span.metrics['_dd.appsec.truncated.container_size'], 300)
174-
assert.equal(span.metrics['_dd.appsec.truncated.container_depth'], 20)
172+
await agent.assertFirstTraceSpan({
173+
metrics: {
174+
'_dd.appsec.truncated.string_length': 5000,
175+
'_dd.appsec.truncated.container_size': 300,
176+
'_dd.appsec.truncated.container_depth': 20
177+
}
175178
})
176179
}
177180
})
@@ -446,6 +449,159 @@ withVersions('fastify', 'fastify', version => {
446449
})
447450
})
448451

452+
describe('Api Security - Fastify', () => {
453+
withVersions('fastify', 'fastify', version => {
454+
let config, server, axios
455+
456+
before(() => {
457+
return agent.load(['fastify', 'http'], { client: false })
458+
})
459+
460+
before((done) => {
461+
const fastify = require(`../../../../versions/fastify@${version}`).get()
462+
463+
const app = fastify()
464+
465+
app.post('/send', (request, reply) => {
466+
reply.send({ sendResKey: 'sendResValue' })
467+
})
468+
469+
app.post('/return', async (request, reply) => {
470+
return { returnResKey: 'returnResValue' }
471+
})
472+
473+
app.get('/', (request, reply) => {
474+
reply.send('DONE')
475+
})
476+
477+
app.get('/buffer', (request, reply) => {
478+
reply.send(Buffer.from('DONE'))
479+
})
480+
481+
app.get('/stream', (request, reply) => {
482+
const stream = fs.createReadStream(__filename)
483+
reply.header('Content-Type', 'application/octet-stream')
484+
reply.send(stream)
485+
})
486+
487+
app.get('/typedarray', (request, reply) => {
488+
reply.send(new Uint16Array(10))
489+
})
490+
491+
getPort().then((port) => {
492+
app.listen({ port }, () => {
493+
axios = Axios.create({ baseURL: `http://localhost:${port}` })
494+
done()
495+
})
496+
server = app.server
497+
})
498+
})
499+
500+
after(() => {
501+
server.close()
502+
return agent.close({ ritmReset: false })
503+
})
504+
505+
beforeEach(() => {
506+
config = new Config({
507+
appsec: {
508+
enabled: true,
509+
rules: path.join(__dirname, 'api_security_rules.json'),
510+
apiSecurity: {
511+
enabled: true,
512+
sampleDelay: 10
513+
}
514+
}
515+
})
516+
appsec.enable(config)
517+
})
518+
519+
afterEach(() => {
520+
appsec.disable()
521+
})
522+
523+
function formatSchema (body) {
524+
return zlib.gzipSync(JSON.stringify(body)).toString('base64')
525+
}
526+
527+
it('should get the response body schema with reply.send', async () => {
528+
const expectedResponseBodySchema = formatSchema([{ sendResKey: [8] }])
529+
const res = await axios.post('/send', { key: 'value' })
530+
531+
await agent.assertFirstTraceSpan({
532+
meta: {
533+
'_dd.appsec.s.res.body': expectedResponseBodySchema
534+
}
535+
})
536+
537+
assert.equal(res.status, 200)
538+
assert.deepEqual(res.data, { sendResKey: 'sendResValue' })
539+
})
540+
541+
it('should get the response body schema with return', async () => {
542+
const expectedResponseBodySchema = formatSchema([{ returnResKey: [8] }])
543+
const res = await axios.post('/return', { key: 'value' })
544+
545+
await agent.assertFirstTraceSpan({
546+
meta: {
547+
'_dd.appsec.s.res.body': expectedResponseBodySchema
548+
}
549+
})
550+
551+
assert.equal(res.status, 200)
552+
assert.deepEqual(res.data, { returnResKey: 'returnResValue' })
553+
})
554+
555+
it('should not get the schema for string', async () => {
556+
const res = await axios.get('/')
557+
558+
await agent.assertFirstTraceSpan(span => {
559+
assert.notProperty(span.meta, '_dd.appsec.s.res.body')
560+
})
561+
562+
assert.equal(res.status, 200)
563+
assert.equal(res.data, 'DONE')
564+
})
565+
566+
it('should not get the schema for Buffer', async () => {
567+
const res = await axios.get('/buffer')
568+
569+
await agent.assertFirstTraceSpan(span => {
570+
if (span.meta) {
571+
assert.notProperty(span.meta, '_dd.appsec.s.res.body')
572+
}
573+
})
574+
575+
assert.equal(res.status, 200)
576+
assert.equal(res.data, 'DONE')
577+
})
578+
579+
it('should not get the schema for stream', async () => {
580+
const res = await axios.get('/stream', { responseType: 'arraybuffer' })
581+
582+
await agent.assertFirstTraceSpan(span => {
583+
if (span.meta) {
584+
assert.notProperty(span.meta, '_dd.appsec.s.res.body')
585+
}
586+
})
587+
588+
assert.equal(res.status, 200)
589+
})
590+
591+
it('should not get the schema for TypedArray', async () => {
592+
const res = await axios.get('/typedarray', { responseType: 'arraybuffer' })
593+
594+
await agent.assertFirstTraceSpan(span => {
595+
if (span.meta) {
596+
assert.notProperty(span.meta, '_dd.appsec.s.res.body')
597+
}
598+
})
599+
600+
assert.equal(res.status, 200)
601+
})
602+
})
603+
})
604+
449605
const createNestedObject = (n, obj) => {
450606
if (n > 0) {
451607
return { a: createNestedObject(n - 1, obj) }

0 commit comments

Comments
 (0)