Skip to content

Commit

Permalink
add test to improve coverage report of caliper worker test coverage
Browse files Browse the repository at this point in the history
Signed-off-by: Babatunde Sanusi <swisskid95@gmail.com>
  • Loading branch information
tunedev committed Sep 23, 2024
1 parent 3c6b739 commit 36a2cb2
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 29 deletions.
51 changes: 26 additions & 25 deletions packages/caliper-core/lib/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class CaliperWorker {
this.connector = connector;
this.workerIndex = workerIndex;
this.messenger = messenger;
this.rateControlClass = RateControl;

this.internalTxObserver = new InternalTxObserver(messenger, managerUuid, workerIndex);
this.txObserverDispatch = new TxObserverDispatch(messenger, this.internalTxObserver, managerUuid, workerIndex);
Expand Down Expand Up @@ -84,55 +85,56 @@ class CaliperWorker {
* @param {Object} rateController rate controller object
* @async
*/
async runFixedNumber(workloadModule, number, rateController) {
async _runFixedNumber(workloadModule, number, rateController) {
const stats = this.internalTxObserver.getCurrentStatistics();
let error = undefined;
while (stats.getTotalSubmittedTx() < number && !error) {
const errors = [];

while (stats.getTotalSubmittedTx() < number) {
await rateController.applyRateControl();

// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
workloadModule.submitTransaction()
.catch(err => { error = err; });
.catch(err => {
Logger.error(`Error in worker ${this.workerIndex}, round ${this.currentRound}: ${err}`);
errors.push(err);
});
});
}

if (error) {
// Already logged, no need to log again
throw error;
if (errors.length > 0) {
throw new Error(`Errors occurred while submitting transactions on worker ${this.workerIndex} for round ${this.currentRound}`);
}

await CaliperWorker._waitForTxsToFinish(stats);
}


/**
* Perform test with specified test duration
* @param {object} workloadModule The user test module.
* @param {Object} duration duration to run for
* @param {Object} rateController rate controller object
* @async
*/
async runDuration(workloadModule, duration, rateController) {
async _runDuration(workloadModule, duration, rateController) {
const stats = this.internalTxObserver.getCurrentStatistics();
let startTime = stats.getRoundStartTime();
let error = undefined;
while ((Date.now() - startTime) < (duration * 1000) && !error) {
const errors = [];

while ((Date.now() - startTime) < (duration * 1000)) {
await rateController.applyRateControl();

// If this function calls this.workloadModule.submitTransaction() too quickly, micro task queue will be filled with unexecuted promises,
// and I/O task(s) will get no chance to be execute and fall into starvation, for more detail info please visit:
// https://snyk.io/blog/nodejs-how-even-quick-async-functions-can-block-the-event-loop-starve-io/
await this.setImmediatePromise(() => {
workloadModule.submitTransaction()
.catch(err => { error = err; });
.catch(err => {
Logger.error(`Error in worker ${this.workerIndex}, round ${this.currentRound}: ${err}`);
errors.push(err);
});
});
}

if (error) {
// Already logged, no need to log again
throw error;
if (errors.length > 0) {
throw new Error('Errors occurred while submitting transactions on some workers');
}

await CaliperWorker._waitForTxsToFinish(stats);
Expand Down Expand Up @@ -161,7 +163,7 @@ class CaliperWorker {
await this.workloadModule.initializeWorkloadModule(this.workerIndex, prepareTestMessage.getWorkersNumber(), roundIndex, prepareTestMessage.getWorkloadSpec().arguments, this.connector, context);
await CaliperUtils.sleep(this.txUpdateTime);
} catch (err) {
Logger.info(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`);
Logger.warn(`Worker [${this.workerIndex}] encountered an error during prepare test phase for round ${roundIndex}: ${(err.stack ? err.stack : err)}`);
throw err;
} finally {
await this.connector.releaseContext(context);
Expand Down Expand Up @@ -194,19 +196,18 @@ class CaliperWorker {

// Configure
Logger.debug(`Worker #${this.workerIndex} creating rate controller`);
rateController = new RateControl(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex);
rateController = new this.rateControlClass(testMessage, this.internalTxObserver.getCurrentStatistics(), this.workerIndex);

// Run the test loop
Logger.info(`Worker #${this.workerIndex} starting workload loop`);

if (testMessage.getRoundDuration()) {
const duration = testMessage.getRoundDuration(); // duration in seconds
await this.runDuration(this.workloadModule, duration, rateController);
await this._runDuration(this.workloadModule, duration, rateController);
} else {
const number = testMessage.getNumberOfTxs();
await this.runFixedNumber(this.workloadModule, number, rateController);
await this._runFixedNumber(this.workloadModule, number, rateController);
}

Logger.debug(`Worker #${this.workerIndex} finished round #${roundIndex}`, this.internalTxObserver.getCurrentStatistics().getCumulativeTxStatistics());
return this.internalTxObserver.getCurrentStatistics();
} catch (err) {
Expand Down
209 changes: 205 additions & 4 deletions packages/caliper-core/test/worker/caliper-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mockStats.getTotalSubmittedTx.onFirstCall().returns(0);
mockStats.getTotalSubmittedTx.onSecondCall().returns(1);
const deactivateMethod = sinon.stub();
let logwarningMethod = sinon.stub();
let logerrorMethod = sinon.stub();

class MockCaliperUtils {
static resolvePath(path) {
Expand All @@ -54,7 +55,7 @@ class MockCaliperUtils {
static getLogger() {
return {
debug: sinon.stub(),
error: sinon.stub(),
error: logerrorMethod,
warn: logwarningMethod,
info: sinon.stub()
};
Expand All @@ -72,6 +73,34 @@ class MockInternalTxObserver {
class MockTxObserverDispatch {
activate() {}
}

/**
* Mock implementation of the RateControl class used for testing.
* Provides stub methods for rate control operations.
*/
class MockRateControl {
/**
* Cleans up the rate controller.
* This mock method simulates the cleanup process.
* @async
* @returns {Promise<void>} A promise that resolves when the cleanup is complete.
*/
async end() {
// Mock cleanup logic (if any)
}

/**
* Applies rate control to throttle the transaction submission rate.
* This mock method simulates rate control with a delay.
* @async
* @returns {Promise<void>} A promise that resolves after a delay.
*/
async applyRateControl(delay = 10) {
await new Promise(resolve => setTimeout(resolve, delay));
}
}


MockTxObserverDispatch.prototype.deactivate = deactivateMethod;

mockery.enable({
Expand All @@ -84,7 +113,7 @@ mockery.registerMock('./tx-observers/tx-observer-dispatch', MockTxObserverDispat

const loggerSandbox = sinon.createSandbox();
const CaliperUtils = require('../../lib/common/utils/caliper-utils');
loggerSandbox.replace(CaliperUtils, "getLogger", MockCaliperUtils.getLogger);
loggerSandbox.replace(CaliperUtils, 'getLogger', MockCaliperUtils.getLogger);

const CaliperWorker = require('../../lib/worker/caliper-worker');

Expand Down Expand Up @@ -119,7 +148,7 @@ describe('Caliper worker', () => {

afterEach(() => {
sandbox.restore();
})
});

const validateCallsAndWarnings = (warnings) => {
sinon.assert.calledOnce(mockWorkload.submitTransaction);
Expand All @@ -145,7 +174,7 @@ describe('Caliper worker', () => {
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.rejects(new Error('failure'));

await worker.executeRound(mockTestMessage).should.be.rejectedWith(/failure/);
await worker.executeRound(mockTestMessage).should.be.rejected;
validateCallsAndWarnings(0);
});

Expand All @@ -161,5 +190,177 @@ describe('Caliper worker', () => {
await worker.executeRound(mockTestMessage);
validateCallsAndWarnings(4);
});

[5, 10].forEach(numberOfTxs => {
it(`should run ${numberOfTxs} transactions and wait for completion when no errors occur`, async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);

mockTestMessage.getNumberOfTxs.returns(numberOfTxs);
mockTestMessage.getRoundDuration.returns(null);

mockWorkload.submitTransaction.resetHistory();
mockStats.getTotalSubmittedTx.resetHistory();
mockStats.getTotalFinishedTx.resetHistory();
mockStats.getCumulativeTxStatistics.resetHistory();

let submittedTx = 0;
let finishedTx = 0;

// Stub the methods
mockStats.getTotalSubmittedTx.callsFake(() => submittedTx);
mockStats.getTotalFinishedTx.callsFake(() => finishedTx);
mockStats.getCumulativeTxStatistics.returns({});

worker.internalTxObserver.getCurrentStatistics = () => mockStats;

mockWorkload.submitTransaction.callsFake(async () => {
submittedTx += 1;
finishedTx += 1;
return Promise.resolve();
});

await worker.executeRound(mockTestMessage);

sinon.assert.callCount(mockWorkload.submitTransaction, numberOfTxs);
sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockRate.end);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.called(mockConnector.releaseContext);
});
});

it('should execute the round for a specified duration', async function() {
this.timeout(5000); // Increase the timeout for this test
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);
mockWorkload.submitTransaction.resolves();
mockTestMessage.getRoundDuration.returns(1); // duration in seconds

await worker.executeRound(mockTestMessage);

sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockRate.end);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.called(mockConnector.releaseContext);
});

it('should continue submitting transactions but not wait for outstanding transactions if one or more submits throw an error', async function() {
// Arrange
const workerIndex = 1;
const worker = new CaliperWorker(mockConnector, workerIndex, mockMessenger, 'uuid');
worker.rateControlClass = MockRateControl;
await worker.prepareTest(mockTestMessage);

mockTestMessage.getRoundDuration.returns(0.1);
mockTestMessage.getNumberOfTxs.returns(null);

// Set up mock statistics
const startTime = Date.now();
const mockStats = {
getRoundStartTime: () => startTime,
getTotalSubmittedTx: sinon.stub(),
getTotalFinishedTx: sinon.stub()
};

worker.internalTxObserver.getCurrentStatistics = () => mockStats;

// Simulate transaction submissions with some failures
const submissionResults = [true, true, false, true, false, true, true, true, false, true];
let submissionIndex = 0;
let submissionCount = 0;
let finishedTxCount = 0;
const pendingTransactions = [];

mockWorkload.submitTransaction.callsFake(() => {
submissionCount++;
const result = submissionResults[submissionIndex++];
if (result) {
// Simulate delayed transaction completion
const promise = new Promise(resolve => setTimeout(() => {
finishedTxCount++;
resolve();
}, 10));
pendingTransactions.push(promise);
return promise;
} else {
// Simulate delayed error
return new Promise((_, reject) => setTimeout(() => {
reject(new Error(`transaction error ${submissionIndex}`));
}, 10));
}
});

mockStats.getTotalSubmittedTx.callsFake(() => submissionCount);
mockStats.getTotalFinishedTx.callsFake(() => finishedTxCount);

logerrorMethod.reset();

// **Add stubs to resolve promises**
sandbox.stub(worker, 'txObserverDispatch').resolves();
mockWorkload.cleanupWorkloadModule.resolves();
mockConnector.releaseContext.resolves();

await worker.executeRound(mockTestMessage).should.be.rejectedWith(
'Errors occurred while submitting transactions on some workers'
);

// Assertions
const callCount = mockWorkload.submitTransaction.callCount;
callCount.should.be.within(8, 12);
sinon.assert.calledWithMatch(
logerrorMethod,
sinon.match(/Error in worker/).and(sinon.match(/transaction error/))
);
finishedTxCount.should.be.lessThan(submissionCount);

sinon.assert.calledOnce(deactivateMethod);
sinon.assert.calledOnce(mockWorkload.cleanupWorkloadModule);
sinon.assert.called(mockConnector.releaseContext);

// Clean up
await Promise.all(pendingTransactions);
});

it('should handle errors during the prepareTest phase', async () => {
const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
const errorMessage = 'Initialization error';
mockConnector.getContext.rejects(new Error(errorMessage));
mockTestMessage.getRoundIndex.returns(1);
mockTestMessage.getWorkloadSpec.returns({ module: 'test/workload' });
mockTestMessage.getWorkerArguments.returns([]);

await worker.prepareTest(mockTestMessage).should.be.rejectedWith(errorMessage);

sinon.assert.calledOnce(mockConnector.getContext);
sinon.assert.calledOnce(logwarningMethod);
});

it('should not submit transactions after the duration ends', async function() {
this.timeout(5000);

const worker = new CaliperWorker(mockConnector, 1, mockMessenger, 'uuid');
await worker.prepareTest(mockTestMessage);

const clock = sinon.useFakeTimers();
mockWorkload.submitTransaction.resolves();

mockTestMessage.getRoundDuration.returns(1);
mockTestMessage.getNumberOfTxs.returns(null);

const executePromise = worker.executeRound(mockTestMessage);

await clock.tickAsync(1000); // Advance time by 1 second
await Promise.resolve();

const callCountAtDurationEnd = mockWorkload.submitTransaction.callCount;

await clock.tickAsync(1000); // Advance time by another second
await executePromise;

clock.restore();

sinon.assert.callCount(mockWorkload.submitTransaction, callCountAtDurationEnd);
});
});
});

0 comments on commit 36a2cb2

Please sign in to comment.