diff --git a/.github/workflows/collector.yml b/.github/workflows/collector.yml
new file mode 100644
index 00000000000..460900cb67b
--- /dev/null
+++ b/.github/workflows/collector.yml
@@ -0,0 +1,40 @@
+name: Collector
+
+on:
+  pull_request:
+  push:
+    branches: [master]
+  schedule:
+    - cron: '0 4 * * *'
+
+concurrency:
+  group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
+  cancel-in-progress: true
+
+# TODO: upstream jobs
+
+jobs:
+  express:
+    runs-on: ubuntu-latest
+    env:
+      PLUGINS: express|body-parser|cookie-parser
+      NODE_OPTIONS: -r ./libdatadog-nodejs/global
+    steps:
+      - uses: actions/checkout@v4
+      - uses: actions/checkout@v4
+        with:
+          repository: DataDog/libdatadog-nodejs
+          path: libdatadog-nodejs
+      - run: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
+      - run: npm run build
+        working-directory: ./libdatadog-nodejs
+      - uses: ./.github/actions/testagent/start
+      - uses: ./.github/actions/node/setup
+      - run: yarn install
+      - uses: ./.github/actions/node/oldest
+      - run: yarn test:plugins:ci
+      - uses: ./.github/actions/node/latest
+      - run: yarn test:plugins:ci
+      - if: always()
+        uses: ./.github/actions/testagent/logs
+      - uses: codecov/codecov-action@v3
diff --git a/integration-tests/helpers.js b/integration-tests/helpers.js
index 37838e774e4..974c8302d78 100644
--- a/integration-tests/helpers.js
+++ b/integration-tests/helpers.js
@@ -37,6 +37,40 @@ class FakeAgent extends EventEmitter {
         payload: msgpack.decode(req.body, { codec })
       })
     })
+    app.put('/v0.5/traces', (req, res) => {
+      if (req.body.length === 0) return res.status(200).send()
+      res.status(200).send({ rate_by_service: { 'service:,env:': 1 } })
+      const [strings, traces] = msgpack.decode(req.body, { codec })
+      this.emit('message', {
+        headers: req.headers,
+        payload: traces.map(trace => {
+          return trace.map(span => {
+            const meta = {}
+            const metrics = {}
+
+            Object.keys(span[9]).forEach(key => { meta[strings[key]] = strings[span[9][key]] })
+            Object.keys(span[10]).forEach(key => { metrics[strings[key]] = span[10][key] })
+
+            span = {
+              service: strings[span[0]],
+              name: strings[span[1]],
+              resource: strings[span[2]],
+              trace_id: span[3],
+              span_id: span[4],
+              parent_id: span[5],
+              start: span[6],
+              duration: span[7],
+              error: span[8],
+              meta,
+              metrics,
+              type: strings[span[11]]
+            }
+
+            return span
+          })
+        })
+      })
+    })
     app.post('/profiling/v1/input', upload.any(), (req, res) => {
       res.status(200).send()
       this.emit('message', {
diff --git a/packages/datadog-plugin-express/test/index.spec.js b/packages/datadog-plugin-express/test/index.spec.js
index 4dbc1776a61..6f4129eac98 100644
--- a/packages/datadog-plugin-express/test/index.spec.js
+++ b/packages/datadog-plugin-express/test/index.spec.js
@@ -1471,13 +1471,13 @@ describe('Plugin', () => {
       })
 
       describe('with configuration for middleware disabled', () => {
-        before(() => {
+        beforeEach(() => {
           return agent.load(['express', 'http'], [{
             middleware: false
           }, { client: false }])
         })
 
-        after(() => {
+        afterEach(() => {
           return agent.close({ ritmReset: false })
         })
 
diff --git a/packages/dd-trace/src/collector/encoder.js b/packages/dd-trace/src/collector/encoder.js
new file mode 100644
index 00000000000..f2d74ecc843
--- /dev/null
+++ b/packages/dd-trace/src/collector/encoder.js
@@ -0,0 +1,578 @@
+'use strict'
+
+const collector = globalThis.__dd_collector
+const Chunk = require('../../../../packages/dd-trace/src/encode/chunk')
+const { zeroId } = require('../id')
+const { format } = require('url')
+const tracerVersion = require('../../../../package.json').version
+
+const SOFT_LIMIT = 8 * 1024 * 1024 // 8MB
+
+const { DD_TRACE_COLLECTOR_DEBUG } = process.env
+
+const eventTypes = {
+  PROCESS_INFO: 129,
+  START_SEGMENT: 130,
+  START_SPAN: 131,
+  FINISH_SPAN: 132,
+  ADD_TAGS: 133,
+  SAMPLING_PRIORITY: 135,
+  EXCEPTION: 136,
+  ADD_LINKS: 137,
+  ERROR: 138,
+  FINISH_SEGMENT: 139,
+  CONFIG: 140,
+  DISCARD_SEGMENT: 141
+}
+
+const float64Array = new Float64Array(1)
+const uInt8Float64Array = new Uint8Array(float64Array.buffer)
+
+float64Array[0] = -1
+
+const bigEndian = uInt8Float64Array[7] === 0
+
+class Encoder {
+  constructor (config) {
+    const { url, hostname, port, limit = SOFT_LIMIT, flushInterval } = config
+
+    this._flushInterval = flushInterval
+    this._limit = limit
+    this._metadataBytes = new Chunk(1024)
+    this._eventBytes = new Chunk()
+    this._segmentBytes = new Chunk(64 * 1024)
+    this._stringBytes = new Chunk()
+    this._reset()
+
+    this.setUrl(url || new URL(format({
+      protocol: 'http:',
+      hostname: hostname || 'localhost',
+      port
+    })))
+
+    process.once('beforeExit', () => this.flush())
+  }
+
+  count () {
+    return this._eventCount
+  }
+
+  setUrl (url) {
+    const host = new URL(url).origin // TODO: Rename and cleanup.
+
+    this.encodeConfig({ host })
+  }
+
+  encodeConfig (options) {
+    const bytes = this._eventBytes
+
+    this._encodeUnsigned(bytes, eventTypes.CONFIG)
+    this._encodeMap(bytes, options)
+  }
+
+  encodeProcessInfo () {
+    const bytes = this._eventBytes
+
+    this._encodeUnsigned(bytes, eventTypes.PROCESS_INFO)
+    this._encodeMap(bytes, {
+      tracer_version: tracerVersion,
+      language: 'nodejs',
+      language_interpreter: process.jsEngine || 'v8',
+      language_version: process.version
+    })
+  }
+
+  encodeSegmentStart (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.START_SEGMENT, event)
+
+    this._encodeUnsigned(bytes, eventTypes.START_SEGMENT)
+    this._encodeFixArray(bytes, 4)
+    this._encodeTime(bytes, event.time)
+    this._encodeId(bytes, event.traceId)
+    this._encodeSegmentId(bytes, event.segmentId)
+    this._encodeId(bytes, event.parentId)
+
+    this._afterEncode()
+  }
+
+  encodeSegmentDiscard (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.DISCARD_SEGMENT, event)
+
+    this._encodeUnsigned(bytes, eventTypes.DISCARD_SEGMENT)
+    this._encodeFixArray(bytes, 1)
+    this._encodeSegmentId(bytes, event.segmentId)
+
+    this._afterEncode()
+  }
+
+  encodeSpanStart (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.START_SPAN, event)
+
+    this._encodeUnsigned(bytes, eventTypes.START_SPAN)
+    this._encodeFixArray(bytes, 10)
+    this._encodeTime(bytes, event.ticks)
+    this._encodeSegmentId(bytes, event.segmentId)
+    this._encodeId(bytes, event.spanId)
+    this._encodeUnsigned(bytes, event.parentIndex)
+    this._encodeString(bytes, event.service)
+    this._encodeString(bytes, event.name)
+    this._encodeString(bytes, event.resource)
+    this._encodeMeta(bytes, event.meta)
+    this._encodeMetrics(bytes, event.metrics)
+    this._encodeString(bytes, event.type)
+
+    this._afterEncode()
+  }
+
+  encodeSpanFinish (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.FINISH_SPAN, event)
+
+    this._encodeUnsigned(bytes, eventTypes.FINISH_SPAN)
+    this._encodeFixArray(bytes, 3)
+    this._encodeTime(bytes, event.ticks)
+    this._encodeSegmentId(bytes, event.segmentId)
+    this._encodeUnsigned(bytes, event.spanIndex)
+
+    this._afterEncode()
+  }
+
+  encodeAddTags (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.ADD_TAGS, event)
+
+    this._encodeUnsigned(bytes, eventTypes.ADD_TAGS)
+    this._encodeFixArray(bytes, 4)
+    this._encodeSegmentId(bytes, event.segmentId)
+    this._encodeUnsigned(bytes, event.spanIndex)
+    this._encodeMeta(bytes, event.meta)
+    this._encodeMetrics(bytes, event.metrics)
+
+    this._afterEncode()
+  }
+
+  encodeException (event) {
+    const bytes = this._eventBytes
+
+    this._beforeEncode(eventTypes.EXCEPTION, event)
+
+    this._encodeUnsigned(bytes, eventTypes.EXCEPTION)
+    this._encodeFixArray(bytes, 5)
+    this._encodeSegmentId(bytes, event.segmentId)
+    this._encodeUnsigned(bytes, event.spanIndex)
+    this._encodeString(bytes, event.error.message || '')
+    this._encodeString(bytes, event.error.name || '')
+    this._encodeString(bytes, event.error.stack || '')
+
+    this._afterEncode()
+  }
+
+  makePayload () {
+    const stringSize = this._stringBytes.length + 6
+    const segmentSize = this._segmentBytes.length + 6
+    const eventSize = this._eventBytes.length
+    const buffer = Buffer.allocUnsafe(stringSize + segmentSize + eventSize)
+
+    let offset = 0
+
+    offset = this._writeStrings(buffer, offset)
+    offset = this._writeSegments(buffer, offset)
+    this._writeEvents(buffer, offset)
+
+    this._reset()
+
+    return buffer
+  }
+
+  flush (done = () => {}) {
+    try {
+      const data = this.makePayload()
+
+      this._timer = clearTimeout(this._timer)
+
+      collector.send_events(data)
+
+      done()
+    } catch (e) {
+      done(e)
+    }
+  }
+
+  reset () {
+    this._reset()
+  }
+
+  _beforeEncode (type, event) {
+    if (DD_TRACE_COLLECTOR_DEBUG === 'true') {
+      const name = Object.keys(eventTypes).find(key => eventTypes[key] === type)
+
+      console.log(name, type, JSON.stringify(event)) // eslint-disable-line no-console
+
+      this._eventOffset = this._eventBytes.length
+      this._stringOffset = this._stringBytes.length
+      this._segmentOffset = this._segmentBytes.length
+    }
+  }
+
+  _afterEncode () {
+    if (DD_TRACE_COLLECTOR_DEBUG === 'true') {
+      this._debugEncode('string', this._stringBytes, this._stringOffset)
+      this._debugEncode('segment', this._segmentBytes, this._segmentOffset)
+      this._debugEncode('event', this._eventBytes, this._eventOffset)
+    }
+
+    this._eventCount++
+
+    this._maybeFlush()
+  }
+
+  _debugEncode (name, bytes, lastOffset) {
+    const start = lastOffset
+    const end = bytes.length
+
+    if (start === end) return
+
+    const hex = bytes.buffer
+      .subarray(start, end).toString('hex').match(/../g).join(' ')
+
+    console.log(`Encoded ${name}: ${hex}`) // eslint-disable-line no-console
+  }
+
+  _maybeFlush () {
+    // we can go over the soft limit since the agent has a 50MB hard limit
+    if (this._eventBytes.length > this._limit || this._stringBytes.length > this._limit) {
+      this.flush()
+    } else if (!this._timer) {
+      this._timer = setTimeout(() => this.flush(), this._flushInterval).unref()
+    }
+  }
+
+  _reset () {
+    this._metadataBytes.length = 0
+    this._eventCount = 0
+    this._eventBytes.length = 0
+    this._segmentCount = 0
+    this._segmentBytes.length = 0
+    this._segmentMap = new Map([[0, 0]])
+    this._stringCount = 0
+    this._stringBytes.length = 0
+    this._stringMap = {
+      '': 0
+    }
+
+    this.encodeProcessInfo()
+  }
+
+  _encodeFixArray (bytes, size = 0) {
+    const offset = bytes.length
+
+    bytes.reserve(1)
+    bytes.length += 1
+
+    bytes.buffer[offset] = 0x90 + size
+  }
+
+  _encodeArrayPrefix (bytes, value) {
+    const length = value.length
+    const offset = bytes.length
+
+    bytes.reserve(5)
+    bytes.length += 5
+
+    bytes.buffer[offset] = 0xdd
+    bytes.buffer[offset + 1] = length >> 24
+    bytes.buffer[offset + 2] = length >> 16
+    bytes.buffer[offset + 3] = length >> 8
+    bytes.buffer[offset + 4] = length
+  }
+
+  _encodeFixMap (bytes, size = 0) {
+    const offset = bytes.length
+
+    bytes.reserve(1)
+    bytes.length += 1
+
+    bytes.buffer[offset] = 0x80 + size
+  }
+
+  _encodeMapPrefix (bytes, keysLength) {
+    const offset = bytes.length
+
+    bytes.reserve(5)
+    bytes.length += 5
+    bytes.buffer[offset] = 0xdf
+    bytes.buffer[offset + 1] = keysLength >> 24
+    bytes.buffer[offset + 2] = keysLength >> 16
+    bytes.buffer[offset + 3] = keysLength >> 8
+    bytes.buffer[offset + 4] = keysLength
+  }
+
+  _encodeByte (bytes, value) {
+    bytes.reserve(1)
+
+    bytes.buffer[bytes.length++] = value
+  }
+
+  _encodeId (bytes, id) {
+    const offset = bytes.length
+
+    if (!id || id === zeroId) {
+      bytes.reserve(2)
+      bytes.length += 2
+
+      bytes.buffer[offset] = 0xc4
+      bytes.buffer[offset + 1] = 0
+    } else {
+      const bufferLength = id._buffer.length
+      const byteLength = 2 + bufferLength
+
+      bytes.reserve(byteLength)
+      bytes.length += byteLength
+
+      bytes.buffer[offset] = 0xc4
+      bytes.buffer[offset + 1] = bufferLength
+
+      for (let i = 0; i < bufferLength; i++) {
+        bytes.buffer[offset + 2 + i] = id._buffer[i]
+      }
+    }
+  }
+
+  _encodeInteger (bytes, value) {
+    const offset = bytes.length
+
+    bytes.reserve(5)
+    bytes.length += 5
+
+    bytes.buffer[offset] = 0xce
+    bytes.buffer[offset + 1] = value >> 24
+    bytes.buffer[offset + 2] = value >> 16
+    bytes.buffer[offset + 3] = value >> 8
+    bytes.buffer[offset + 4] = value
+  }
+
+  _encodeShort (bytes, value) {
+    const offset = bytes.length
+
+    bytes.reserve(3)
+    bytes.length += 3
+
+    bytes.buffer[offset] = 0xcd
+    bytes.buffer[offset + 1] = value >> 8
+    bytes.buffer[offset + 2] = value
+  }
+
+  _encodeLong (bytes, value) {
+    const offset = bytes.length
+    const hi = (value / Math.pow(2, 32)) >> 0
+    const lo = value >>> 0
+
+    bytes.reserve(9)
+    bytes.length += 9
+
+    bytes.buffer[offset] = 0xcf
+    bytes.buffer[offset + 1] = hi >> 24
+    bytes.buffer[offset + 2] = hi >> 16
+    bytes.buffer[offset + 3] = hi >> 8
+    bytes.buffer[offset + 4] = hi
+    bytes.buffer[offset + 5] = lo >> 24
+    bytes.buffer[offset + 6] = lo >> 16
+    bytes.buffer[offset + 7] = lo >> 8
+    bytes.buffer[offset + 8] = lo
+  }
+
+  _encodeUnsigned (bytes, value) {
+    const offset = bytes.length
+
+    if (value <= 0x7f) {
+      bytes.reserve(1)
+      bytes.length += 1
+
+      bytes.buffer[offset] = value
+    } else if (value <= 0xff) {
+      bytes.reserve(2)
+      bytes.length += 2
+
+      bytes.buffer[offset] = 0xcc
+      bytes.buffer[offset + 1] = value
+    } else if (value <= 0xffff) {
+      bytes.reserve(3)
+      bytes.length += 3
+
+      bytes.buffer[offset] = 0xcd
+      bytes.buffer[offset + 1] = value >> 8
+      bytes.buffer[offset + 2] = value
+    } else if (value <= 0xffffffff) {
+      bytes.reserve(5)
+      bytes.length += 5
+
+      bytes.buffer[offset] = 0xce
+      bytes.buffer[offset + 1] = value >> 24
+      bytes.buffer[offset + 2] = value >> 16
+      bytes.buffer[offset + 3] = value >> 8
+      bytes.buffer[offset + 4] = value
+    } else {
+      const hi = (value / Math.pow(2, 32)) >> 0
+      const lo = value >>> 0
+
+      bytes.reserve(9)
+      bytes.length += 9
+
+      bytes.buffer[offset] = 0xcf
+      bytes.buffer[offset + 1] = hi >> 24
+      bytes.buffer[offset + 2] = hi >> 16
+      bytes.buffer[offset + 3] = hi >> 8
+      bytes.buffer[offset + 4] = hi
+      bytes.buffer[offset + 5] = lo >> 24
+      bytes.buffer[offset + 6] = lo >> 16
+      bytes.buffer[offset + 7] = lo >> 8
+      bytes.buffer[offset + 8] = lo
+    }
+  }
+
+  _encodeTime (bytes, value) {
+    this._encodeUnsigned(bytes, Math.floor(value * 1e6))
+  }
+
+  _encodeMeta (bytes, value = {}) {
+    const keys = Object.keys(value)
+    const validKeys = keys.filter(key => typeof value[key] === 'string')
+
+    this._encodeMapPrefix(bytes, validKeys.length)
+
+    for (const key of validKeys) {
+      this._encodeString(bytes, key)
+      this._encodeString(bytes, value[key])
+    }
+  }
+
+  _encodeMetrics (bytes, value = {}) {
+    const keys = Object.keys(value)
+    const validKeys = keys.filter(key => typeof value[key] === 'number' && !isNaN(value[key]))
+
+    this._encodeMapPrefix(bytes, validKeys.length)
+
+    for (const key of validKeys) {
+      this._encodeString(bytes, key)
+      this._encodeFloat(bytes, value[key])
+    }
+  }
+
+  _encodeMap (bytes, value) {
+    const keys = Object.keys(value)
+    const validKeys = keys.filter(key => typeof value[key] === 'string' || typeof value[key] === 'number')
+
+    this._encodeMapPrefix(bytes, validKeys.length)
+
+    for (const key of validKeys) {
+      this._encodeRawString(bytes, key)
+      this._encodeValue(bytes, value[key])
+    }
+  }
+
+  _encodeValue (bytes, value) {
+    switch (typeof value) {
+      case 'string':
+        this._encodeRawString(bytes, value)
+        break
+      case 'number':
+        this._encodeFloat(bytes, value)
+        break
+      default:
+        // should not happen
+    }
+  }
+
+  _encodeFixString (bytes, value = '') {
+    this._cacheString(value)
+    this._encodeUnsigned(bytes, this._stringMap[value])
+  }
+
+  _encodeString (bytes, value = '') {
+    this._cacheString(value)
+    this._encodeUnsigned(bytes, this._stringMap[value])
+  }
+
+  _encodeSegmentId (bytes, value = 0) {
+    this._cacheSegmentId(value)
+    this._encodeUnsigned(bytes, this._segmentMap.get(value))
+  }
+
+  // TODO: Use an extension for string table instead and make this the default.
+  _encodeRawString (bytes, value = '') {
+    bytes.write(value)
+  }
+
+  _encodeFloat (bytes, value) {
+    float64Array[0] = value
+
+    const offset = bytes.length
+    bytes.reserve(9)
+    bytes.length += 9
+
+    bytes.buffer[offset] = 0xcb
+
+    if (bigEndian) {
+      for (let i = 0; i <= 7; i++) {
+        bytes.buffer[offset + i + 1] = uInt8Float64Array[i]
+      }
+    } else {
+      for (let i = 7; i >= 0; i--) {
+        bytes.buffer[bytes.length - i - 1] = uInt8Float64Array[i]
+      }
+    }
+  }
+
+  _cacheString (value) {
+    if (!(value in this._stringMap)) {
+      this._stringMap[value] = ++this._stringCount
+      this._stringBytes.write(value)
+    }
+  }
+
+  _cacheSegmentId (value) {
+    if (!this._segmentMap.has(value)) {
+      this._segmentMap.set(value, ++this._segmentCount)
+      this._encodeUnsigned(this._segmentBytes, value)
+    }
+  }
+
+  _writeArrayPrefix (buffer, offset, count) {
+    buffer[offset++] = 0xdd
+    buffer.writeUInt32BE(count, offset)
+
+    return offset + 4
+  }
+
+  _writeSegments (buffer, offset) {
+    buffer[offset++] = 0xfe
+    offset = this._writeArrayPrefix(buffer, offset, this._segmentCount)
+    offset += this._segmentBytes.buffer.copy(buffer, offset, 0, this._segmentBytes.length)
+
+    return offset
+  }
+
+  _writeStrings (buffer, offset) {
+    buffer[offset++] = 0xff
+    offset = this._writeArrayPrefix(buffer, offset, this._stringCount)
+    offset += this._stringBytes.buffer.copy(buffer, offset, 0, this._stringBytes.length)
+
+    return offset
+  }
+
+  _writeEvents (buffer, offset = 0) {
+    offset += this._eventBytes.buffer.copy(buffer, offset, 0, this._eventBytes.length)
+
+    return offset
+  }
+}
+
+module.exports = { Encoder }
diff --git a/packages/dd-trace/src/collector/exporter.js b/packages/dd-trace/src/collector/exporter.js
new file mode 100644
index 00000000000..0e2bfded128
--- /dev/null
+++ b/packages/dd-trace/src/collector/exporter.js
@@ -0,0 +1,40 @@
+'use strict'
+
+const { channel } = require('dc-polyfill')
+const { Encoder } = require('./encoder')
+
+const segmentStartChannel = channel('datadog:tracing:segment:start')
+const segmentDiscardChannel = channel('datadog:tracing:segment:discard')
+const spanStartChannel = channel('datadog:tracing:span:start')
+const spanFinishChannel = channel('datadog:tracing:span:finish')
+const tagsChannel = channel('datadog:tracing:span:tags')
+const errorChannel = channel('datadog:tracing:span:error')
+
+class CollectorExporter {
+  constructor (config) {
+    const encoder = this._encoder = new Encoder(config)
+
+    this._handlers = new Map([
+      [segmentStartChannel, encoder.encodeSegmentStart.bind(encoder)],
+      [segmentDiscardChannel, encoder.encodeSegmentDiscard.bind(encoder)],
+      [spanStartChannel, encoder.encodeSpanStart.bind(encoder)],
+      [spanFinishChannel, encoder.encodeSpanFinish.bind(encoder)],
+      [tagsChannel, encoder.encodeAddTags.bind(encoder)],
+      [errorChannel, encoder.encodeException.bind(encoder)]
+    ])
+  }
+
+  start () {
+    this._handlers.forEach((cb, ch) => ch.subscribe(cb))
+  }
+
+  stop () {
+    this._handlers.forEach((cb, ch) => ch.unsubscribe(cb))
+  }
+
+  setUrl (url) {
+    this._encoder.setUrl(url)
+  }
+}
+
+module.exports = { CollectorExporter }
diff --git a/packages/dd-trace/src/collector/span.js b/packages/dd-trace/src/collector/span.js
new file mode 100644
index 00000000000..6ccf2702800
--- /dev/null
+++ b/packages/dd-trace/src/collector/span.js
@@ -0,0 +1,116 @@
+'use strict'
+
+const { channel } = require('dc-polyfill')
+const DatadogSpan = require('../opentracing/span')
+const DatadogCollectorSpanContext = require('./span_context')
+
+const now = performance.now.bind(performance)
+
+const startSegmentChannel = channel('datadog:tracing:segment:start')
+const segmentDiscardChannel = channel('datadog:tracing:segment:discard')
+const startChannel = channel('datadog:tracing:span:start')
+const tagsChannel = channel('datadog:tracing:span:tags')
+const errorChannel = channel('datadog:tracing:span:error')
+const finishChannel = channel('datadog:tracing:span:finish')
+
+let segmentId = 0
+
+class DatadogCollectorSpan extends DatadogSpan {
+  finish (finishTime) {
+    if (this._spanContext._isFinished) return
+
+    this._spanContext._isFinished = true
+    this._spanContext._trace.active--
+
+    const trace = this._spanContext._trace
+
+    // TODO: Emit a discard event from tracer. For now we just short-circuit.
+    if (trace.isDiscarded) return
+    if (trace.isRecording === false) {
+      trace.isDiscarded = true
+      return segmentDiscardChannel.publish({ segmentId: trace.segmentId })
+    }
+
+    const ticks = finishTime
+      ? now() + finishTime - trace.ticks - trace.startTime
+      : now() - trace.ticks
+
+    finishChannel.publish({
+      ticks,
+      segmentId: trace.segmentId,
+      spanIndex: this._spanContext._spanIndex
+    })
+  }
+
+  _start (_tracer, _processor, _prioritySampler, fields, _debug) {
+    const parent = fields.parent || null
+    const tags = fields.tags || {}
+    const spanContext = this._spanContext
+    const trace = spanContext._trace
+
+    this._trackSegment()
+
+    spanContext._spanIndex = trace.lastIndex++
+
+    if (fields.parent) {
+      spanContext._parentIndex = parent._spanIndex >= 0
+        ? parent._spanIndex + 1
+        : 0
+    }
+
+    startChannel.publish({
+      ticks: now() - spanContext._trace.ticks,
+      segmentId: spanContext._trace.segmentId,
+      spanId: spanContext._spanId,
+      parentIndex: spanContext._parentIndex || 0,
+      type: tags['span.type'],
+      name: fields.operationName,
+      resource: tags['resource.name'],
+      service: tags['service.name'],
+      meta: tags,
+      metrics: tags
+    })
+  }
+
+  _addTags (keyValuePairs = {}) {
+    tagsChannel.publish({
+      segmentId: this._spanContext._trace.segmentId,
+      spanIndex: this._spanContext._spanIndex,
+      meta: keyValuePairs,
+      metrics: keyValuePairs
+    })
+
+    if (keyValuePairs.error) {
+      errorChannel.publish({
+        segmentId: this._spanContext._trace.segmentId,
+        spanIndex: this._spanContext._spanIndex,
+        error: keyValuePairs.error
+      })
+    }
+  }
+
+  _trackSegment () {
+    const spanContext = this._spanContext
+    const trace = spanContext._trace
+
+    if (trace.active > 0) {
+      trace.active++
+    } else {
+      const traceId = spanContext._traceId
+      const parentId = spanContext._parentId
+      const time = trace.startTime
+
+      trace.active = 1
+      trace.lastIndex = 0
+      trace.segmentId = ++segmentId
+
+      startSegmentChannel.publish({ parentId, segmentId, time, traceId })
+    }
+  }
+
+  _initContext (props) {
+    return new DatadogCollectorSpanContext(props)
+  }
+}
+
+module.exports = DatadogCollectorSpan
diff --git a/packages/dd-trace/src/collector/span_context.js b/packages/dd-trace/src/collector/span_context.js
new file mode 100644
index 00000000000..228f9cfab41
--- /dev/null
+++ b/packages/dd-trace/src/collector/span_context.js
@@ -0,0 +1,7 @@
+'use strict'
+
+const DatadogSpanContext = require('../opentracing/span_context')
+
+class DatadogCollectorSpanContext extends DatadogSpanContext {}
+
+module.exports = DatadogCollectorSpanContext
diff --git a/packages/dd-trace/src/collector/tracer.js b/packages/dd-trace/src/collector/tracer.js
new file mode 100644
index 00000000000..153f7caf083
--- /dev/null
+++ b/packages/dd-trace/src/collector/tracer.js
@@ -0,0 +1,24 @@
+'use strict'
+
+const DatadogTracer = require('../opentracing/tracer')
+const DatadogCollectorSpan = require('./span')
+const { CollectorExporter } = require('../collector/exporter')
+
+class DatadogCollectorTracer extends DatadogTracer {
+  constructor (config) {
+    super(config)
+
+    this._collector = new CollectorExporter(config)
+    this._collector.start()
+  }
+
+  _initSpan (...args) {
+    return new DatadogCollectorSpan(...args)
+  }
+
+  _setUrl (url) {
+    this._collector.setUrl(url)
+  }
+}
+
+module.exports = DatadogCollectorTracer
diff --git a/packages/dd-trace/src/format.js b/packages/dd-trace/src/format.js
index 6d7c85ce039..431d6e3a27d 100644
--- a/packages/dd-trace/src/format.js
+++ b/packages/dd-trace/src/format.js
@@ -113,6 +113,13 @@ function extractTags (trace, span) {
       case 'resource.name':
         addTag(trace, {}, map[tag], tags[tag])
         break
+      case 'http.route': {
+        const resource = [tags['http.method'], tags['http.route']].filter(v => v).join(' ')
+        if (!tags['resource.name']) {
+          addTag(trace, {}, 'resource', resource)
+        }
+        break
+      }
       // HACK: remove when Datadog supports numeric status code
       case 'http.status_code':
         addTag(trace.meta, {}, tag, tags[tag] && String(tags[tag]))
diff --git a/packages/dd-trace/src/id.js b/packages/dd-trace/src/id.js
index 9f437f1fa1a..373e6412eea 100644
--- a/packages/dd-trace/src/id.js
+++ b/packages/dd-trace/src/id.js
@@ -181,3 +181,4 @@ function writeUInt32BE (buffer, value, offset) {
 }
 
 module.exports = (value, radix) => new Identifier(value, radix)
+module.exports.zeroId = zeroId
diff --git a/packages/dd-trace/src/opentracing/span.js b/packages/dd-trace/src/opentracing/span.js
index 6cb9cb77b1b..27dd718c6af 100644
--- a/packages/dd-trace/src/opentracing/span.js
+++ b/packages/dd-trace/src/opentracing/span.js
@@ -57,15 +57,11 @@ class DatadogSpan {
   constructor (tracer, processor, prioritySampler, fields, debug) {
     const operationName = fields.operationName
     const parent = fields.parent || null
-    const tags = Object.assign({}, fields.tags)
-    const hostname = fields.hostname
 
     this._parentTracer = tracer
    this._debug = debug
-    this._processor = processor
    this._prioritySampler = prioritySampler
    this._store = storage.getStore()
-    this._duration = undefined
 
    // For internal use only. You probably want `context()._name`.
    // This name property is not updated when the span name changes.
@@ -76,13 +72,8 @@ class DatadogSpan {
     getIntegrationCounter('spans_created', this._integrationName).inc()
 
     this._spanContext = this._createContext(parent, fields)
-    this._spanContext._name = operationName
-    this._spanContext._tags = tags
-    this._spanContext._hostname = hostname
-
-    this._spanContext._trace.started.push(this)
-
     this._startTime = fields.startTime || this._getTime()
+    this._start(tracer, processor, prioritySampler, fields, debug)
 
     this._links = []
     fields.links && fields.links.forEach(link => this.addLink(link.context, link.attributes))
@@ -198,6 +189,19 @@ class DatadogSpan {
     this._processor.process(this)
   }
 
+  _start (_tracer, processor, _prioritySampler, fields) {
+    const operationName = fields.operationName
+    const tags = Object.assign({}, fields.tags)
+    const hostname = fields.hostname
+
+    this._processor = processor
+    this._duration = undefined
+    this._spanContext._name = operationName
+    this._spanContext._tags = tags
+    this._spanContext._hostname = hostname
+    this._spanContext._trace.started.push(this)
+  }
+
   _sanitizeAttributes (attributes = {}) {
     const sanitizedAttributes = {}
 
diff --git a/packages/dd-trace/src/opentracing/tracer.js b/packages/dd-trace/src/opentracing/tracer.js
index 13e6b9c1500..886e9f9cd14 100644
--- a/packages/dd-trace/src/opentracing/tracer.js
+++ b/packages/dd-trace/src/opentracing/tracer.js
@@ -54,7 +54,7 @@ class DatadogTracer {
       'service.name': this._service
     }
 
-    const span = new Span(this, this._processor, this._prioritySampler, {
+    const span = this._initSpan(this, this._processor, this._prioritySampler, {
       operationName: options.operationName || name,
       parent,
       tags,
@@ -94,6 +94,14 @@ class DatadogTracer {
       return null
     }
   }
+
+  _initSpan (...args) {
+    return new Span(...args)
+  }
+
+  _setUrl (url) {
+    this._exporter.setUrl(url)
+  }
 }
 
 function getContext (spanContext) {
diff --git a/packages/dd-trace/src/plugins/util/web.js b/packages/dd-trace/src/plugins/util/web.js
index c9cdf1990aa..6dffc1d69a8 100644
--- a/packages/dd-trace/src/plugins/util/web.js
+++ b/packages/dd-trace/src/plugins/util/web.js
@@ -63,7 +63,10 @@ const web = {
     if (!span) return
 
     span.context()._name = `${name}.request`
-    span.context()._tags.component = name
+    span.addTags({
+      'span.name': `${name}.request`,
+      component: name
+    })
 
     web.setConfig(req, config)
   },
@@ -462,16 +465,17 @@ function addResponseTags (context) {
 }
 
 function addResourceTag (context) {
-  const { req, span } = context
-  const tags = span.context()._tags
+  // const { req, span } = context
+  // const tags = span.context()._tags
 
-  if (tags['resource.name']) return
+  // // if (tags['resource.name'] || !tags[HTTP_ROUTE]) return
+  // if (tags['resource.name']) return
 
-  const resource = [req.method, tags[HTTP_ROUTE]]
-    .filter(val => val)
-    .join(' ')
+  // const resource = [req.method, tags[HTTP_ROUTE]]
+  //   .filter(val => val)
+  //   .join(' ')
 
-  span.setTag(RESOURCE_NAME, resource)
+  // span.setTag(RESOURCE_NAME, resource)
 }
 
 function addHeaders (context) {
diff --git a/packages/dd-trace/src/tracer.js b/packages/dd-trace/src/tracer.js
index 63c60e81440..05aae81ff9b 100644
--- a/packages/dd-trace/src/tracer.js
+++ b/packages/dd-trace/src/tracer.js
@@ -1,6 +1,6 @@
 'use strict'
 
-const Tracer = require('./opentracing/tracer')
+const Tracer = getTracerClass()
 const tags = require('../../../ext/tags')
 const Scope = require('./scope')
 const { storage } = require('../../datadog-core')
@@ -134,7 +134,7 @@ class DatadogTracer extends Tracer {
   }
 
   setUrl (url) {
-    this._exporter.setUrl(url)
+    this._setUrl(url)
     this._dataStreamsProcessor.setUrl(url)
   }
 
@@ -177,4 +177,10 @@ function addTags (span, options) {
   span.addTags(tags)
 }
 
+function getTracerClass () {
+  return globalThis.__dd_collector
+    ? require('./collector/tracer')
+    : require('./opentracing/tracer')
+}
+
 module.exports = DatadogTracer
diff --git a/packages/dd-trace/test/plugins/agent.js b/packages/dd-trace/test/plugins/agent.js
index 8b64a7b74da..c596eb208a5 100644
--- a/packages/dd-trace/test/plugins/agent.js
+++ b/packages/dd-trace/test/plugins/agent.js
@@ -247,7 +247,43 @@ module.exports = {
   })
 
   agent.put('/v0.5/traces', (req, res) => {
-    res.status(404).end()
+    const payload = req.body
+    const strings = payload[0]
+    const traces = payload[1].map(trace => {
+      return trace.map(span => {
+        const meta = {}
+        const metrics = {}
+
+        Object.keys(span[9]).forEach(key => { meta[strings[key]] = strings[span[9][key]] })
+        Object.keys(span[10]).forEach(key => { metrics[strings[key]] = span[10][key] })
+
+        span = {
+          service: strings[span[0]],
+          name: strings[span[1]],
+          resource: strings[span[2]],
+          trace_id: span[3],
+          span_id: span[4],
+          parent_id: span[5],
+          start: span[6],
+          duration: span[7],
+          error: span[8],
+          meta,
+          metrics,
+          type: strings[span[11]]
+        }
+
+        return span
+      })
+    })
+
+    res.status(200).send({ rate_by_service: { 'service:,env:': 1 } })
+
+    traceHandlers.forEach(({ handler, spanResourceMatch }) => {
+      const spans = traces.flatMap(trace => trace)
+      if (isMatchingTrace(spans, spanResourceMatch)) {
+        handler(traces)
+      }
+    })
   })
 
   agent.put('/v0.4/traces', (req, res) => {
diff --git a/packages/dd-trace/test/plugins/util/web.spec.js b/packages/dd-trace/test/plugins/util/web.spec.js
index 821d2d8d537..277bb36861e 100644
--- a/packages/dd-trace/test/plugins/util/web.spec.js
+++ b/packages/dd-trace/test/plugins/util/web.spec.js
@@ -597,18 +597,6 @@ describe('plugins/util/web', () => {
         expect(config.hooks.request).to.have.been.calledWith(span, req, res)
       })
     })
-
-    it('should set the resource name from the http.route tag set in the hooks', () => {
-      config.hooks = {
-        request: span => span.setTag('http.route', '/custom/route')
-      }
-
-      web.instrument(tracer, config, req, res, 'test.request', span => {
-        res.end()
-
-        expect(tags).to.have.property('resource.name', 'GET /custom/route')
-      })
-    })
   })
 })
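
For reference, the collector path in this patch only activates when `globalThis.__dd_collector` exists before dd-trace loads: the workflow preloads `./libdatadog-nodejs/global` through `NODE_OPTIONS`, `getTracerClass()` in `packages/dd-trace/src/tracer.js` then selects the collector tracer, and the encoder hands each flushed payload to `collector.send_events(buffer)`. A minimal stand-in preload can exercise this code path without the native libdatadog-nodejs binding; the stub below is hypothetical and only implements `send_events`, the single call the encoder makes.

```js
'use strict'

// stub-collector.js — hypothetical stand-in for ./libdatadog-nodejs/global.
// It only implements send_events, which Encoder#flush calls with the encoded
// event stream; the real binding would hand this buffer to libdatadog.
globalThis.__dd_collector = {
  send_events (buffer) {
    console.log(`collector stub received ${buffer.length} bytes`) // eslint-disable-line no-console
  }
}
```

Run it by preloading the stub before the tracer, e.g. `NODE_OPTIONS='-r ./stub-collector' node app.js`, mirroring how the workflow preloads the real module.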