Skip to content

Commit eec21f3

Browse files
watsonStephen Belanger
authored andcommitted
feat: support APM Server intake API version 2 (#465)
Closes #356
1 parent 3958964 commit eec21f3

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+1354
-2128
lines changed

docs/agent-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1090,7 +1090,7 @@ Defaults to `unnamed`
10901090
You can alternatively set this via <<span-type,`span.type`>>.
10911091
Defaults to `custom.code`
10921092

1093-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
1093+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
10941094

10951095
See <<span-api,Span API>> docs for details on how to use custom spans.
10961096

docs/span-api.asciidoc

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ Defaults to `unnamed`
7373
You can alternatively set this via <<span-type,`span.type`>>.
7474
Defaults to `custom.code`
7575

76-
When a span is started it will measure the time until <<span-end,`span.end()`>> or <<span-truncate,`span.truncate()`>> is called.
76+
When a span is started it will measure the time until <<span-end,`span.end()`>> is called.
7777

7878
[[span-end]]
7979
==== `span.end()`
@@ -86,20 +86,3 @@ span.end()
8686
End the span.
8787
If the span has already ended,
8888
nothing happens.
89-
90-
A span that isn't ended before the parent transaction ends will be <<span-truncate,truncated>>.
91-
92-
[[span-truncate]]
93-
==== `span.truncate()`
94-
95-
[source,js]
96-
----
97-
span.truncate()
98-
----
99-
100-
Truncates and ends the span.
101-
If the span is already ended or truncated,
102-
nothing happens.
103-
104-
A truncated span is a special type of ended span.
105-
It's used to indicate that the measured event took longer than the duration recorded by the span.

docs/transaction-api.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Think of the transaction result as equivalent to the status code of an HTTP resp
5757
transaction.end([result])
5858
----
5959

60-
Ends the transaction and <<span-truncate,truncates>> all un-ended child spans.
60+
Ends the transaction.
6161
If the transaction has already ended,
6262
nothing happens.
6363

lib/agent.js

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ var connect = require('./middleware/connect')
1515
var Filters = require('./filters')
1616
var Instrumentation = require('./instrumentation')
1717
var parsers = require('./parsers')
18-
var request = require('./request')
1918
var stackman = require('./stackman')
2019
var symbols = require('./symbols')
20+
var truncate = require('./truncate')
2121

2222
var IncomingMessage = http.IncomingMessage
2323
var ServerResponse = http.ServerResponse
@@ -32,6 +32,7 @@ function Agent () {
3232

3333
this._instrumentation = new Instrumentation(this)
3434
this._filters = new Filters()
35+
this._apmServer = null
3536

3637
this._conf = null
3738
this._httpClient = null
@@ -52,6 +53,10 @@ Object.defineProperty(Agent.prototype, 'currentTransaction', {
5253
}
5354
})
5455

56+
Agent.prototype.destroy = function () {
57+
if (this._apmServer) this._apmServer.destroy()
58+
}
59+
5560
Agent.prototype.startTransaction = function () {
5661
return this._instrumentation.startTransaction.apply(this._instrumentation, arguments)
5762
}
@@ -129,15 +134,35 @@ Agent.prototype.start = function (opts) {
129134
})
130135
}
131136

