Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log consumer lag metrics #2563

Open
wants to merge 1 commit into
base: development/8.7
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions extensions/replication/queueProcessor/QueueProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ class QueueProcessor extends EventEmitter {
groupId,
concurrency: this.repConfig.queueProcessor.concurrency,
queueProcessor: queueProcessorFunc,
logConsumerMetricsIntervalS: this.repConfig.queueProcessor.logConsumerMetricsIntervalS,
canary: true,
circuitBreaker: this.circuitBreakerConfig,
circuitBreakerMetrics: {
Expand Down
38 changes: 37 additions & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,13 @@
circuitBreakerMetrics: joi.object({
type: joi.string().required(),
}).optional(),
logConsumerMetricsIntervalS: joi.number(),
});
const validConfig = joi.attempt(config, configJoi,
'invalid config params');

const { clientId, zookeeper, kafka, topic, groupId, queueProcessor,
fromOffset, concurrency, fetchMaxBytes,
fromOffset, concurrency, fetchMaxBytes, logConsumerMetricsIntervalS,
canary, bootstrap, circuitBreaker, circuitBreakerMetrics } = validConfig;

this._zookeeperEndpoint = zookeeper && zookeeper.connectionString;
Expand All @@ -127,6 +128,7 @@
this._canary = canary;
this._bootstrap = bootstrap;
this._offsetLedger = new OffsetLedger();
this._logConsumerMetricsIntervalS = logConsumerMetricsIntervalS;

this._processingQueue = null;
this._messagesConsumed = 0;
Expand Down Expand Up @@ -207,6 +209,9 @@
if (this._fetchMaxBytes !== undefined) {
consumerParams['fetch.message.max.bytes'] = this._fetchMaxBytes;
}
if (this._logConsumerMetricsIntervalS !== undefined) {
consumerParams['statistics.interval.ms'] = this._logConsumerMetricsIntervalS * 1000;
}
if (process.env.RDKAFKA_DEBUG_LOGS) {
consumerParams.debug = process.env.RDKAFKA_DEBUG_LOGS;
}
Expand Down Expand Up @@ -241,6 +246,37 @@
return this._consumer.once('ready', () => {
this._consumerReady = true;
this._checkIfReady();
if (this._logConsumerMetricsIntervalS !== undefined) {
this._consumer.on('event.stats', res => {
const statsObj = JSON.parse(res.message);
if (typeof statsObj !== 'object') {
return undefined;

Check warning on line 253 in lib/BackbeatConsumer.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/BackbeatConsumer.js#L253

Added line #L253 was not covered by tests
}
const topicStats = statsObj.topics[this._topic];
if (typeof topicStats !== 'object') {
return undefined;

Check warning on line 257 in lib/BackbeatConsumer.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

lib/BackbeatConsumer.js#L257

Added line #L257 was not covered by tests
}
const consumerStats = {
lag: {},
};
// Gather stats per partition consumed by this
// consumer instance
Object.keys(topicStats.partitions).forEach(partition => {
/* eslint-disable camelcase */
const { consumer_lag, fetch_state } =
topicStats.partitions[partition];
if (fetch_state === 'active' && consumer_lag >= 0) {
consumerStats.lag[partition] = consumer_lag;
}
/* eslint-enable camelcase */
});
this._log.info('topic consumer statistics', {
topic: this._topic,
consumerStats,
});
return undefined;
});
}
});
}

Expand Down
93 changes: 93 additions & 0 deletions tests/functional/lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -932,3 +932,96 @@ describe('BackbeatConsumer shutdown tests', () => {
], done);
}).timeout(60000);
});

// Functional tests for consumer statistics logging: when
// `logConsumerMetricsIntervalS` is set, the consumer periodically logs
// per-partition consumer lag for the partitions it actively fetches from.
describe('BackbeatConsumer statistics logging tests', () => {
    const topic = 'backbeat-consumer-spec-statistics';
    const groupId = `replication-group-${Math.random()}`;
    const messages = [
        { key: 'foo', message: '{"hello":"foo"}' },
        { key: 'bar', message: '{"world":"bar"}' },
        { key: 'qux', message: '{"hi":"qux"}' },
    ];
    let producer;
    let consumer;
    let consumedMessages = [];

    // record each consumed message value, completing asynchronously
    function queueProcessor(message, cb) {
        consumedMessages.push(message.value);
        process.nextTick(cb);
    }

    before(function before(done) {
        this.timeout(60000);
        producer = new BackbeatProducer({
            kafka: producerKafkaConf,
            topic,
            pollIntervalMs: 100,
        });
        consumer = new BackbeatConsumer({
            zookeeper: zookeeperConf,
            kafka: consumerKafkaConf, groupId, topic,
            queueProcessor,
            concurrency: 10,
            bootstrap: true,
            // this enables statistics logging
            logConsumerMetricsIntervalS: 1,
        });
        async.parallel([
            next => producer.on('ready', next),
            next => consumer.on('ready', next),
        ], done);
    });

    afterEach(() => {
        consumedMessages = [];
        consumer.removeAllListeners('consumed');
    });

    after(done => {
        async.parallel([
            next => producer.close(next),
            next => consumer.close(next),
        ], done);
    });

    it('should be able to log consumer statistics', done => {
        producer.send(messages, err => {
            assert.ifError(err);
        });
        let nbConsumed = 0;
        // It would have been nice to check that the lag is strictly
        // positive when we haven't consumed yet, but the lag seems
        // off when no consumer offset has been written yet to Kafka,
        // so it cannot be tested reliably until we start consuming.
        consumer.subscribe();
        consumer.on('consumed', count => {
            nbConsumed += count;
            assert(nbConsumed <= messages.length);
            if (nbConsumed !== messages.length) {
                return;
            }
            // Everything is consumed: leave time for offsets to be
            // committed, then intercept the logger and check the next
            // statistics log entry.
            let statsPending = true;
            setTimeout(() => {
                consumer._log = {
                    error: () => {},
                    warn: () => {},
                    info: (message, fields) => {
                        if (!statsPending ||
                            message.indexOf('statistics') === -1) {
                            return;
                        }
                        statsPending = false;
                        assert.strictEqual(fields.topic, topic);
                        const { consumerStats } = fields;
                        assert.strictEqual(typeof consumerStats, 'object');
                        const lagStats = consumerStats.lag;
                        assert.strictEqual(typeof lagStats, 'object');
                        // there should be one consumed partition
                        assert.strictEqual(Object.keys(lagStats).length, 1);
                        // everything should have been consumed, hence
                        // consumer offsets stored equal topic offsets,
                        // and lag should be 0
                        assert.strictEqual(lagStats['0'], 0);
                        done();
                    },
                    debug: () => {},
                    trace: () => {},
                };
            }, 5000);
        });
    }).timeout(30000);
});
Loading