Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
### Added

- Provide custom resource attributes via `cds.requires.telemetry.resource.attributes`
- Added new queue statistic `queue.processing_failed` that tracks the number of failed attempts at processing tasks per instance, per tenant, per service

### Fixed

Expand Down
4 changes: 2 additions & 2 deletions lib/exporter/ConsoleMetricExporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ConsoleMetricExporter extends StandardConsoleMetricExporter {
// export queue metrics
for (const tenant of Object.keys(queue)) {
let toLog = `queue${tenant !== 'undefined' ? ` of tenant "${tenant}"` : ''}:`
toLog += `\n cold | remaining | min storage time | med storage time | max storage time | incoming | outgoing`
toLog += `\n cold | remaining | min storage time | med storage time | max storage time | incoming | outgoing | failed`
toLog += `\n ${`${queue[tenant].cold_entries}`.padStart(
7
)} | ${`${queue[tenant].remaining_entries}`.padStart(
Expand All @@ -107,7 +107,7 @@ class ConsoleMetricExporter extends StandardConsoleMetricExporter {
16
)} | ${`${queue[tenant].incoming_messages}`.padStart(8)} | ${`${queue[tenant].outgoing_messages}`.padStart(
8
)}`
)} | ${`${queue[tenant].processing_failures}`.padStart(6)}`
LOG.info(toLog)
}

Expand Down
43 changes: 31 additions & 12 deletions lib/metrics/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ function initQueueObservation(statistics) {
const meter = metrics.getMeter('@cap-js/telemetry:queue')
const observables = {}

// Gauges

observables.coldEntries = meter.createObservableGauge('queue.cold_entries', {
description:
'Number of entries that could not be delivered after repeated attempts and will not be retried anymore.',
Expand Down Expand Up @@ -79,6 +81,8 @@ function initQueueObservation(statistics) {
valueType: ValueType.INT
})

// Counters

observables.incomingMessages = meter.createObservableCounter('queue.incoming_messages', {
description: 'Number of incoming messages of the queue. Increased by one each time a new message entry is created.',
unit: 'each',
Expand All @@ -91,6 +95,12 @@ function initQueueObservation(statistics) {
valueType: ValueType.INT
})

observables.processingFailures = meter.createObservableCounter('queue.processing_failures', {
description: 'Number of failed message processing attempts by the outbox.',
unit: 'each',
valueType: ValueType.INT
})

meter.addBatchObservableCallback(batchResult => {
for (const tenant in statistics) {
for (const [serviceName, stats] of Object.entries(statistics[tenant])) {
Expand All @@ -116,6 +126,8 @@ function initQueueObservation(statistics) {
batchResult.observe(observables.incomingMessages, stats.incomingMessages, observationAttributes)

batchResult.observe(observables.outgoingMessages, stats.outgoingMessages, observationAttributes)

batchResult.observe(observables.processingFailures, stats.processingFailures, observationAttributes)
}
}
}, Object.values(observables))
Expand All @@ -129,6 +141,7 @@ function initTenantQueueStatistics(statistics, tenant, queuedServiceName) {
statistics[tenant][queuedServiceName] = {
incomingMessages: 0,
outgoingMessages: 0,
processingFailures: 0,
coldEntries: 0,
remainingEntries: 0,
minTimestamp: null,
Expand Down Expand Up @@ -174,10 +187,13 @@ module.exports = () => {
initQueueObservation(statistics)

// Register service when it's first found to be the target of an queued message
cds.db.after(['CREATE'], queueEntity, async (_, req) => {
cds.db.after(['CREATE'], queueEntity, async (_, createTaskReq) => {
const tenant = cds.context?.tenant

const queuedServiceName = req.data.target
const queuedServiceName = createTaskReq.data.target

// Initialize statistics for the tenant and service if not already done
initTenantQueueStatistics(statistics, tenant, queuedServiceName)

if (!registeredServics.has(queuedServiceName)) {

Expand All @@ -187,19 +203,22 @@ module.exports = () => {
return
}

cds.unqueued(targetedService).before('*', () => {
const tenant = cds.context?.tenant
cds.db.after(['UPDATE'], queueEntity, async (_, updateTaskReq) => {
if (!updateTaskReq.data.target === queuedServiceName) return
if (!updateTaskReq.data.attempts) return
statistics[tenant][queuedServiceName].processingFailures += 1
})

// Initialize statistics for the tenant and service if not already done
initTenantQueueStatistics(statistics, tenant, queuedServiceName)
const queueMetricsCollectionHandler = () => {
const tenant = cds.context?.tenant;
statistics[tenant][queuedServiceName].outgoingMessages += 1;
};
queueMetricsCollectionHandler._initial = true;

statistics[tenant][queuedServiceName].outgoingMessages += 1
})
registeredServics.add(queuedServiceName)
}
cds.unqueued(targetedService).before("*", queueMetricsCollectionHandler);

// Initialize statistics for the tenant and service if not already done
initTenantQueueStatistics(statistics, tenant, queuedServiceName)
registeredServics.add(queuedServiceName);
}

statistics[tenant][queuedServiceName].incomingMessages += 1
})
Expand Down
36 changes: 36 additions & 0 deletions test/bookshop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,42 @@
}
}
},
"[metrics-outbox]": {
"requires": {
"queue": true,
"telemetry": {
"metrics": {
"config": {
"exportIntervalMillis": 100
},
"_db_pool": false,
"_queue": true,
"exporter": {
"module": "@opentelemetry/sdk-metrics",
"class": "ConsoleMetricExporter"
}
}
}
}
},
"[metrics-outbox-disabled]": {
"requires": {
"queue": true,
"telemetry": {
"metrics": {
"config": {
"exportIntervalMillis": 250
},
"_db_pool": false,
"_queue": false,
"exporter": {
"module": "@opentelemetry/sdk-metrics",
"class": "ConsoleMetricExporter"
}
}
}
}
},
"log": {
"cls_custom_fields": [
"tenant_id"
Expand Down
17 changes: 5 additions & 12 deletions test/metrics-outbox-disabled.test.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
process.env.cds_requires_outbox = true
process.env.cds_requires_telemetry_metrics = JSON.stringify({
config: { exportIntervalMillis: 100 },
_db_pool: false,
_queue: false,
exporter: {
module: '@opentelemetry/sdk-metrics',
class: 'ConsoleMetricExporter'
}
})

// Mock console.dir to capture logs ConsoleMetricExporter writes
const consoleDirLogs = []
jest.spyOn(console, 'dir').mockImplementation((...args) => {
Expand All @@ -18,7 +7,11 @@ jest.spyOn(console, 'dir').mockImplementation((...args) => {
const cds = require('@sap/cds')
const { setTimeout: wait } = require('node:timers/promises')

const { expect, GET } = cds.test(__dirname + '/bookshop', '--with-mocks')
const { expect, GET } = cds.test(
__dirname + '/bookshop',
'--with-mocks',
'--profile', 'metrics-outbox-disabled'
)

function metricValue(metric) {
const mostRecentMetricLog = consoleDirLogs.findLast(
Expand Down
17 changes: 5 additions & 12 deletions test/metrics-outbox-multitenant.test.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
process.env.cds_requires_outbox = true
process.env.cds_requires_telemetry_metrics = JSON.stringify({
config: { exportIntervalMillis: 250 },
_db_pool: false,
_queue: true,
exporter: {
module: '@opentelemetry/sdk-metrics',
class: 'ConsoleMetricExporter'
}
})

// Mock console.dir to capture logs ConsoleMetricExporter writes
const consoleDirLogs = []
jest.spyOn(console, 'dir').mockImplementation((...args) => {
Expand All @@ -18,7 +7,11 @@ jest.spyOn(console, 'dir').mockImplementation((...args) => {
const cds = require('@sap/cds')
const { setTimeout: wait } = require('node:timers/promises')

const { expect, GET, axios } = cds.test(__dirname + '/bookshop', '--profile', 'multitenancy', '--with-mocks')
const { expect, GET, axios } = cds.test(
__dirname + '/bookshop',
'--with-mocks',
'--profile', 'metrics-outbox, multitenancy'
)
axios.defaults.validateStatus = () => true

function metricValue(tenant, metric) {
Expand Down
39 changes: 21 additions & 18 deletions test/metrics-outbox.test.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
process.env.cds_requires_outbox = true
process.env.cds_requires_telemetry_metrics = JSON.stringify({
config: { exportIntervalMillis: 100 },
_db_pool: false,
_queue: true,
exporter: {
module: '@opentelemetry/sdk-metrics',
class: 'ConsoleMetricExporter'
}
})

// Mock console.dir to capture logs ConsoleMetricExporter writes
const consoleDirLogs = []
jest.spyOn(console, 'dir').mockImplementation((...args) => {
Expand All @@ -18,7 +7,11 @@ jest.spyOn(console, 'dir').mockImplementation((...args) => {
const cds = require('@sap/cds')
const { setTimeout: wait } = require('node:timers/promises')

const { expect, GET } = cds.test(__dirname + '/bookshop', '--with-mocks')
const { expect, GET } = cds.test(
__dirname + '/bookshop',
'--with-mocks',
'--profile', 'metrics-outbox'
)
const debugLog = cds.log('telemetry').debug = jest.fn(() => {})

function metricValue(metric) {
Expand All @@ -34,6 +27,7 @@ function metricValue(metric) {
describe('queue metrics for single tenant service', () => {
let totalInc = 0
let totalOut = 0
let totalFailed = 0

const admin = { auth: { username: 'alice' } }

Expand Down Expand Up @@ -69,12 +63,13 @@ describe('queue metrics for single tenant service', () => {

await GET('/odata/v4/proxy/proxyCallToExternalService', admin)

await wait(150) // Wait for metrics to be collected
await wait(300) // Wait for metrics to be collected

expect(metricValue('cold_entries')).to.eq(0)
expect(metricValue('remaining_entries')).to.eq(0)
expect(metricValue('incoming_messages')).to.eq(totalInc)
expect(metricValue('outgoing_messages')).to.eq(totalOut)
expect(metricValue('processing_failures')).to.eq(totalFailed)
expect(metricValue('min_storage_time_in_seconds')).to.eq(0)
expect(metricValue('med_storage_time_in_seconds')).to.eq(0)
expect(metricValue('max_storage_time_in_seconds')).to.eq(0)
Expand All @@ -89,7 +84,10 @@ describe('queue metrics for single tenant service', () => {
unboxedService = await cds.connect.to('ExternalService')

unboxedService.before('call', req => {
if ((currentRetryCount += 1) <= 2) return req.reject({ status: 503 })
if ((currentRetryCount += 1) <= 2) {
totalFailed += 1
return req.reject({ status: 503 })
}
})
})

Expand All @@ -107,13 +105,14 @@ describe('queue metrics for single tenant service', () => {
const timeOfInitialCall = Date.now()
await GET('/odata/v4/proxy/proxyCallToExternalService', admin)

await wait(150) // ... for metrics to be collected
await wait(500) // ... for metrics to be collected
expect(currentRetryCount).to.eq(1)

expect(metricValue('cold_entries')).to.eq(0)
expect(metricValue('remaining_entries')).to.eq(1)
expect(metricValue('incoming_messages')).to.eq(totalInc)
expect(metricValue('outgoing_messages')).to.eq(totalOut)
expect(metricValue('processing_failures')).to.eq(totalFailed)
expect(metricValue('min_storage_time_in_seconds')).to.eq(0)
expect(metricValue('med_storage_time_in_seconds')).to.eq(0)
expect(metricValue('max_storage_time_in_seconds')).to.eq(0)
Expand All @@ -135,6 +134,7 @@ describe('queue metrics for single tenant service', () => {
expect(metricValue('remaining_entries')).to.eq(1)
expect(metricValue('incoming_messages')).to.eq(totalInc)
expect(metricValue('outgoing_messages')).to.eq(totalOut)
expect(metricValue('processing_failures')).to.eq(totalFailed)
expect(metricValue('min_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue('med_storage_time_in_seconds')).to.be.gte(1)
expect(metricValue('max_storage_time_in_seconds')).to.be.gte(1)
Expand All @@ -148,6 +148,7 @@ describe('queue metrics for single tenant service', () => {
expect(metricValue('remaining_entries')).to.eq(0)
expect(metricValue('incoming_messages')).to.eq(totalInc)
expect(metricValue('outgoing_messages')).to.eq(totalOut)
expect(metricValue('processing_failures')).to.eq(totalFailed)
expect(metricValue('min_storage_time_in_seconds')).to.eq(0)
expect(metricValue('med_storage_time_in_seconds')).to.eq(0)
expect(metricValue('max_storage_time_in_seconds')).to.eq(0)
Expand All @@ -161,6 +162,7 @@ describe('queue metrics for single tenant service', () => {
unboxedService = await cds.connect.to('ExternalService')

unboxedService.before('call', req => {
totalFailed += 1
return req.reject({ status: 418, unrecoverable: true })
})
})
Expand All @@ -174,12 +176,13 @@ describe('queue metrics for single tenant service', () => {

await GET('/odata/v4/proxy/proxyCallToExternalService', admin)

await wait(150) // ... for metrics to be collected
await wait(300) // ... for metrics to be collected

expect(metricValue('cold_entries')).to.eq(1)
expect(metricValue('remaining_entries')).to.eq(0)
expect(metricValue('incoming_messages')).to.eq(totalInc)
expect(metricValue('outgoing_messages')).to.eq(totalOut)
expect(metricValue('processing_failures')).to.eq(totalFailed)
expect(metricValue('min_storage_time_in_seconds')).to.eq(0)
expect(metricValue('med_storage_time_in_seconds')).to.eq(0)
expect(metricValue('max_storage_time_in_seconds')).to.eq(0)
Expand All @@ -194,8 +197,8 @@ describe('queue metrics for single tenant service', () => {

try {
await INSERT.into('cds.outbox.Messages').entries({ ID: cds.utils.uuid(), target: 'unknown-service' })
} catch {
expect.fail('Did not expect an error here')
} catch (e) {
expect.fail(`Did not expect an error here: ${e.message}`)
}

expect(debugLog.mock.calls.some(log => log[0].match(/unknown service/i))).to.be.true
Expand Down
Loading