132-
this._instrumentation.start()
137+
this._apmServer = new ElasticAPMHttpClient({
138+
// metadata
139+
agentName: 'nodejs',
140+
agentVersion: version,
141+
serviceName: this._conf.serviceName,
142+
serviceVersion: this._conf.serviceVersion,
143+
frameworkName: this._conf.frameworkName,
144+
frameworkVersion: this._conf.frameworkVersion,
145+
hostname: this._conf.hostname,
133146

134-
this._httpClient = new ElasticAPMHttpClient({
147+
// Sanitize conf
148+
truncateStringsAt: config.INTAKE_STRING_MAX_SIZE, // TODO: Do we need to set this (it's the default value)
149+
150+
// HTTP conf
135151
secretToken: this._conf.secretToken,
136152
userAgent: userAgent,
137153
serverUrl: this._conf.serverUrl,
138154
rejectUnauthorized: this._conf.verifyServerCert,
139-
serverTimeout: this._conf.serverTimeout * 1000
155+
serverTimeout: this._conf.serverTimeout * 1000,
156+
157+
// Streaming conf
158+
size: this._conf.apiRequestSize,
159+
time: this._conf.apiRequestTime * 1000
140160
})
161+
this._apmServer.on('error', err => {
162+
this.logger.error('An error occrued while communicating with the APM Server:', err.message)
163+
})
164+
165+
this._instrumentation.start()
141166

142167
Error.stackTraceLimit = this._conf.stackTraceLimit
143168
if (this._conf.captureExceptions) this.handleUncaughtExceptions()
@@ -285,10 +310,27 @@ Agent.prototype.captureError = function (err, opts, cb) {
285310
}
286311

287312
function send (error) {
288-
agent.logger.info('logging error %s with Elastic APM', id)
289-
request.errors(agent, [error], (err) => {
290-
if (cb) cb(err, error.id)
291-
})
313+
error = agent._filters.process(error) // TODO: Update filter to expect this format
314+
315+
if (!error) {
316+
agent.logger.debug('error ignored by filter %o', { id: id })
317+
cb(null, id)
318+
return
319+
}
320+
321+
truncate.error(error, agent._conf)
322+
323+
if (agent._apmServer) {
324+
agent.logger.info(`Sending error ${id} to Elastic APM`)
325+
agent._apmServer.sendError(error, function () {
326+
agent._apmServer.flush(function (err) {
327+
cb(err, id)
328+
})
329+
})
330+
} else {
331+
// TODO: Swallow this error just as it's done in agent.flush()?
332+
process.nextTick(cb.bind(null, new Error('cannot capture error before agent is started'), id))
333+
}
292334
}
293335
}
294336

@@ -314,7 +356,12 @@ Agent.prototype.handleUncaughtExceptions = function (cb) {
314356
}
315357

316358
Agent.prototype.flush = function (cb) {
317-
this._instrumentation.flush(cb)
359+
if (this._apmServer) {
360+
this._apmServer.flush(cb)
361+
} else {
362+
this.logger.warn(new Error('cannot flush agent before it is started'))
363+
process.nextTick(cb)
364+
}
318365
}
319366

320367
Agent.prototype.lambda = function wrapLambda (type, fn) {

lib/config.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
'use strict'
22

33
var fs = require('fs')
4-
var os = require('os')
54
var path = require('path')
65

76
var consoleLogLevel = require('console-log-level')
@@ -35,7 +34,8 @@ var DEFAULTS = {
3534
verifyServerCert: true,
3635
active: true,
3736
logLevel: 'info',
38-
hostname: os.hostname(),
37+
apiRequestSize: 1024 * 1024, // TODO: Is this the right default
38+
apiRequestTime: 10, // TODO: Is this the right default. Should this be ms?
3939
stackTraceLimit: 50,
4040
captureExceptions: true,
4141
filterHttpHeaders: true,
@@ -51,10 +51,10 @@ var DEFAULTS = {
5151
sourceLinesSpanAppFrames: 0,
5252
sourceLinesSpanLibraryFrames: 0,
5353
errorMessageMaxLength: 2048,
54-
flushInterval: 10,
54+
flushInterval: 10, // TODO: Deprecate
5555
transactionMaxSpans: 500,
5656
transactionSampleRate: 1.0,
57-
maxQueueSize: 100,
57+
maxQueueSize: 100, // TODO: Deprecate
5858
serverTimeout: 30,
5959
disableInstrumentations: []
6060
}
@@ -68,6 +68,8 @@ var ENV_TABLE = {
6868
active: 'ELASTIC_APM_ACTIVE',
6969
logLevel: 'ELASTIC_APM_LOG_LEVEL',
7070
hostname: 'ELASTIC_APM_HOSTNAME',
71+
apiRequestSize: 'ELASTIC_APM_API_REQUEST_SIZE',
72+
apiRequestTime: 'ELASTIC_APM_API_REQUEST_TIME',
7173
frameworkName: 'ELASTIC_APM_FRAMEWORK_NAME',
7274
frameworkVersion: 'ELASTIC_APM_FRAMEWORK_VERSION',
7375
stackTraceLimit: 'ELASTIC_APM_STACK_TRACE_LIMIT',
@@ -105,6 +107,8 @@ var BOOL_OPTS = [
105107
]
106108

107109
var NUM_OPTS = [
110+
'apiRequestSize',
111+
'apiRequestTime',
108112
'stackTraceLimit',
109113
'abortedErrorThreshold',
110114
'flushInterval',

lib/instrumentation/index.js

Lines changed: 32 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,11 @@
33
var fs = require('fs')
44
var path = require('path')
55

6-
var AsyncValuePromise = require('async-value-promise')
76
var hook = require('require-in-the-middle')
87
var semver = require('semver')
98

10-
var Queue = require('../queue')
11-
var request = require('../request')
129
var Transaction = require('./transaction')
10+
var truncate = require('../truncate')
1311
var shimmer = require('./shimmer')
1412

1513
var MODULES = [
@@ -45,7 +43,6 @@ module.exports = Instrumentation
4543

4644
function Instrumentation (agent) {
4745
this._agent = agent
48-
this._queue = null
4946
this._hook = null // this._hook is only exposed for testing purposes
5047
this._started = false
5148
this.currentTransaction = null
@@ -59,21 +56,6 @@ Instrumentation.prototype.start = function () {
5956
var self = this
6057
this._started = true
6158

62-
var qopts = {
63-
flushInterval: this._agent._conf.flushInterval,
64-
maxQueueSize: this._agent._conf.maxQueueSize,
65-
logger: this._agent.logger
66-
}
67-
this._queue = new Queue(qopts, function onFlush (transactions, done) {
68-
AsyncValuePromise.all(transactions).then(function (transactions) {
69-
if (self._agent._conf.active && transactions.length > 0) {
70-
request.transactions(self._agent, transactions, done)
71-
} else {
72-
done()
73-
}
74-
}, done)
75-
})
76-
7759
if (this._agent._conf.asyncHooks && semver.gte(process.version, '8.2.0')) {
7860
require('./async-hooks')(this)
7961
} else {
@@ -110,27 +92,44 @@ Instrumentation.prototype._patchModule = function (exports, name, version, enabl
11092
}
11193

11294
Instrumentation.prototype.addEndedTransaction = function (transaction) {
95+
var agent = this._agent
96+
11397
if (this._started) {
114-
var queue = this._queue
98+
var payload = agent._filters.process(transaction._encode()) // TODO: Update filter to expect this format
99+
if (!payload) return agent.logger.debug('transaction ignored by filter %o', { id: transaction.id })
100+
truncate.transaction(payload)
101+
agent.logger.debug('sending transaction %o', { id: transaction.id })
102+
agent._apmServer.sendTransaction(payload)
103+
} else {
104+
agent.logger.debug('ignoring transaction %o', { id: transaction.id })
105+
}
106+
}
115107

116-
this._agent.logger.debug('adding transaction to queue %o', { id: transaction.id })
108+
Instrumentation.prototype.addEndedSpan = function (span) {
109+
var agent = this._agent
117110

118-
var payload = new AsyncValuePromise()
111+
if (this._started) {
112+
agent.logger.debug('encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type })
113+
span._encode(function (err, payload) {
114+
if (err) {
115+
agent.logger.error('error encoding span %o', { trans: span.transaction.id, name: span.name, type: span.type, error: err.message })
116+
return
117+
}
119118

120-
payload.catch(function (err) {
121-
this._agent.logger.error('error encoding transaction %s: %s', transaction.id, err.message)
122-
})
119+
payload = agent._filters.process(payload) // TODO: Update filter to expect this format
123120

124-
// Add the transaction payload to the queue instead of the transation
125-
// object it self to free up the transaction for garbage collection
126-
transaction._encode(function (err, _payload) {
127-
if (err) payload.reject(err)
128-
else payload.resolve(_payload)
129-
})
121+
if (!payload) {
122+
agent.logger.debug('span ignored by filter %o', { trans: span.transaction.id, name: span.name, type: span.type })
123+
return
124+
}
130125

131-
queue.add(payload)
126+
truncate.span(payload)
127+
128+
agent.logger.debug('sending span %o', { trans: span.transaction.id, name: span.name, type: span.type })
129+
if (agent._apmServer) agent._apmServer.sendSpan(payload)
130+
})
132131
} else {
133-
this._agent.logger.debug('ignoring transaction %o', { id: transaction.id })
132+
agent.logger.debug('ignoring span %o', { trans: span.transaction.id, name: span.name, type: span.type })
134133
}
135134
}
136135

@@ -220,11 +219,3 @@ Instrumentation.prototype._recoverTransaction = function (trans) {
220219

221220
this.currentTransaction = trans
222221
}
223-
224-
Instrumentation.prototype.flush = function (cb) {
225-
if (this._queue) {
226-
this._queue.flush(cb)
227-
} else {
228-
process.nextTick(cb)
229-
}
230-
}

lib/instrumentation/span.js

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ module.exports = Span
1414
function Span (transaction) {
1515
this.transaction = transaction
1616
this.started = false
17-
this.truncated = false
1817
this.ended = false
1918
this.name = null
2019
this.type = null
@@ -50,18 +49,6 @@ Span.prototype.customStackTrace = function (stackObj) {
5049
this._recordStackTrace(stackObj)
5150
}
5251

53-
Span.prototype.truncate = function () {
54-
if (!this.started) {
55-
this._agent.logger.debug('tried to truncate non-started span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
56-
return
57-
} else if (this.ended) {
58-
this._agent.logger.debug('tried to truncate already ended span - ignoring %o', { id: this.transaction.id, name: this.name, type: this.type })
59-
return
60-
}
61-
this.truncated = true
62-
this.end()
63-
}
64-
6552
Span.prototype.end = function () {
6653
if (!this.started) {
6754
this._agent.logger.debug('tried to call span.end() on un-started span %o', { id: this.transaction.id, name: this.name, type: this.type })
@@ -75,8 +62,8 @@ Span.prototype.end = function () {
7562
this._agent._instrumentation._recoverTransaction(this.transaction)
7663

7764
this.ended = true
78-
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type, truncated: this.truncated })
79-
this.transaction._recordEndedSpan(this)
65+
this._agent.logger.debug('ended span %o', { id: this.transaction.id, name: this.name, type: this.type })
66+
this._agent._instrumentation.addEndedSpan(this)
8067
}
8168

8269
Span.prototype.duration = function () {
@@ -157,8 +144,10 @@ Span.prototype._encode = function (cb) {
157144
}
158145

159146
var payload = {
147+
transactionId: self.transaction.id,
148+
timestamp: self.transaction.timestamp,
160149
name: self.name,
161-
type: self.truncated ? self.type + '.truncated' : self.type,
150+
type: self.type,
162151
start: self.offsetTime(),
163152
duration: self.duration()
164153
}

0 commit comments

Comments
 (0)