Skip to content

test: clean up usage of agent.assertTelemetryReceived #5883

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jun 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions integration-tests/helpers/fake-agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const bodyParser = require('body-parser')
const msgpack = require('@msgpack/msgpack')
const upload = require('multer')()

const noop = () => {}

module.exports = class FakeAgent extends EventEmitter {
constructor (port = 0) {
// Redirect rejections to the error event
Expand Down Expand Up @@ -161,8 +163,32 @@ module.exports = class FakeAgent extends EventEmitter {
return resultPromise
}

assertTelemetryReceived (fn, timeout, requestType, expectedMessageCount = 1) {
timeout = timeout || 30000
/**
* Assert that a telemetry message is received.
*
* @overload
* @param {string} requestType - The request type to assert.
* @param {number} [timeout=30_000] - The timeout in milliseconds.
* @param {number} [expectedMessageCount=1] - The number of messages to expect.
* @returns {Promise<void>} A promise that resolves when the telemetry message of type `requestType` is received.
*
* @overload
* @param {Function} fn - The function to call with the telemetry message of type `requestType`.
* @param {string} requestType - The request type to assert.
* @param {number} [timeout=30_000] - The timeout in milliseconds.
* @param {number} [expectedMessageCount=1] - The number of messages to expect.
* @returns {Promise<void>} A promise that resolves when the telemetry message of type `requestType` is received and
* the function `fn` has finished running. If `fn` throws an error, the promise will be rejected once `timeout`
* is reached.
*/
assertTelemetryReceived (fn, requestType, timeout = 30_000, expectedMessageCount = 1) {
if (typeof fn !== 'function') {
expectedMessageCount = timeout
timeout = requestType
requestType = fn
fn = noop
}

let resultResolve
let resultReject
let msgCount = 0
Expand Down
25 changes: 25 additions & 0 deletions integration-tests/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,31 @@ function setShouldKill (value) {
}

const assertObjectContains = assert.partialDeepStrictEqual || function assertObjectContains (actual, expected) {
if (Array.isArray(expected)) {
assert.ok(Array.isArray(actual), `Expected array but got ${typeof actual}`)
let startIndex = 0
for (const expectedItem of expected) {
let found = false
for (let i = startIndex; i < actual.length; i++) {
const actualItem = actual[i]
try {
if (expectedItem !== null && typeof expectedItem === 'object') {
assertObjectContains(actualItem, expectedItem)
} else {
assert.strictEqual(actualItem, expectedItem)
}
startIndex = i + 1
found = true
break
} catch {
continue
}
}
assert.ok(found, `Expected array to contain ${JSON.stringify(expectedItem)}`)
}
return
}

for (const [key, val] of Object.entries(expected)) {
if (val !== null && typeof val === 'object') {
assert.ok(Object.hasOwn(actual, key))
Expand Down
36 changes: 19 additions & 17 deletions integration-tests/opentelemetry.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ const { join } = require('path')
const { assert } = require('chai')
const axios = require('axios')

function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
async function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
const messageReceiver = isMetrics
? agent.assertTelemetryReceived(onMessage, timeout, 'generate-metrics')
? agent.assertTelemetryReceived(onMessage, 'generate-metrics', timeout)
: agent.assertMessageReceived(onMessage, timeout)

return Promise.all([
const [res] = await Promise.all([
messageReceiver,
new Promise((resolve, reject) => {
const timer = setTimeout(() => {
Expand All @@ -30,7 +30,9 @@ function check (agent, proc, timeout, onMessage = () => { }, isMetrics) {
}
})
})
]).then(([res]) => res)
])

return res
}

function allEqual (spans, fn) {
Expand Down Expand Up @@ -76,7 +78,7 @@ describe('opentelemetry', () => {
await sandbox.remove()
})

it("should not capture telemetry DD and OTEL vars don't conflict", () => {
it("should not capture telemetry DD and OTEL vars don't conflict", async () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
Expand All @@ -94,7 +96,7 @@ describe('opentelemetry', () => {
}
})

return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
Expand All @@ -108,7 +110,7 @@ describe('opentelemetry', () => {
}, true)
})

it('should capture telemetry if both DD and OTEL env vars are set', () => {
it('should capture telemetry if both DD and OTEL env vars are set', async () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
Expand Down Expand Up @@ -136,7 +138,7 @@ describe('opentelemetry', () => {
}
})

return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
Expand Down Expand Up @@ -188,7 +190,7 @@ describe('opentelemetry', () => {
}, true)
})

it('should capture telemetry when OTEL env vars are invalid', () => {
it('should capture telemetry when OTEL env vars are invalid', async () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
Expand All @@ -209,7 +211,7 @@ describe('opentelemetry', () => {
}
})

return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
Expand Down Expand Up @@ -274,7 +276,7 @@ describe('opentelemetry', () => {
DD_TRACE_AGENT_PORT: agent.port
}
})
return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
// Should have a single trace with a single span
assert.strictEqual(payload.length, 1)
const [trace] = payload
Expand All @@ -286,7 +288,7 @@ describe('opentelemetry', () => {
})
})

it('should capture telemetry', () => {
it('should capture telemetry', async () => {
proc = fork(join(cwd, 'opentelemetry/basic.js'), {
cwd,
env: {
Expand All @@ -297,7 +299,7 @@ describe('opentelemetry', () => {
}
})

return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
Expand Down Expand Up @@ -342,7 +344,7 @@ describe('opentelemetry', () => {
await new Promise(resolve => setTimeout(resolve, 1000)) // Adjust the delay as necessary
await axios.get(`http://localhost:${SERVER_PORT}/first-endpoint`)

return check(agent, proc, 10000, ({ payload }) => {
await check(agent, proc, 10000, ({ payload }) => {
assert.strictEqual(payload.request_type, 'generate-metrics')

const metrics = payload.payload
Expand Down Expand Up @@ -379,7 +381,7 @@ describe('opentelemetry', () => {
DD_TRACE_AGENT_PORT: agent.port
}
})
return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
// Should have three spans
const [trace] = payload
assert.strictEqual(trace.length, 3)
Expand Down Expand Up @@ -414,7 +416,7 @@ describe('opentelemetry', () => {
await new Promise(resolve => setTimeout(resolve, 1000)) // Adjust the delay as necessary
await axios.get(`http://localhost:${SERVER_PORT}/first-endpoint`)

return check(agent, proc, 10000, ({ payload }) => {
await check(agent, proc, 10000, ({ payload }) => {
assert.strictEqual(payload.length, 2)
// combine the traces
const trace = payload.flat()
Expand Down Expand Up @@ -457,7 +459,7 @@ describe('opentelemetry', () => {
DD_TRACE_AGENT_PORT: agent.port
}
})
return check(agent, proc, timeout, ({ payload }) => {
await check(agent, proc, timeout, ({ payload }) => {
// Should have a single trace with a single span
assert.strictEqual(payload.length, 1)
const [trace] = payload
Expand Down
27 changes: 14 additions & 13 deletions integration-tests/profiler/profiler.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -539,21 +539,21 @@ describe('profiler', () => {
proc.kill()
})

it('records profile on process exit', () => {
it('records profile on process exit', async () => {
proc = fork(profilerTestFile, {
cwd,
env: {
DD_TRACE_AGENT_PORT: agent.port,
DD_PROFILING_ENABLED: 1
}
})
const checkTelemetry = agent.assertTelemetryReceived(_ => {}, 1000, 'generate-metrics')
const checkTelemetry = agent.assertTelemetryReceived('generate-metrics', 1000)
// SSI telemetry is not supposed to have been emitted when DD_INJECTION_ENABLED is absent,
// so expect telemetry callback to time out
return Promise.all([checkProfiles(agent, proc, timeout), expectTimeout(checkTelemetry)])
await Promise.all([checkProfiles(agent, proc, timeout), expectTimeout(checkTelemetry)])
})

it('records SSI telemetry on process exit', () => {
it('records SSI telemetry on process exit', async () => {
proc = fork(profilerTestFile, {
cwd,
env: {
Expand Down Expand Up @@ -588,8 +588,9 @@ describe('profiler', () => {
assert.equal(series[1].type, 'count')
checkTags(series[1].tags)
assert.equal(series[1].points[0][1], 1)
}, timeout, 'generate-metrics')
return Promise.all([checkProfiles(agent, proc, timeout), checkTelemetry])
}, 'generate-metrics', timeout)

await Promise.all([checkProfiles(agent, proc, timeout), checkTelemetry])
})

if (process.platform !== 'win32') { // PROF-8905
Expand Down Expand Up @@ -706,7 +707,7 @@ describe('profiler', () => {
await agent.stop()
})

it('sends profiler API telemetry', () => {
it('sends profiler API telemetry', async () => {
proc = fork(profilerTestFile, {
cwd,
env: {
Expand Down Expand Up @@ -739,7 +740,7 @@ describe('profiler', () => {

// Same number of requests and responses
assert.equal(series[1].points[0][1], requestCount)
}, timeout, 'generate-metrics')
}, 'generate-metrics', timeout)

const checkDistributions = agent.assertTelemetryReceived(({ _, payload }) => {
const pp = payload.payload
Expand All @@ -752,12 +753,12 @@ describe('profiler', () => {
// Same number of points
pointsCount = series[0].points.length
assert.equal(pointsCount, series[1].points.length)
}, timeout, 'distributions')
}, 'distributions', timeout)

return Promise.all([checkProfiles(agent, proc, timeout), checkMetrics, checkDistributions]).then(() => {
// Same number of requests and points
assert.equal(requestCount, pointsCount)
})
await Promise.all([checkProfiles(agent, proc, timeout), checkMetrics, checkDistributions])

// Same number of requests and points
assert.equal(requestCount, pointsCount)
})
})

Expand Down
37 changes: 12 additions & 25 deletions integration-tests/telemetry.spec.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
'use strict'

const { createSandbox, FakeAgent, spawnProc } = require('./helpers')
const assert = require('assert')
const { createSandbox, FakeAgent, spawnProc, assertObjectContains } = require('./helpers')
const path = require('path')

describe('telemetry', () => {
Expand Down Expand Up @@ -38,11 +37,11 @@ describe('telemetry', () => {
await agent.stop()
})

it('Test that tracer and iitm are sent as dependencies', (done) => {
it('Test that tracer and iitm are sent as dependencies', async () => {
let ddTraceFound = false
let importInTheMiddleFound = false

agent.assertTelemetryReceived(msg => {
await agent.assertTelemetryReceived(msg => {
const { payload } = msg

if (payload.request_type === 'app-dependencies-loaded') {
Expand All @@ -55,35 +54,23 @@ describe('telemetry', () => {
importInTheMiddleFound = true
}
})
if (ddTraceFound && importInTheMiddleFound) {
done()
}
}
}
}, null, 'app-dependencies-loaded', 1)
})
}, 'app-dependencies-loaded', 5_000, 1)

it('Assert configuration chaining data is sent', (done) => {
agent.assertTelemetryReceived(msg => {
if (msg.payload.request_type !== 'app-started') return
expect(ddTraceFound).to.be.true
expect(importInTheMiddleFound).to.be.true
})

it('Assert configuration chaining data is sent', async () => {
await agent.assertTelemetryReceived(msg => {
const { configuration } = msg.payload.payload

const expectedConfigs = [
assertObjectContains(configuration, [
{ name: 'DD_LOG_INJECTION', value: false, origin: 'default' },
{ name: 'DD_LOG_INJECTION', value: true, origin: 'env_var' },
{ name: 'DD_LOG_INJECTION', value: false, origin: 'code' }
]
expectedConfigs.forEach(expected => {
const found = configuration.find(config =>
config.name === expected.name &&
config.origin === expected.origin &&
config.value === expected.value
)
assert.ok(found, `Expected to find config: ${JSON.stringify(expected)}`)
})
done()
}, null, 'app-started', 1)
])
}, 'app-started', 5_000, 1)
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ describe('IAST - code_injection - integration', () => {
})
assert.isNotNull(instrumentedSink)
}
}, 30_000, 'generate-metrics', 2)
}, 'generate-metrics', 30_000, 2)

const checkMessages = agent.assertMessageReceived(({ headers, payload }) => {
assert.strictEqual(payload[0][0].metrics['_dd.iast.enabled'], 1)
Expand All @@ -67,11 +67,9 @@ describe('IAST - code_injection - integration', () => {
assert.isTrue(vulnerabilities.has('CODE_INJECTION'))
})

return Promise.all([checkMessages, checkTelemetry]).then(() => {
assert.equal(iastTelemetryReceived, true)
await Promise.all([checkMessages, checkTelemetry])

return true
})
assert.equal(iastTelemetryReceived, true)
}

describe('SourceTextModule', () => {
Expand Down
Loading