Skip to content

Commit

Permalink
feat(ref-imp): added a lib to fetch operation queue size for monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
thehenrytsai authored Jan 13, 2021
1 parent dc9f1d7 commit 3956109
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 43 deletions.
29 changes: 15 additions & 14 deletions lib/common/EventEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
import IEventEmitter from './interfaces/IEventEmitter';
import LogColor from './LogColor';
import Logger from './Logger';

/**
* Event emitter used in Sidetree.
* Intended to be machine readable for triggering custom handlers.
*/
export default class EventEmitter {
// Default to basic console log.
private static singleton: IEventEmitter = {
emit: async (eventCode, eventData?) => {
if (eventData === undefined) {
console.log(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}`));
} else {
console.log(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}: ${JSON.stringify(eventData)}`));
}
}
};
private static customEvenEmitter: IEventEmitter;

/**
* Overrides the default event emitter if given.
* Initializes with custom event emitter if given.
*/
static initialize (customEventEmitter?: IEventEmitter) {
if (customEventEmitter !== undefined) {
EventEmitter.singleton = customEventEmitter;
EventEmitter.customEvenEmitter = customEventEmitter;
}
}

/**
* Emits an event.
*/
public static async emit (eventName: string, eventData?: {[property: string]: any}): Promise<void> {
await EventEmitter.singleton.emit(eventName, eventData);
public static async emit (eventCode: string, eventData?: {[property: string]: any}): Promise<void> {
// Always log the event using the logger.
if (eventData === undefined) {
Logger.info(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}`));
} else {
Logger.info(LogColor.lightBlue(`Event emitted: ${LogColor.green(eventCode)}: ${JSON.stringify(eventData)}`));
}

if (EventEmitter.customEvenEmitter !== undefined) {
await EventEmitter.customEvenEmitter.emit(eventCode, eventData);
}
}
}
16 changes: 4 additions & 12 deletions lib/common/MongoDbStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,14 @@ export default class MongoDbStore {

/**
* Clears the store.
* NOTE: Avoid dropping collection using `collection.drop()` and recreating the collection in rapid succession (such as in tests), because:
* 1. It takes some time (seconds) for the collection be created again.
* 2. Some cloud MongoDB services such as CosmosDB will lead to `MongoError: ns not found` connectivity error.
*/
public async clearCollection () {
await this.collection!.deleteMany({ }); // Empty filter removes all entries in collection.
}

/**
* Drops the entire collection, only used by tests.
* NOTE: Avoid dropping and recreating the collection in rapid succession (such as in tests), because:
* 1. It takes some time (seconds) for the collection be create again.
* 2. Some cloud MongoDB services such as CosmosDB will lead to `MongoError: ns not found` connectivity error.
*/
public async dropCollection () {
await this.collection!.drop();
}

/**
* Creates the collection with indexes if it does not exists.
*/
Expand All @@ -66,9 +59,8 @@ export default class MongoDbStore {

/**
* Create the indices required by the collection passed.
* To be overriden by inherited classes if needed.
* To be overridden by inherited classes if needed.
*/
protected async createIndex (_collection: Collection): Promise<void> {
Logger.info(`Collection '${this.collectionName}' has no index.`);
}
}
2 changes: 1 addition & 1 deletion lib/common/interfaces/IEventEmitter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ export default interface IEventEmitter {
/**
* Emits an event.
*/
emit (eventName: string, eventData?: {[property: string]: any}): Promise<void>;
emit (eventCode: string, eventData?: {[property: string]: any}): Promise<void>;
}
27 changes: 27 additions & 0 deletions lib/core/Monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import Config from './models/Config';
import MongoDbOperationQueue from './versions/latest/MongoDbOperationQueue';

/**
* An class to monitor the Core.
* NOTE: this class is completely decoupled from Core, Core does not depend on this class at all for it to function.
*/
export default class Monitor {

private operationQueue: MongoDbOperationQueue;

public constructor () {
this.operationQueue = new MongoDbOperationQueue();
}

public async initialize (config: Config) {
this.operationQueue.initialize(config.mongoDbConnectionString, config.databaseName);
}

/**
* Gets the size of the operation queue.
*/
public async getOperationQueueSize (): Promise<number> {
const queueSize = await this.operationQueue.getSize();
return queueSize;
}
}
4 changes: 2 additions & 2 deletions lib/core/VersionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ export default class VersionManager implements IVersionManager, IVersionMetadata
const version = versionModel.version;

const MongoDbOperationQueue = await this.loadDefaultExportsForVersion(version, 'MongoDbOperationQueue');
const operationQueue = new MongoDbOperationQueue(this.config.mongoDbConnectionString, this.config.databaseName);
await operationQueue.initialize();
const operationQueue = new MongoDbOperationQueue();
await operationQueue.initialize(this.config.mongoDbConnectionString, this.config.databaseName);

const TransactionProcessor = await this.loadDefaultExportsForVersion(version, 'TransactionProcessor');
const transactionProcessor = new TransactionProcessor(downloadManager, operationStore, blockchain, this);
Expand Down
13 changes: 8 additions & 5 deletions lib/core/versions/latest/MongoDbOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ export default class MongoDbOperationQueue implements IOperationQueue {

private db: Db | undefined;

constructor (private serverUrl: string, private databaseName: string) { }

/**
* Initialize the MongoDB operation store.
*/
public async initialize () {
const client = await MongoClient.connect(this.serverUrl);
this.db = client.db(this.databaseName);
public async initialize (serverUrl: string, databaseName: string) {
const client = await MongoClient.connect(serverUrl);
this.db = client.db(databaseName);
this.collection = await MongoDbOperationQueue.createCollectionIfNotExist(this.db);
}

Expand Down Expand Up @@ -87,6 +85,11 @@ export default class MongoDbOperationQueue implements IOperationQueue {
return operations.length > 0;
}

async getSize (): Promise<number> {
const size = await this.collection!.estimatedDocumentCount();
return size;
}

/**
* * Clears the unresolvable transaction store. Mainly used in tests.
*/
Expand Down
5 changes: 5 additions & 0 deletions lib/core/versions/latest/interfaces/IOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ export default interface IOperationQueue {
* Checks to see if the queue already contains an operation for the given DID unique suffix.
*/
contains (didUniqueSuffix: string): Promise<boolean>;

/**
* Gets the size of the queue.
*/
getSize (): Promise<number>;
}
4 changes: 4 additions & 0 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import SidetreeBitcoinProcessor from './bitcoin/BitcoinProcessor';
import SidetreeBitcoinVersionModel from './bitcoin/models/BitcoinVersionModel';
import SidetreeConfig from './core/models/Config';
import SidetreeCore from './core/Core';
import SidetreeEventCode from './core/EventCode';
import SidetreeMonitor from './core/Monitor';
import SidetreeResponse from './common/Response';
import SidetreeResponseModel from './common/models/ResponseModel';
import SidetreeVersionModel from './core/models/VersionModel';
Expand All @@ -18,6 +20,8 @@ export {
ISidetreeCas,
SidetreeConfig,
SidetreeCore,
SidetreeEventCode,
SidetreeMonitor,
SidetreeResponse,
SidetreeResponseModel,
SidetreeVersionModel
Expand Down
35 changes: 28 additions & 7 deletions tests/bitcoin/MongoDbServiceStateStore.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import BitcoinServiceStateModel from '../../lib/bitcoin/models/BitcoinServiceStateModel';
import Config from '../../lib/core/models/Config';
import { MongoClient } from 'mongodb';
import MongoDb from '../common/MongoDb';
import MongoDbServiceStateStore from '../../lib/common/MongoDbServiceStateStore';

Expand All @@ -12,7 +13,7 @@ async function createStore (storeUri: string, databaseName: string): Promise<Mon
return store;
}

describe('MongoDbSeviceStateStore', async () => {
describe('MongoDbServiceStateStore', async () => {

const config: Config = require('../json/config-test.json');
const databaseName = 'sidetree-test';
Expand All @@ -21,16 +22,11 @@ describe('MongoDbSeviceStateStore', async () => {
let store: MongoDbServiceStateStore<BitcoinServiceStateModel>;

beforeAll(async () => {

mongoServiceAvailable = await MongoDb.isServerAvailable(config.mongoDbConnectionString);
if (mongoServiceAvailable) {
store = await createStore(config.mongoDbConnectionString, databaseName);

// // Delibrately drop the collection completely and initialize it again.
await store.dropCollection();
await store.initialize();
}
}, 10000); // Increasing `beforeAll()` timeout because dropping then recreating collection can take longer than default test timeout of 5 seconds.
});

beforeEach(async () => {
if (!mongoServiceAvailable) {
Expand Down Expand Up @@ -59,4 +55,29 @@ describe('MongoDbSeviceStateStore', async () => {

done();
});

describe('initialize()', async () => {
it('should create collection on initialization if it does not exist.', async (done) => {
// Deleting collections to setup this test.
const client = await MongoClient.connect(config.mongoDbConnectionString);
const db = client.db(databaseName);
await db.dropCollection(MongoDbServiceStateStore.collectionName);

// Make sure no collection exists before we start the test.
const collections = await db.collections();
const collectionNames = collections.map(collection => collection.collectionName);
expect(collectionNames.includes(MongoDbServiceStateStore.collectionName)).toBeFalsy();

// NOTE: In CosmosDB `db.createCollection()` call in `initialize()` does not make the collection "visible"
// until a subsequent operation is called (such as `createIndex()` or inserting record) possibly due to lazy load.
// hence in this test we insert a record and retrieve it again to prove that the collection is created.
await store.initialize();
await store.put({ serviceVersion: '1.1' });

const serviceState = await store.get();
expect(serviceState?.serviceVersion).toEqual('1.1');

done();
});
});
});
12 changes: 10 additions & 2 deletions tests/core/MongoDbOperationQueue.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import SidetreeError from '../../lib/common/SidetreeError';
* Creates a MongoDbOperationQueue and initializes it.
*/
async function createOperationQueue (storeUri: string, databaseName: string): Promise<MongoDbOperationQueue> {
const operationQueue = new MongoDbOperationQueue(storeUri, databaseName);
await operationQueue.initialize();
const operationQueue = new MongoDbOperationQueue();
await operationQueue.initialize(storeUri, databaseName);
return operationQueue;
}

Expand Down Expand Up @@ -145,4 +145,12 @@ describe('MongoDbOperationQueue', async () => {
}
}
});

it('should get queue size correctly.', async () => {
const operationCount = 3;
await generateAndQueueOperations(operationQueue, operationCount);

const size = await operationQueue.getSize();
expect(size).toEqual(3);
});
});
17 changes: 17 additions & 0 deletions tests/core/Monitor.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { SidetreeMonitor } from '../../lib';

describe('Monitor', async () => {
const testConfig = require('../json/config-test.json');

describe('getOperationQueueSize()', async () => {
it('should get operation queue size correctly', async () => {
const monitor = new SidetreeMonitor();
spyOn((monitor as any).operationQueue, 'initialize');
spyOn((monitor as any).operationQueue, 'getSize').and.returnValue(Promise.resolve(300));

monitor.initialize(testConfig);
const queueSize = await monitor.getOperationQueueSize();
expect(queueSize).toEqual(300);
});
});
});
4 changes: 4 additions & 0 deletions tests/mocks/MockOperationQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ export default class MockOperationQueue implements IOperationQueue {
async contains (didUniqueSuffix: string): Promise<boolean> {
return this.operations.has(didUniqueSuffix);
}

async getSize (): Promise<number> {
return this.operations.size;
}
}

0 comments on commit 3956109

Please sign in to comment.