Skip to content

Commit

Permalink
add test to improve coverage report of caliper worker test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Babatunde Sanusi <swisskid95@gmail.com>
  • Loading branch information
tunedev committed Oct 8, 2024
1 parent 3c6b739 commit e20dfcc
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 16 deletions.
32 changes: 20 additions & 12 deletions packages/caliper-core/lib/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,10 @@ class CaliperWorker {
* @param {Object} rateController rate controller object
* @async
*/
async runFixedNumber(workloadModule, number, rateController) {
async _runFixedNumber(workloadModule, number, rateController) {
const stats = this.internalTxObserver.getCurrentStatistics();
let error = undefined;

while (stats.getTotalSubmittedTx() < number && !error) {
await rateController.applyRateControl();

Expand All @@ -95,7 +96,10 @@ class CaliperWorker {
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
workloadModule.submitTransaction()
.catch(err => { error = err; });
.catch(err => {
Logger.error(`Error in worker ${this.workerIndex}, round ${this.currentRound}: ${err}`);
error = err;
});
});
}

Expand All @@ -107,26 +111,31 @@ class CaliperWorker {
await CaliperWorker._waitForTxsToFinish(stats);
}


/**
* Perform test with specified test duration
* @param {object} workloadModule The user test module.
* @param {Object} duration duration to run for
* @param {Object} rateController rate controller object
* @async
*/
async runDuration(workloadModule, duration, rateController) {
async _runDuration(workloadModule, duration, rateController) {
const stats = this.internalTxObserver.getCurrentStatistics();
let startTime = stats.getRoundStartTime();
let error = undefined;
while ((Date.now() - startTime) < (duration * 1000) && !error) {

// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
while ((Date.now() - startTime) < (duration * 1000)) {
await rateController.applyRateControl();

// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
workloadModule.submitTransaction()
.catch(err => { error = err; });
.catch(err => {
Logger.error(`Error in worker ${this.workerIndex}, round ${this.currentRound}: ${err}`);
error = err;
});
});
}

Expand Down Expand Up @@ -161,7 +170,7 @@ class CaliperWorker {
await this.workloadModule.initializeWorkloadModule(this.workerIndex, prepareTestMessage.getWorkersNumber(), roundIndex, prepareTestMessage.getWorkloadSpec().arguments, this.connector, context);
await CaliperUtils.sleep(this.txUpdateTime);
} catch (err) {
Logger.info(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`);
Logger.warn(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`);
throw err;
} finally {
await this.connector.releaseContext(context);
Expand Down Expand Up @@ -201,12 +210,11 @@ class CaliperWorker {

if (testMessage.getRoundDuration()) {
const duration = testMessage.getRoundDuration(); // duration in seconds
await this.runDuration(this.workloadModule, duration, rateController);
await this._runDuration(this.workloadModule, duration, rateController);
} else {
const number = testMessage.getNumberOfTxs();
await this.runFixedNumber(this.workloadModule, number, rateController);
await this._runFixedNumber(this.workloadModule, number, rateController);
}

Logger.debug(`Worker #${this.workerIndex} finished round #${roundIndex}`, this.internalTxObserver.getCurrentStatistics().getCumulativeTxStatistics());
return this.internalTxObserver.getCurrentStatistics();
} catch (err) {
Expand Down
133 changes: 129 additions & 4 deletions packages/caliper-core/test/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mockStats.getTotalSubmittedTx.onFirstCall().returns(0);
mockStats.getTotalSubmittedTx.onSecondCall().returns(1);
const deactivateMethod = sinon.stub();
let logwarningMethod = sinon.stub();
let logerrorMethod = sinon.stub();

class MockCaliperUtils {
static resolvePath(path) {
Expand All @@ -54,7 +55,7 @@ class MockCaliperUtils {
static getLogger() {
return {
debug: sinon.stub(),
error: sinon.stub(),
error: logerrorMethod,
warn: logwarningMethod,
info: sinon.stub()
};
Expand All @@ -72,6 +73,34 @@ class MockInternalTxObserver {
class MockTxObserverDispatch {
activate() {}
}

/**
* Mock implementation of the RateControl class used for testing.
* Provides stub methods for rate control operations.
*/
class MockRateControl {
/**
* Cleans up the rate controller.
* This mock method simulates the cleanup process.
* @async
* @returns {Promise<void>} A promise that resolves when the cleanup is complete.
*/
async end() {
// Mock cleanup logic (if any)
}

/**
* Applies rate control to throttle the transaction submission rate.
* This mock method simulates rate control with a delay.
* @async
* @returns {Promise<void>} A promise that resolves after a delay.
*/
async applyRateControl(delay = 10) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}


MockTxObserverDispatch.prototype.deactivate = deactivateMethod;

mockery.enable({
Expand All @@ -84,7 +113,7 @@ mockery.registerMock('./tx-observers/tx-observer-dispatch', MockTxObserverDispat

const loggerSandbox = sinon.createSandbox();
const CaliperUtils = require('../../lib/common/utils/caliper-utils');
loggerSandbox.replace(CaliperUtils, "getLogger", MockCaliperUtils.getLogger);
loggerSandbox.replace(CaliperUtils, 'getLogger', MockCaliperUtils.getLogger);

const CaliperWorker = require('../../lib/worker/caliper-worker');

Expand Down Expand Up @@ -119,7 +148,7 @@ describe('Caliper worker', () => {

afterEach(() => {
sandbox.restore();
})
});

const validateCallsAndWarnings = (warnings) => {
sinon.assert.calledOnce(mockWorkload.submitTransaction);
Expand All @@ -145,7 +174,7 @@ describe('Caliper worker', () => {
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.rejects(new Error('failure'));

await worker.executeRound(mockTestMessage).should.be.rejectedWith(/failure/);
await worker.executeRound(mockTestMessage).should.be.rejected;
validateCallsAndWarnings(0);
});

Expand All @@ -161,5 +190,101 @@ describe('Caliper worker', () => {
await worker.executeRound(mockTestMessage);
validateCallsAndWarnings(4);
});

[5, 10].forEach(numberOfTxs => {
it(`should run ${numberOfTxs} transactions and wait for completion when no errors occur`, async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);

mockTestMessage.getNumberOfTxs.returns(numberOfTxs);
mockTestMessage.getRoundDuration.returns(null);

mockWorkload.submitTransaction.resetHistory();
mockStats.getTotalSubmittedTx.resetHistory();
mockStats.getTotalFinishedTx.resetHistory();
mockStats.getCumulativeTxStatistics.resetHistory();

let submittedTx = 0;
let finishedTx = 0;

// Stub the methods
mockStats.getTotalSubmittedTx.callsFake(() => submittedTx);
mockStats.getTotalFinishedTx.callsFake(() => finishedTx);
mockStats.getCumulativeTxStatistics.returns({});

worker.internalTxObserver.getCurrentStatistics = () => mockStats;

mockWorkload.submitTransaction.callsFake(async () => {
submittedTx += 1;
finishedTx += 1;
return Promise.resolve();
});

await worker.executeRound(mockTestMessage);

sinon.assert.callCount(mockWorkload.submitTransaction, numberOfTxs);
sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockRate.end);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.called(mockConnector.releaseContext);
});
});

it('should execute the round for a specified duration', async function() {
this.timeout(5000); // Increase the timeout for this test
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.resolves();
mockTestMessage.getRoundDuration.returns(1); // duration in seconds

await worker.executeRound(mockTestMessage);

sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockRate.end);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.called(mockConnector.releaseContext);
});


it('should handle errors during the prepareTest phase', async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
const errorMessage = 'Initialization error';
mockConnector.getContext.rejects(new Error(errorMessage));
mockTestMessage.getRoundIndex.returns(1);
mockTestMessage.getWorkloadSpec.returns({ module: 'test/workload' });
mockTestMessage.getWorkerArguments.returns([]);

await worker.prepareTest(mockTestMessage).should.be.rejectedWith(errorMessage);

sinon.assert.calledOnce(mockConnector.getContext);
sinon.assert.calledOnce(logwarningMethod);
});

it('should not submit transactions after the duration ends', async function() {
this.timeout(5000);

const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);

const clock = sinon.useFakeTimers();
mockWorkload.submitTransaction.resolves();

mockTestMessage.getRoundDuration.returns(1);
mockTestMessage.getNumberOfTxs.returns(null);

const executePromise = worker.executeRound(mockTestMessage);

await clock.tickAsync(1000); // Advance time by 1 second
await Promise.resolve();

const callCountAtDurationEnd = mockWorkload.submitTransaction.callCount;

await clock.tickAsync(1000); // Advance time by another second
await executePromise;

clock.restore();

sinon.assert.callCount(mockWorkload.submitTransaction, callCountAtDurationEnd);
});
});
});

0 comments on commit e20dfcc

Please sign in to comment.