Skip to content

fix(llmobs): llmobs data can be sent to agent proxy running on uds #5679

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions packages/dd-trace/src/llmobs/writers/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,18 @@ class BaseLLMObsWriter {
this._destroyed = false
}

get url () {
if (this._agentless == null) return null

const baseUrl = this._baseUrl.href
const endpoint = this._endpoint

// Split on protocol separator to preserve it
// path.join will remove some slashes unnecessarily
const [protocol, rest] = baseUrl.split('://')
return protocol + '://' + path.join(rest, endpoint)
}

append (event, byteLength) {
if (this._buffer.length >= this._bufferLimit) {
logger.warn(`${this.constructor.name} event buffer full (limit is ${this._bufferLimit}), dropping event`)
Expand Down Expand Up @@ -69,7 +81,7 @@ class BaseLLMObsWriter {
const options = this._getOptions()

request(payload, options, (err, resp, code) => {
parseResponseAndLog(err, code, events.length, options.url.href, this._eventType)
parseResponseAndLog(err, code, events.length, this.url, this._eventType)
})
}

Expand All @@ -87,17 +99,23 @@ class BaseLLMObsWriter {

setAgentless (agentless) {
this._agentless = agentless
this._url = this._getUrl()
logger.debug(`Configuring ${this.constructor.name} to ${this._url.href}`)
const { url, endpoint } = this._getUrlAndPath()

this._baseUrl = url
this._endpoint = endpoint

logger.debug(`Configuring ${this.constructor.name} to ${this.url}`)
}

_getUrl () {
_getUrlAndPath () {
if (this._agentless) {
return new URL(format({
protocol: 'https:',
hostname: `${this._intake}.${this._config.site}`,
pathname: this._endpoint
}))
return {
url: new URL(format({
protocol: 'https:',
hostname: `${this._intake}.${this._config.site}`
})),
endpoint: this._endpoint
}
}

const { hostname, port } = this._config
Expand All @@ -107,8 +125,10 @@ class BaseLLMObsWriter {
port
}))

const proxyPath = path.join(EVP_PROXY_AGENT_BASE_PATH, this._endpoint)
return new URL(proxyPath, base)
return {
url: base,
endpoint: path.join(EVP_PROXY_AGENT_BASE_PATH, this._endpoint)
}
}

_getOptions () {
Expand All @@ -118,7 +138,8 @@ class BaseLLMObsWriter {
},
method: 'POST',
timeout: this._timeout,
url: this._url
url: this._baseUrl,
path: this._endpoint
}

if (this._agentless) {
Expand Down
46 changes: 38 additions & 8 deletions packages/dd-trace/test/llmobs/writers/base.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ describe('BaseLLMObsWriter', () => {
writer.setAgentless(true)

expect(writer._agentless).to.be.true
expect(writer._url.href).to.equal('https://intake.site.com/endpoint')
expect(writer.url).to.equal('https://intake.site.com/endpoint')
})

it('constructs an agent proxy writer', () => {
writer = new BaseLLMObsWriter(options)
writer.setAgentless(false)

expect(writer._agentless).to.be.false
expect(writer._url.href).to.equal('http://localhost:8126/evp_proxy/v2/endpoint')
expect(writer.url).to.equal('http://localhost:8126/evp_proxy/v2/endpoint')
})

describe('with config url', () => {
Expand All @@ -73,7 +73,37 @@ describe('BaseLLMObsWriter', () => {
writer = new BaseLLMObsWriter(options)
writer.setAgentless(false)

expect(writer._url.href).to.equal('http://test-agent:12345/evp_proxy/v2/endpoint')
expect(writer.url).to.equal('http://test-agent:12345/evp_proxy/v2/endpoint')
})
})

describe('with unix socket', () => {
beforeEach(() => {
options.config.url = new URL('unix:///var/run/datadog/apm.socket/')
})

afterEach(() => {
delete options.config.url
})

it('constructs a writer with the correct url', () => {
writer = new BaseLLMObsWriter(options)
writer.setAgentless(false)

expect(writer.url).to.equal('unix:///var/run/datadog/apm.socket/evp_proxy/v2/endpoint')
})

it('makes the request with the correct options', () => {
writer = new BaseLLMObsWriter(options)
writer.setAgentless(false)
writer.makePayload = (events) => ({ events })

writer.append({ foo: 'bar' })
writer.flush()

const requestOptions = request.getCall(0).args[1]
expect(requestOptions.url.href).to.equal('unix:///var/run/datadog/apm.socket/')
expect(requestOptions.path).to.equal('/evp_proxy/v2/endpoint')
})
})

Expand Down Expand Up @@ -134,7 +164,8 @@ describe('BaseLLMObsWriter', () => {
writer.flush()

const requestOptions = request.getCall(0).args[1]
expect(requestOptions.url.href).to.equal('https://intake.site.com/endpoint')
expect(requestOptions.url.href).to.equal('https://intake.site.com/')
expect(requestOptions.path).to.equal('/endpoint')
expect(requestOptions.headers['Content-Type']).to.equal('application/json')
expect(requestOptions.headers['DD-API-KEY']).to.equal('test')
})
Expand All @@ -148,7 +179,8 @@ describe('BaseLLMObsWriter', () => {
writer.flush()

const requestOptions = request.getCall(0).args[1]
expect(requestOptions.url.href).to.equal('http://localhost:8126/evp_proxy/v2/endpoint')
expect(requestOptions.url.href).to.equal('http://localhost:8126/')
expect(requestOptions.path).to.equal('/evp_proxy/v2/endpoint')
expect(requestOptions.headers['Content-Type']).to.equal('application/json')
expect(requestOptions.headers['X-Datadog-EVP-Subdomain']).to.equal('intake')
})
Expand Down Expand Up @@ -188,16 +220,14 @@ describe('BaseLLMObsWriter', () => {
writer.append({ foo: 'bar' })

const error = new Error('boom')
let reqUrl
request.callsFake((url, options, callback) => {
reqUrl = options.url.href
callback(error)
})

writer.flush()

expect(logger.error).to.have.been.calledWith(
'Error sending %d LLMObs %s events to %s: %s', 1, undefined, reqUrl, 'boom', error
'Error sending %d LLMObs %s events to %s: %s', 1, undefined, 'https://intake.site.com/endpoint', 'boom', error
)
})

Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/test/llmobs/writers/evaluations.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ describe('LLMObsEvalMetricsWriter', () => {

writer.flush = flush // just to stop the beforeExit flush call

expect(writer._url.href).to.equal('https://api.datadoghq.com/api/intake/llm-obs/v1/eval-metric')
expect(writer.url).to.equal('https://api.datadoghq.com/api/intake/llm-obs/v1/eval-metric')
expect(writer._eventType).to.equal('evaluation_metric')
})

Expand All @@ -32,7 +32,7 @@ describe('LLMObsEvalMetricsWriter', () => {
hostname: 'localhost'
})
writer.setAgentless(false)
expect(writer._url.href).to.equal('http://localhost:8126/evp_proxy/v2/api/intake/llm-obs/v1/eval-metric')
expect(writer.url).to.equal('http://localhost:8126/evp_proxy/v2/api/intake/llm-obs/v1/eval-metric')
expect(writer._eventType).to.equal('evaluation_metric')
})

Expand Down
4 changes: 2 additions & 2 deletions packages/dd-trace/test/llmobs/writers/spans.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ describe('LLMObsSpanWriter', () => {
writer = new LLMObsSpanWriter(config)
writer.setAgentless(true)
expect(writer._agentless).to.equal(true)
expect(writer._url.href).to.equal('https://llmobs-intake.datadoghq.com/api/v2/llmobs')
expect(writer.url).to.equal('https://llmobs-intake.datadoghq.com/api/v2/llmobs')
})

it('creates an agent proxy writer', () => {
writer = new LLMObsSpanWriter(config)
writer.setAgentless(false)

expect(writer._agentless).to.equal(false)
expect(writer._url.href).to.equal('http://localhost:8126/evp_proxy/v2/api/v2/llmobs')
expect(writer.url).to.equal('http://localhost:8126/evp_proxy/v2/api/v2/llmobs')
})

it('computes the number of bytes of the appended event', () => {
Expand Down