Log consumer lag metrics
Issue: BB-561
KillianG committed Oct 18, 2024
1 parent 31d9ca6 commit efb3f0b
Showing 3 changed files with 131 additions and 1 deletion.
1 change: 1 addition & 0 deletions extensions/replication/queueProcessor/QueueProcessor.js
@@ -406,6 +406,7 @@ class QueueProcessor extends EventEmitter {
             groupId,
             concurrency: this.repConfig.queueProcessor.concurrency,
             queueProcessor: queueProcessorFunc,
+            logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
             canary: true,
             circuitBreaker: this.circuitBreakerConfig,
             circuitBreakerMetrics: {
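
The new option is read from the queueProcessor section of the replication configuration. As a sketch, assuming the usual Backbeat JSON config layout (the surrounding keys and the 30-second value are illustrative; only logConsumerMetricsIntervalS comes from this change):

    "queueProcessor": {
        "concurrency": 10,
        "logConsumerMetricsIntervalS": 30
    }

Leaving the option unset keeps the previous behavior: no statistics are requested from librdkafka and nothing extra is logged.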
38 changes: 37 additions & 1 deletion lib/BackbeatConsumer.js
@@ -104,12 +104,13 @@ class BackbeatConsumer extends EventEmitter {
             circuitBreakerMetrics: joi.object({
                 type: joi.string().required(),
             }).optional(),
+            logConsumerMetricsIntervalS: joi.number(),
         });
         const validConfig = joi.attempt(config, configJoi,
             'invalid config params');

         const { clientId, zookeeper, kafka, topic, groupId, queueProcessor,
-            fromOffset, concurrency, fetchMaxBytes,
+            fromOffset, concurrency, fetchMaxBytes, logConsumerMetricsIntervalS,
             canary, bootstrap, circuitBreaker, circuitBreakerMetrics } = validConfig;

         this._zookeeperEndpoint = zookeeper && zookeeper.connectionString;
@@ -127,6 +128,7 @@ class BackbeatConsumer extends EventEmitter {
         this._canary = canary;
         this._bootstrap = bootstrap;
         this._offsetLedger = new OffsetLedger();
+        this._logConsumerMetricsIntervalS = logConsumerMetricsIntervalS;

         this._processingQueue = null;
         this._messagesConsumed = 0;
@@ -207,6 +209,9 @@
         if (this._fetchMaxBytes !== undefined) {
             consumerParams['fetch.message.max.bytes'] = this._fetchMaxBytes;
         }
+        if (this._logConsumerMetricsIntervalS !== undefined) {
+            consumerParams['statistics.interval.ms'] = this._logConsumerMetricsIntervalS * 1000;
+        }
         if (process.env.RDKAFKA_DEBUG_LOGS) {
             consumerParams.debug = process.env.RDKAFKA_DEBUG_LOGS;
         }
@@ -241,6 +246,37 @@
         return this._consumer.once('ready', () => {
             this._consumerReady = true;
             this._checkIfReady();
+            if (this._logConsumerMetricsIntervalS !== undefined) {
+                this._consumer.on('event.stats', res => {
+                    const statsObj = JSON.parse(res.message);
+                    if (typeof statsObj !== 'object') {
+                        return undefined;
+                    }
+                    const topicStats = statsObj.topics[this._topic];
+                    if (typeof topicStats !== 'object') {
+                        return undefined;
+                    }
+                    const consumerStats = {
+                        lag: {},
+                    };
+                    // Gather stats per partition consumed by this
+                    // consumer instance
+                    Object.keys(topicStats.partitions).forEach(partition => {
+                        /* eslint-disable camelcase */
+                        const { consumer_lag, fetch_state } =
+                            topicStats.partitions[partition];
+                        if (fetch_state === 'active' && consumer_lag >= 0) {
+                            consumerStats.lag[partition] = consumer_lag;
+                        }
+                        /* eslint-enable camelcase */
+                    });
+                    this._log.info('topic consumer statistics', {
+                        topic: this._topic,
+                        consumerStats,
+                    });
+                    return undefined;
+                });
+            }
         });
     }

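The handler above relies on librdkafka's statistics callback: setting statistics.interval.ms to a non-zero value makes node-rdkafka emit periodic 'event.stats' events whose message field is a JSON document. A trimmed sketch of the parts the handler reads (topic name, partition numbers, and values are illustrative):

    {
        "topics": {
            "backbeat-replication": {
                "partitions": {
                    "0":  { "fetch_state": "active", "consumer_lag": 3 },
                    "1":  { "fetch_state": "active", "consumer_lag": 0 },
                    "-1": { "fetch_state": "none",   "consumer_lag": -1 }
                }
            }
        }
    }

The fetch_state === 'active' && consumer_lag >= 0 guard skips partitions this instance is not actively fetching (including librdkafka's internal "-1" entry) and partitions whose lag is not yet known (reported as -1).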
93 changes: 93 additions & 0 deletions tests/functional/lib/BackbeatConsumer.js
@@ -932,3 +932,96 @@ describe('BackbeatConsumer shutdown tests', () => {
         ], done);
     }).timeout(60000);
 });
+
+describe('BackbeatConsumer statistics logging tests', () => {
+    const topic = 'backbeat-consumer-spec-statistics';
+    const groupId = `replication-group-${Math.random()}`;
+    const messages = [
+        { key: 'foo', message: '{"hello":"foo"}' },
+        { key: 'bar', message: '{"world":"bar"}' },
+        { key: 'qux', message: '{"hi":"qux"}' },
+    ];
+    let producer;
+    let consumer;
+    let consumedMessages = [];
+    function queueProcessor(message, cb) {
+        consumedMessages.push(message.value);
+        process.nextTick(cb);
+    }
+    before(function before(done) {
+        this.timeout(60000);
+        producer = new BackbeatProducer({
+            kafka: producerKafkaConf,
+            topic,
+            pollIntervalMs: 100,
+        });
+        consumer = new BackbeatConsumer({
+            zookeeper: zookeeperConf,
+            kafka: consumerKafkaConf, groupId, topic,
+            queueProcessor,
+            concurrency: 10,
+            bootstrap: true,
+            // this enables statistics logging
+            logConsumerMetricsIntervalS: 1,
+        });
+        async.parallel([
+            innerDone => producer.on('ready', innerDone),
+            innerDone => consumer.on('ready', innerDone),
+        ], done);
+    });
+    afterEach(() => {
+        consumedMessages = [];
+        consumer.removeAllListeners('consumed');
+    });
+    after(done => {
+        async.parallel([
+            innerDone => producer.close(innerDone),
+            innerDone => consumer.close(innerDone),
+        ], done);
+    });
+    it('should be able to log consumer statistics', done => {
+        producer.send(messages, err => {
+            assert.ifError(err);
+        });
+        let totalConsumed = 0;
+        // It would have been nice to check that the lag is strictly
+        // positive when we haven't consumed yet, but the lag seems
+        // off when no consumer offset has been written yet to Kafka,
+        // so it cannot be tested reliably until we start consuming.
+        consumer.subscribe();
+        consumer.on('consumed', messagesConsumed => {
+            totalConsumed += messagesConsumed;
+            assert(totalConsumed <= messages.length);
+            if (totalConsumed === messages.length) {
+                let firstTime = true;
+                setTimeout(() => {
+                    consumer._log = {
+                        error: () => {},
+                        warn: () => {},
+                        info: (message, args) => {
+                            if (firstTime && message.indexOf('statistics') !== -1) {
+                                firstTime = false;
+                                assert.strictEqual(args.topic, topic);
+                                const consumerStats = args.consumerStats;
+                                assert.strictEqual(typeof consumerStats, 'object');
+                                const lagStats = consumerStats.lag;
+                                assert.strictEqual(typeof lagStats, 'object');
+                                // there should be one consumed partition
+                                assert.strictEqual(Object.keys(lagStats).length, 1);
+                                // everything should have been consumed,
+                                // hence the stored consumer offsets equal
+                                // the topic offsets and lag should be 0
+                                const partitionLag = lagStats['0'];
+                                assert.strictEqual(partitionLag, 0);
+                                done();
+                            }
+                        },
+                        debug: () => {},
+                        trace: () => {},
+                    };
+                }, 5000);
+            }
+        });
+    }).timeout(30000);
+});
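
With this change, any BackbeatConsumer can opt into periodic lag logging. A minimal usage sketch, assuming a local broker (the connection parameters, topic, and group names below are illustrative, not taken from this commit):

    const BackbeatConsumer = require('./lib/BackbeatConsumer');

    const consumer = new BackbeatConsumer({
        kafka: { hosts: 'localhost:9092' }, // illustrative connection config
        topic: 'my-topic',
        groupId: 'my-group',
        concurrency: 10,
        queueProcessor: (message, cb) => process.nextTick(cb),
        logConsumerMetricsIntervalS: 30, // log 'topic consumer statistics' every 30s
    });
    consumer.subscribe();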
