Skip to content

Commit f2b167d

Browse files
authored
tracing: add support for pg cursors / streams (#5680)
* add support for pg cursors / streams
1 parent daf4277 commit f2b167d

File tree

4 files changed

+99
-6
lines changed

4 files changed

+99
-6
lines changed

packages/datadog-instrumentations/src/pg.js

+19-5
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ const errorCh = channel('apm:pg:query:error')
1414
const startPoolQueryCh = channel('datadog:pg:pool:query:start')
1515
const finishPoolQueryCh = channel('datadog:pg:pool:query:finish')
1616

17+
const { errorMonitor } = require('node:events')
18+
1719
addHook({ name: 'pg', versions: ['>=8.0.3'] }, pg => {
1820
shimmer.wrap(pg.Client.prototype, 'query', query => wrapQuery(query))
1921
shimmer.wrap(pg.Pool.prototype, 'query', query => wrapPoolQuery(query))
@@ -39,13 +41,15 @@ function wrapQuery (query) {
3941
? arguments[0]
4042
: { text: arguments[0] }
4143

42-
const textProp = Object.getOwnPropertyDescriptor(pgQuery, 'text')
44+
const textPropObj = pgQuery.cursor ?? pgQuery
45+
const textProp = Object.getOwnPropertyDescriptor(textPropObj, 'text')
46+
const stream = typeof textPropObj.read === 'function'
4347

4448
// Only alter `text` property if safe to do so.
4549
if (!textProp || textProp.configurable) {
46-
const originalText = pgQuery.text
50+
const originalText = textPropObj.text
4751

48-
Object.defineProperty(pgQuery, 'text', {
52+
Object.defineProperty(textPropObj, 'text', {
4953
get () {
5054
return this?.__ddInjectableQuery || originalText
5155
}
@@ -57,9 +61,10 @@ function wrapQuery (query) {
5761

5862
startCh.publish({
5963
params: this.connectionParameters,
60-
query: pgQuery,
64+
query: textPropObj,
6165
processId,
62-
abortController
66+
abortController,
67+
stream
6368
})
6469

6570
const finish = asyncResource.bind(function (error, res) {
@@ -130,6 +135,15 @@ function wrapQuery (query) {
130135
newQuery.then((res) => finish(null, res), finish)
131136
}
132137

138+
if (stream) {
139+
newQuery.on('end', () => {
140+
finish(null, [])
141+
})
142+
newQuery.on(errorMonitor, (err) => {
143+
finish(err)
144+
})
145+
}
146+
133147
try {
134148
return retval
135149
} catch (err) {

packages/datadog-plugin-pg/src/index.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class PGPlugin extends DatabasePlugin {
88
static get operation () { return 'query' }
99
static get system () { return 'postgres' }
1010

11-
start ({ params = {}, query, processId }) {
11+
start ({ params = {}, query, processId, stream }) {
1212
const service = this.serviceName({ pluginConfig: this.config, params })
1313
const originalStatement = this.maybeTruncate(query.text)
1414

@@ -27,6 +27,10 @@ class PGPlugin extends DatabasePlugin {
2727
}
2828
})
2929

30+
if (stream) {
31+
span.setTag('db.stream', 1)
32+
}
33+
3034
query.__ddInjectableQuery = this.injectDbmQuery(span, query.text, service, !!query.name)
3135
}
3236
}

packages/datadog-plugin-pg/test/index.spec.js

+67
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,73 @@ describe('Plugin', () => {
216216
.catch(done),
217217
rawExpectedSchema.outbound
218218
)
219+
220+
if (implementation !== 'pg.native') {
221+
// pg-cursor is not supported on pg.native, pg-query-stream uses pg-cursor so it is also unsupported
222+
describe('streaming capabilities', () => {
223+
withVersions('pg', 'pg-cursor', pgCursorVersion => {
224+
let Cursor
225+
226+
beforeEach(() => {
227+
Cursor = require(`../../../versions/pg-cursor@${pgCursorVersion}`).get()
228+
})
229+
230+
it('should instrument cursor-based streaming with pg-cursor', async () => {
231+
const tracingPromise = agent.use(traces => {
232+
expect(traces[0][0]).to.have.property('name', expectedSchema.outbound.opName)
233+
expect(traces[0][0]).to.have.property('service', expectedSchema.outbound.serviceName)
234+
expect(traces[0][0]).to.have.property('resource', 'SELECT * FROM generate_series(0, 1) num')
235+
expect(traces[0][0]).to.have.property('type', 'sql')
236+
expect(traces[0][0].meta).to.have.property('span.kind', 'client')
237+
expect(traces[0][0].meta).to.have.property('db.name', 'postgres')
238+
expect(traces[0][0].meta).to.have.property('db.type', 'postgres')
239+
expect(traces[0][0].meta).to.have.property('component', 'pg')
240+
expect(traces[0][0].metrics).to.have.property('db.stream', 1)
241+
expect(traces[0][0].metrics).to.have.property('network.destination.port', 5432)
242+
})
243+
244+
const cursor = client.query(new Cursor('SELECT * FROM generate_series(0, 1) num'))
245+
246+
cursor.read(1, () => {
247+
cursor.close()
248+
})
249+
await tracingPromise
250+
})
251+
})
252+
253+
withVersions('pg', 'pg-query-stream', pgQueryStreamVersion => {
254+
let QueryStream
255+
256+
beforeEach(() => {
257+
QueryStream = require(`../../../versions/pg-query-stream@${pgQueryStreamVersion}`).get()
258+
})
259+
260+
it('should instrument stream-based queries with pg-query-stream', async () => {
261+
const agentPromise = agent.use(traces => {
262+
expect(traces[0][0]).to.have.property('name', expectedSchema.outbound.opName)
263+
expect(traces[0][0]).to.have.property('service', expectedSchema.outbound.serviceName)
264+
expect(traces[0][0]).to.have.property('resource', 'SELECT * FROM generate_series(0, 1) num')
265+
expect(traces[0][0]).to.have.property('type', 'sql')
266+
expect(traces[0][0].meta).to.have.property('span.kind', 'client')
267+
expect(traces[0][0].meta).to.have.property('db.name', 'postgres')
268+
expect(traces[0][0].meta).to.have.property('db.type', 'postgres')
269+
expect(traces[0][0].meta).to.have.property('component', 'pg')
270+
expect(traces[0][0].metrics).to.have.property('db.stream', 1)
271+
expect(traces[0][0].metrics).to.have.property('network.destination.port', 5432)
272+
})
273+
274+
const query = new QueryStream('SELECT * FROM generate_series(0, 1) num', [])
275+
const stream = client.query(query)
276+
277+
for await (const row of stream) {
278+
expect(row).to.have.property('num')
279+
}
280+
281+
await agentPromise
282+
})
283+
})
284+
})
285+
}
219286
})
220287
})
221288

packages/dd-trace/test/plugins/externals.json

+8
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,14 @@
431431
{
432432
"name": "express",
433433
"versions": [">=4"]
434+
},
435+
{
436+
"name": "pg-query-stream",
437+
"versions": [">=4"]
438+
},
439+
{
440+
"name": "pg-cursor",
441+
"versions": [">=2"]
434442
}
435443
],
436444
"pino": [

0 commit comments

Comments
 (0)