Skip to content

Commit

Permalink
forward port workarround for queue populator batch getting stuck
Browse files Browse the repository at this point in the history
original commit hash: dfcc2f5

Issue: BB-526
  • Loading branch information
Kerkesni committed Sep 26, 2024
1 parent a1e9895 commit 85f130b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
7 changes: 6 additions & 1 deletion bin/queuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,16 @@ function queueBatch(queuePopulator, taskState) {
log.debug('skipping batch: previous one still in progress');
return undefined;
}
const onTimeout = () => {

Check warning on line 30 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L30

Added line #L30 was not covered by tests
// reset the flag to allow a new batch to start in case the
// previous batch timed out
taskState.batchInProgress = false;

Check warning on line 33 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L33

Added line #L33 was not covered by tests
};
log.debug('start queueing batch');
taskState.batchInProgress = true;
const maxRead = qpConfig.batchMaxRead;
const timeoutMs = qpConfig.batchTimeoutMs;
queuePopulator.processLogEntries({ maxRead, timeoutMs }, err => {
queuePopulator.processLogEntries({ maxRead, timeoutMs, onTimeout }, err => {

Check warning on line 39 in bin/queuePopulator.js

View check run for this annotation

Codecov / codecov/patch/Backbeat

bin/queuePopulator.js#L39

Added line #L39 was not covered by tests
taskState.batchInProgress = false;
if (err) {
log.error('an error occurred during batch processing', {
Expand Down
11 changes: 11 additions & 0 deletions lib/queuePopulator/LogReader.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class LogReader {
this._metricsHandler = params.metricsHandler;
// TODO: use a common handler for zk metrics from all extensions
this._zkMetricsHandler = params.zkMetricsHandler;

this.batchTimeoutSeconds = parseInt(process.env.BATCH_TIMEOUT_SECONDS, 10) || 300;
}

_setEntryBatch(entryBatch) {
Expand Down Expand Up @@ -291,6 +293,14 @@ class LogReader {
timeoutMs: params.timeoutMs,
logger: this.log.newRequestLogger(),
};
const batchTimeoutTimer = setTimeout(() => {
this.log.error('queue populator batch timeout', {
logStats: batchState.logStats,
});
if (params.onTimeout) {
params.onTimeout();
}
}, this.batchTimeoutSeconds * 1000);
async.waterfall([
next => this._processReadRecords(params, batchState, next),
next => this._processPrepareEntries(batchState, next),
Expand All @@ -299,6 +309,7 @@ class LogReader {
next => this._processSaveLogOffset(batchState, next),
],
err => {
clearTimeout(batchTimeoutTimer);
if (err) {
return done(err);
}
Expand Down
16 changes: 16 additions & 0 deletions tests/unit/lib/queuePopulator/LogReader.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,20 @@ describe('LogReader', () => {
});
});
});

describe('processLogEntries', () => {
it('should call function when the timeout is hit', done => {
logReader.batchTimeoutSeconds = 1;
// logReader will become stuck as _processReadRecords will never
// call the callback
sinon.stub(logReader, '_processReadRecords').returns();
const onTimeout = sinon.stub().returns();
logReader.processLogEntries({ onTimeout }, () => {});
setTimeout(() => {
delete process.env.BATCH_TIMEOUT_SECONDS;
assert(onTimeout.calledOnce);
done();
}, 2000);
}).timeout(4000);
});
});

0 comments on commit 85f130b

Please sign in to comment.