From 2bb51a275fb7f59d5906f065ab2eeaa63bb39e82 Mon Sep 17 00:00:00 2001 From: Sebastijan K <58827427+sebastijankuzner@users.noreply.github.com> Date: Tue, 30 Jun 2020 16:47:19 +0200 Subject: [PATCH] test(core-snapshot): increase coverage to 100% (#3859) --- .../core-snapshots/database-service.test.ts | 67 +++++++------ .../filesystem/filesystem.test.ts | 86 ++++++++++++++++- .../filesystem/stream-reader-writer.test.ts | 60 ++++++------ .../core-snapshots/service-provider.test.ts | 12 ++- .../workers/actions/__support__/helper.ts | 13 +-- .../actions/test-worker-action.test.ts | 22 ++++- .../workers/actions/worker-action.test.ts | 93 ++++++++++++++----- .../workers/worker-wrapper.test.ts | 57 +++++++----- .../core-snapshots/src/database-service.ts | 42 ++++----- .../src/filesystem/filesystem.ts | 10 +- .../src/filesystem/stream-reader.ts | 1 + .../src/repositories/block-repository.ts | 8 +- .../src/repositories/round-repository.ts | 4 +- .../repositories/transaction-repository.ts | 4 +- .../src/workers/actions/read-processor.ts | 32 +------ .../workers/actions/restore-worker-action.ts | 3 +- .../src/workers/actions/test-worker-action.ts | 3 +- .../workers/actions/verify-worker-action.ts | 2 +- .../src/workers/worker-wrapper.ts | 3 +- packages/core-snapshots/src/workers/worker.ts | 2 +- 20 files changed, 341 insertions(+), 183 deletions(-) diff --git a/__tests__/unit/core-snapshots/database-service.test.ts b/__tests__/unit/core-snapshots/database-service.test.ts index 0d0b136310..84615f55fc 100644 --- a/__tests__/unit/core-snapshots/database-service.test.ts +++ b/__tests__/unit/core-snapshots/database-service.test.ts @@ -1,18 +1,18 @@ import "jest-extended"; -import { dirSync, setGracefulCleanup } from "tmp"; -// @ts-ignore -import * as workerThreads from "worker_threads"; -import { EventEmitter } from "events"; -import { Connection } from "typeorm"; import { Container, Providers } from "@packages/core-kernel"; -import { Sandbox } from "@packages/core-test-framework"; +import { LocalFilesystem } from "@packages/core-kernel/src/services/filesystem/drivers/local"; import { SnapshotDatabaseService } from "@packages/core-snapshots/src/database-service"; -import { Identifiers } from "@packages/core-snapshots/src/ioc"; import { Filesystem } from "@packages/core-snapshots/src/filesystem/filesystem"; -import { LocalFilesystem } from "@packages/core-kernel/src/services/filesystem/drivers/local"; +import { Identifiers } from "@packages/core-snapshots/src/ioc"; import { ProgressDispatcher } from "@packages/core-snapshots/src/progress-dispatcher"; -import { BlockRepository, TransactionRepository, RoundRepository } from "@packages/core-snapshots/src/repositories"; +import { BlockRepository, RoundRepository, TransactionRepository } from "@packages/core-snapshots/src/repositories"; +import { Sandbox } from "@packages/core-test-framework"; +import { EventEmitter } from "events"; +import { dirSync, setGracefulCleanup } from "tmp"; +import { Connection } from "typeorm"; +// @ts-ignore +import * as workerThreads from "worker_threads"; import { Assets } from "./__fixtures__"; @@ -72,7 +72,7 @@ beforeEach(() => { isConnected: true, }; - let lastBlock = Assets.blocksBigNumber[0]; + const lastBlock = Assets.blocksBigNumber[0]; lastBlock.height = 100; blockRepository = { @@ -132,7 +132,7 @@ beforeEach(() => { sandbox.app.bind(Identifiers.SnapshotDatabaseService).to(SnapshotDatabaseService).inSingletonScope(); - let pluginConfiguration = sandbox.app.getTagged( + const pluginConfiguration = sandbox.app.getTagged( Container.Identifiers.PluginConfiguration, "plugin", "@arkecosystem/core-snapshots", @@ -153,7 +153,11 @@ afterEach(() => { describe("DatabaseService", () => { describe("init", () => { it("should be ok", async () => { - database.init("default", false); + database.init("default", false, false); + }); + + it("should be ok with default parameters", async () => { + database.init(); }); }); @@ -175,7 +179,7 @@ describe("DatabaseService", () => { describe("rollbackChain", () => { it("should rollback chain to specific height", async () => { - let roundInfo = { + const roundInfo = { round: 1, nextRound: 2, maxDelegates: 51, @@ -193,13 +197,13 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let dumpOptions = { + const dumpOptions = { network: "testnet", skipCompression: false, codec: "default", }; - let promise = database.dump(dumpOptions); + const promise = database.dump(dumpOptions); await expect(promise).toResolve(); }); @@ -212,13 +216,13 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let dumpOptions = { + const dumpOptions = { network: "testnet", skipCompression: false, codec: "default", }; - let promise = database.dump(dumpOptions); + const promise = database.dump(dumpOptions); await expect(promise).rejects.toThrow(); }); @@ -231,13 +235,13 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let dumpOptions = { + const dumpOptions = { network: "testnet", skipCompression: false, codec: "default", }; - let promise = database.dump(dumpOptions); + const promise = database.dump(dumpOptions); await expect(promise).rejects.toThrow(); }); @@ -248,7 +252,7 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let dumpOptions = { + const dumpOptions = { network: "testnet", skipCompression: false, codec: "default", @@ -256,7 +260,7 @@ describe("DatabaseService", () => { mockWorkerWrapper.start = jest.fn().mockRejectedValue(new Error()); - let promise = database.dump(dumpOptions); + const promise = database.dump(dumpOptions); await expect(promise).rejects.toThrow(); }); @@ -269,7 +273,18 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let promise = database.restore(Assets.metaData, { truncate: true }); + const promise = database.restore(Assets.metaData, { truncate: true }); + + await expect(promise).toResolve(); + }); + + it("should resolve without truncate", async () => { + const dir: string = dirSync().name; + const subdir: string = `${dir}/sub`; + + filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); + + const promise = database.restore(Assets.metaData, { truncate: false }); await expect(promise).toResolve(); }); @@ -282,7 +297,7 @@ describe("DatabaseService", () => { mockWorkerWrapper.sync = jest.fn(); - let promise = database.restore(Assets.metaData, { truncate: true }); + const promise = database.restore(Assets.metaData, { truncate: true }); await expect(promise).toResolve(); }); @@ -295,7 +310,7 @@ describe("DatabaseService", () => { mockWorkerWrapper.start = jest.fn().mockRejectedValue(new Error()); - let promise = database.restore(Assets.metaData, { truncate: true }); + const promise = database.restore(Assets.metaData, { truncate: true }); await expect(promise).rejects.toThrow(); }); @@ -308,7 +323,7 @@ describe("DatabaseService", () => { filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); - let promise = database.verify(Assets.metaData); + const promise = database.verify(Assets.metaData); await expect(promise).toResolve(); }); @@ -321,7 +336,7 @@ describe("DatabaseService", () => { mockWorkerWrapper.start = jest.fn().mockRejectedValue(new Error()); - let promise = database.verify(Assets.metaData); + const promise = database.verify(Assets.metaData); await expect(promise).rejects.toThrow(); }); diff --git a/__tests__/unit/core-snapshots/filesystem/filesystem.test.ts b/__tests__/unit/core-snapshots/filesystem/filesystem.test.ts index b5b4a859d4..d568e15570 100644 --- a/__tests__/unit/core-snapshots/filesystem/filesystem.test.ts +++ b/__tests__/unit/core-snapshots/filesystem/filesystem.test.ts @@ -1,11 +1,12 @@ import "jest-extended"; -import { dirSync, setGracefulCleanup } from "tmp"; import { Container } from "@arkecosystem/core-kernel"; -import { Sandbox } from "@packages/core-test-framework"; -import { Identifiers } from "@packages/core-snapshots/src/ioc"; -import { Filesystem } from "@packages/core-snapshots/src/filesystem/filesystem"; import { LocalFilesystem } from "@packages/core-kernel/src/services/filesystem/drivers/local"; +import { Filesystem } from "@packages/core-snapshots/src/filesystem/filesystem"; +import { Identifiers } from "@packages/core-snapshots/src/ioc"; +import { Sandbox } from "@packages/core-test-framework"; +import { cloneDeep } from "lodash"; +import { dirSync, setGracefulCleanup } from "tmp"; import { metaData } from "../__fixtures__/assets"; @@ -68,4 +69,81 @@ describe("Filesystem", () => { await expect(filesystem.readMetaData()).resolves.toEqual(metaData); }); }); + + describe("validateMetaData", () => { + let tmpMeta; + + beforeEach(() => { + const dir: string = dirSync().name; + const subdir: string = `${dir}/sub`; + + filesystem.getSnapshotPath = jest.fn().mockReturnValue(subdir); + + tmpMeta = cloneDeep(metaData); + }); + + it("should throw if codec is missing", async () => { + delete tmpMeta.codec; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if skipCompression is missing", async () => { + delete tmpMeta.skipCompression; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if blocks is missing", async () => { + delete tmpMeta.blocks; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if blocks.count is missing", async () => { + delete tmpMeta.blocks.count; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if transactions is missing", async () => { + delete tmpMeta.transactions; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if transactions.count is missing", async () => { + delete tmpMeta.transactions.count; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if rounds is missing", async () => { + delete tmpMeta.rounds; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + + it("should throw if rounds.count is missing", async () => { + delete tmpMeta.rounds.count; + + await expect(filesystem.writeMetaData(tmpMeta)).toResolve(); + + await expect(filesystem.readMetaData()).rejects.toThrowError(); + }); + }); }); diff --git a/__tests__/unit/core-snapshots/filesystem/stream-reader-writer.test.ts b/__tests__/unit/core-snapshots/filesystem/stream-reader-writer.test.ts index cbb081dcbc..116dbd4d8d 100644 --- a/__tests__/unit/core-snapshots/filesystem/stream-reader-writer.test.ts +++ b/__tests__/unit/core-snapshots/filesystem/stream-reader-writer.test.ts @@ -1,32 +1,30 @@ import "jest-extended"; -import pluralize from "pluralize"; -import { Readable } from "stream"; -import { dirSync, setGracefulCleanup } from "tmp"; -import { pascalize, decamelize } from "xcase"; - import { Container } from "@packages/core-kernel"; -import { Sandbox } from "@packages/core-test-framework/src"; - -import { StreamReader, StreamWriter } from "@packages/core-snapshots/src/filesystem"; -import { Identifiers } from "@packages/core-snapshots/src/ioc"; -import * as Contracts from "@packages/core-snapshots/src/contracts"; import { JSONCodec, MessagePackCodec } from "@packages/core-snapshots/src/codecs"; +import * as Contracts from "@packages/core-snapshots/src/contracts"; import * as Exceptions from "@packages/core-snapshots/src/exceptions"; +import { StreamReader, StreamWriter } from "@packages/core-snapshots/src/filesystem"; +import { Identifiers } from "@packages/core-snapshots/src/ioc"; +import { Sandbox } from "@packages/core-test-framework/src"; +import pluralize from "pluralize"; +import { Readable } from "stream"; +import { dirSync, setGracefulCleanup } from "tmp"; +import { decamelize, pascalize } from "xcase"; import { Assets } from "../__fixtures__"; class DbStream extends Readable { private count = 0; - readonly prefix: string = ""; + private readonly prefix: string = ""; - constructor(private table: string) { + public constructor(private table: string) { super({ objectMode: true }); this.prefix = table.charAt(0).toUpperCase() + table.slice(1, -1) + "_"; } - _read() { + public _read() { if (this.count !== Assets[this.table].length) { this.push(this.appendPrefix(Assets[this.table][this.count])); this.count++; @@ -36,11 +34,11 @@ class DbStream extends Readable { } private appendPrefix(entity: any) { - let itemToReturn = {}; + const itemToReturn = {}; - let item = entity; + const item = entity; - for (let key of Object.keys(item)) { + for (const key of Object.keys(item)) { itemToReturn[this.prefix + decamelize(key)] = item[key]; } @@ -122,8 +120,13 @@ describe("StreamReader and StreamWriter", () => { it(`Should throw error if stream not open`, async () => { file = dirSync({ mode: 0o777 }).name + "/" + table; - let dbStream = new DbStream(table as string); - let streamWriter = streamWriterFactory(dbStream, file, useCompression as boolean, getEncode(table, codec)); + const dbStream = new DbStream(table as string); + const streamWriter = streamWriterFactory( + dbStream, + file, + useCompression as boolean, + getEncode(table, codec), + ); await expect(streamWriter.write()).rejects.toThrow(Exceptions.Stream.StreamNotOpen); }); @@ -131,8 +134,13 @@ describe("StreamReader and StreamWriter", () => { it(`Should write all entities`, async () => { file = dirSync({ mode: 0o777 }).name + "/" + table; - let dbStream = new DbStream(table as string); - let streamWriter = streamWriterFactory(dbStream, file, useCompression as boolean, getEncode(table, codec)); + const dbStream = new DbStream(table as string); + const streamWriter = streamWriterFactory( + dbStream, + file, + useCompression as boolean, + getEncode(table, codec), + ); await expect(streamWriter.open()).toResolve(); @@ -140,12 +148,12 @@ describe("StreamReader and StreamWriter", () => { }); it(`Should read all entities`, async () => { - let streamReader = streamReaderFactory(file, useCompression as boolean, getDecode(table, codec)); + const streamReader = streamReaderFactory(file, useCompression as boolean, getDecode(table, codec)); await expect(streamReader.open()).toResolve(); // @ts-ignore - for (let item of Assets[table]) { + for (const item of Assets[table]) { // await expect(streamWriter.readNext()).resolves.toEqual(item); await expect(streamReader.readNext()).toResolve(); } @@ -153,14 +161,14 @@ describe("StreamReader and StreamWriter", () => { await expect(streamReader.readNext()).resolves.toBeNull(); }); - it(`Should throw error if stream not open`, async () => { - let streamReader = streamReaderFactory(file, useCompression as boolean, getDecode(table, codec)); + it(`Should throw error if stream is not open`, async () => { + const streamReader = streamReaderFactory(file, useCompression as boolean, getDecode(table, codec)); await expect(streamReader.readNext()).rejects.toThrow(Exceptions.Stream.StreamNotOpen); }); it(`Should throw error if file is not valid`, async () => { - let streamReader = streamReaderFactory( + const streamReader = streamReaderFactory( file + "invalid_file", useCompression as boolean, getDecode(table, codec), @@ -170,7 +178,7 @@ describe("StreamReader and StreamWriter", () => { }); it(`Should throw error if error in codec`, async () => { - let streamReader = streamReaderFactory(file, useCompression as boolean, () => { + const streamReader = streamReaderFactory(file, useCompression as boolean, () => { throw new Error(); }); diff --git a/__tests__/unit/core-snapshots/service-provider.test.ts b/__tests__/unit/core-snapshots/service-provider.test.ts index 3a53143667..b1158b56a6 100644 --- a/__tests__/unit/core-snapshots/service-provider.test.ts +++ b/__tests__/unit/core-snapshots/service-provider.test.ts @@ -22,8 +22,10 @@ beforeEach(() => { sandbox = new Sandbox(); sandbox.app.bind(Container.Identifiers.LogService).toConstantValue({}); +}); - sandbox.app.bind(Container.Identifiers.DatabaseConnection).toConstantValue({}); +afterEach(() => { + jest.clearAllMocks(); }); describe("ServiceProvider", () => { @@ -39,6 +41,14 @@ describe("ServiceProvider", () => { expect(spyOnCreateConnection).toHaveBeenCalled(); }); + it("should register is default connection is already active", async () => { + sandbox.app.bind(Container.Identifiers.DatabaseConnection).toConstantValue({}); + + await expect(serviceProvider.register()).toResolve(); + expect(spyOnGetCustomRepository).toHaveBeenCalledTimes(3); + expect(spyOnCreateConnection).toHaveBeenCalled(); + }); + it("should dispose", async () => { await expect(serviceProvider.register()).toResolve(); expect(spyOnGetCustomRepository).toHaveBeenCalled(); diff --git a/__tests__/unit/core-snapshots/workers/actions/__support__/helper.ts b/__tests__/unit/core-snapshots/workers/actions/__support__/helper.ts index 1984446cf0..209b93fe8f 100644 --- a/__tests__/unit/core-snapshots/workers/actions/__support__/helper.ts +++ b/__tests__/unit/core-snapshots/workers/actions/__support__/helper.ts @@ -1,13 +1,14 @@ +import { WorkerAction } from "@packages/core-snapshots/src/contracts"; import { Readable } from "stream"; +import WorkerThreads from "worker_threads"; import { decamelize } from "xcase"; + import { Assets } from "../../../__fixtures__"; -import { WorkerAction } from "@packages/core-snapshots/src/contracts"; -import WorkerThreads from "worker_threads"; export class ReadableStream extends Readable { private count = 0; - constructor(private prefix: string, private table: string) { + public constructor(private prefix: string, private table: string) { super({ objectMode: true }); } @@ -21,11 +22,11 @@ export class ReadableStream extends Readable { } private appendPrefix(entity: any) { - let itemToReturn = {}; + const itemToReturn = {}; - let item = entity; + const item = entity; - for (let key of Object.keys(item)) { + for (const key of Object.keys(item)) { itemToReturn[this.prefix + decamelize(key)] = item[key]; } diff --git a/__tests__/unit/core-snapshots/workers/actions/test-worker-action.test.ts b/__tests__/unit/core-snapshots/workers/actions/test-worker-action.test.ts index 1dc3a9786a..60d19d739e 100644 --- a/__tests__/unit/core-snapshots/workers/actions/test-worker-action.test.ts +++ b/__tests__/unit/core-snapshots/workers/actions/test-worker-action.test.ts @@ -1,7 +1,25 @@ import "jest-extended"; + import { TestWorkerAction } from "@packages/core-snapshots/src/workers/actions"; -let testWorkerAction = new TestWorkerAction(); +const testWorkerAction = new TestWorkerAction(); + +jest.mock("worker_threads", () => { + const { EventEmitter } = require("events"); + class ParentPort extends EventEmitter { + public constructor() { + super(); + } + + public postMessage(data) { + this.emit("message", data); + } + } + + return { + parentPort: new ParentPort(), + }; +}); describe("TestWorkerAction", () => { it("should start with no action", async () => { @@ -25,7 +43,7 @@ describe("TestWorkerAction", () => { table: "wait", }); - let promise = testWorkerAction.start(); + const promise = testWorkerAction.start(); await new Promise((resolve) => { setTimeout(() => { diff --git a/__tests__/unit/core-snapshots/workers/actions/worker-action.test.ts b/__tests__/unit/core-snapshots/workers/actions/worker-action.test.ts index 9e21382cbf..cb74aeeff2 100644 --- a/__tests__/unit/core-snapshots/workers/actions/worker-action.test.ts +++ b/__tests__/unit/core-snapshots/workers/actions/worker-action.test.ts @@ -1,24 +1,22 @@ import "jest-extended"; -import { dirSync, setGracefulCleanup } from "tmp"; -import { Connection } from "typeorm"; -import { Assets } from "../../__fixtures__"; -import { Types } from "@packages/crypto"; import { Container } from "@packages/core-kernel"; -import { Identifiers } from "@packages/core-snapshots/src/ioc"; -import { Sandbox } from "@packages/core-test-framework"; import * as Codecs from "@packages/core-snapshots/src/codecs"; -import * as Actions from "@packages/core-snapshots/src/workers/actions"; import { StreamReader, StreamWriter } from "@packages/core-snapshots/src/codecs"; +import { Identifiers } from "@packages/core-snapshots/src/ioc"; +import * as Actions from "@packages/core-snapshots/src/workers/actions"; +import { Sandbox } from "@packages/core-test-framework"; +import { Types } from "@packages/crypto"; +import { dirSync, setGracefulCleanup } from "tmp"; +import { Connection } from "typeorm"; + +import { Assets } from "../../__fixtures__"; import { ReadableStream, waitForMessage } from "./__support__"; -// @ts-ignore -import * as WorkerThreads from "worker_threads"; jest.mock("worker_threads", () => { const { EventEmitter } = require("events"); - // @ts-ignore class ParentPort extends EventEmitter { - constructor() { + public constructor() { super(); } @@ -42,11 +40,25 @@ let blockRepository: any; let transactionRepository: any; let roundRepository: any; -let stream = new ReadableStream("Block_", "blocks"); +const blocksStream = new ReadableStream("Block_", "blocks"); +const transactionsStream = new ReadableStream("Transaction_", "transactions"); +const roundsStream = new ReadableStream("Round_", "rounds"); class Repository { - getReadStream = jest.fn().mockResolvedValue(stream); - async save(val) {} + public constructor(private table: string) {} + + // public getReadStream = jest.fn().mockResolvedValue(stream); + public getReadStream() { + if (this.table === "blocks") { + return blocksStream; + } + if (this.table === "transactions") { + return transactionsStream; + } + + return roundsStream; + } + public async save(val) {} } beforeEach(() => { @@ -54,11 +66,11 @@ beforeEach(() => { isConnected: true, }; - blockRepository = new Repository(); + blockRepository = new Repository("blocks"); - transactionRepository = new Repository(); + transactionRepository = new Repository("transactions"); - roundRepository = new Repository(); + roundRepository = new Repository("rounds"); sandbox = new Sandbox(); @@ -159,14 +171,14 @@ describe("WorkerAction", () => { ["rounds", "json", false], ]; - describe.each(cases)("Blocks with [%s] codec and compression: [%s]", (table, codec, skipCompression) => { + describe.each(cases)("Table [%s] with codec [%s] and compression: [%s]", (table, codec, skipCompression) => { let dir: string; - let genesisBlockId = Assets.blocks[1].previousBlock; + const genesisBlockId = Assets.blocks[1].previousBlock; it(`should DUMP with [${codec}] codec`, async () => { dir = dirSync({ mode: 0o777 }).name; - let options = { + const options = { action: "dump", table: table as string, codec: codec as string, @@ -186,8 +198,8 @@ describe("WorkerAction", () => { }); it(`should VERIFY with [${codec}] codec`, async () => { - let options = { - action: "dump", + const options = { + action: "verify", table: table as string, codec: codec as string, start: 1, @@ -218,8 +230,8 @@ describe("WorkerAction", () => { }); it(`should RESTORE with [${codec}] codec`, async () => { - let options = { - action: "dump", + const options = { + action: "restore", table: table as string, codec: codec as string, start: 1, @@ -248,5 +260,38 @@ describe("WorkerAction", () => { }), ).toResolve(); }); + + it(`should RESTORE with [${codec}] codec - without verify`, async () => { + const options = { + action: "restore", + table: table as string, + codec: codec as string, + start: 1, + end: 100, + skipCompression: skipCompression as boolean, + filePath: dir + "/" + table, + genesisBlockId: genesisBlockId, + updateStep: 2, + verify: false, + network: "testnet" as Types.NetworkName, + }; + + restoreWorkerAction.init(options); + + await expect(waitForMessage(restoreWorkerAction, "start", undefined)).toResolve(); + + await new Promise((resolve) => { + setTimeout(() => { + resolve(); + }, 10); + }); + + await expect( + waitForMessage(restoreWorkerAction, "sync", { + nextCount: Number.POSITIVE_INFINITY, + height: 1, + }), + ).toResolve(); + }); }); }); diff --git a/__tests__/unit/core-snapshots/workers/worker-wrapper.test.ts b/__tests__/unit/core-snapshots/workers/worker-wrapper.test.ts index 5589260304..fb3d2dd14c 100644 --- a/__tests__/unit/core-snapshots/workers/worker-wrapper.test.ts +++ b/__tests__/unit/core-snapshots/workers/worker-wrapper.test.ts @@ -1,8 +1,8 @@ import "jest-extended"; -import { Worker } from "worker_threads"; import { WorkerWrapper } from "@packages/core-snapshots/src/workers/worker-wrapper"; import { EventEmitter } from "events"; +import { Worker } from "worker_threads"; let mockWorker: any; @@ -27,7 +27,7 @@ afterEach(() => { describe("WorkerWrapper", () => { describe("Terminate", () => { it("should call terminate", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); await workerWrapper.terminate(); @@ -37,9 +37,9 @@ describe("WorkerWrapper", () => { describe("Start", () => { it("should resolve on [started] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.start(); + const promise = workerWrapper.start(); await new Promise((resolve) => { setTimeout(() => { @@ -55,9 +55,9 @@ describe("WorkerWrapper", () => { }); it("should resolve on [exit] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.start(); + const promise = workerWrapper.start(); await new Promise((resolve) => { setTimeout(() => { @@ -70,9 +70,9 @@ describe("WorkerWrapper", () => { }); it("should reject on [exception] message event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.start(); + const promise = workerWrapper.start(); await new Promise((resolve) => { setTimeout(() => { @@ -88,9 +88,9 @@ describe("WorkerWrapper", () => { }); it("should reject on [any] message event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.start(); + const promise = workerWrapper.start(); await new Promise((resolve) => { setTimeout(() => { @@ -106,9 +106,9 @@ describe("WorkerWrapper", () => { }); it("should reject on [error] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.start(); + const promise = workerWrapper.start(); await new Promise((resolve) => { setTimeout(() => { @@ -123,11 +123,11 @@ describe("WorkerWrapper", () => { describe("Sync", () => { it("should resolve on [synced] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.sync({}); + const promise = workerWrapper.sync({}); - let data = { + const data = { dummy: "dummy_data", }; @@ -145,9 +145,9 @@ describe("WorkerWrapper", () => { }); it("should resolve with undefined on [exit] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.sync({}); + const promise = workerWrapper.sync({}); await new Promise((resolve) => { setTimeout(() => { @@ -159,10 +159,21 @@ describe("WorkerWrapper", () => { await expect(promise).resolves.toBeUndefined(); }); + it("should resolve if worker already exit", async () => { + const workerWrapper = new WorkerWrapper({}); + + // @ts-ignore + workerWrapper.isDone = true; + + const promise = workerWrapper.sync({}); + + await expect(promise).resolves.toBeUndefined(); + }); + it("should reject on [exception] message event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.sync({}); + const promise = workerWrapper.sync({}); await new Promise((resolve) => { setTimeout(() => { @@ -178,9 +189,9 @@ describe("WorkerWrapper", () => { }); it("should reject on [any] message event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.sync({}); + const promise = workerWrapper.sync({}); await new Promise((resolve) => { setTimeout(() => { @@ -196,9 +207,9 @@ describe("WorkerWrapper", () => { }); it("should reject on [error] event", async () => { - let workerWrapper = new WorkerWrapper({}); + const workerWrapper = new WorkerWrapper({}); - let promise = workerWrapper.sync({}); + const promise = workerWrapper.sync({}); await new Promise((resolve) => { setTimeout(() => { diff --git a/packages/core-snapshots/src/database-service.ts b/packages/core-snapshots/src/database-service.ts index eac4a6795c..c59897947f 100644 --- a/packages/core-snapshots/src/database-service.ts +++ b/packages/core-snapshots/src/database-service.ts @@ -37,10 +37,10 @@ export class SnapshotDatabaseService implements Database.DatabaseService { private skipCompression: boolean = false; private verifyData: boolean = false; - public init(codec?: string, skipCompression?: boolean, verify: boolean = false): void { - this.codec = codec || "default"; - this.skipCompression = skipCompression || false; - this.verifyData = verify || false; + public init(codec: string = "default", skipCompression: boolean = false, verify: boolean = false): void { + this.codec = codec; + this.skipCompression = skipCompression; + this.verifyData = verify; } public async truncate(): Promise { @@ -101,9 +101,9 @@ export class SnapshotDatabaseService implements Database.DatabaseService { throw err; } finally { - await blocksWorker?.terminate(); - await transactionsWorker?.terminate(); - await roundsWorker?.terminate(); + await blocksWorker.terminate(); + await transactionsWorker.terminate(); + await roundsWorker.terminate(); } } @@ -166,13 +166,9 @@ export class SnapshotDatabaseService implements Database.DatabaseService { for (const height of milestoneHeights) { const promises = [] as any; - // console.log("Run blocks with: ",{ nextValue: height, nextField: "height"}) promises.push(blocksWorker.sync({ nextValue: height, nextField: "height" })); if (result && result.height > 0) { - // console.log("Run transactions with: ", { nextCount: result.numberOfTransactions, height: result.height - 1 }) - // console.log("Run rounds with: ", { nextCount: Utils.roundCalculator.calculateRound(result.height).round, height: result.height - 1 }) - promises.push( transactionsWorker.sync({ nextCount: result.numberOfTransactions, height: result.height - 1 }), ); @@ -186,8 +182,6 @@ export class SnapshotDatabaseService implements Database.DatabaseService { result = (await Promise.all(promises))[0]; - // console.log("Result: ", result) - if (!result) { break; } @@ -199,9 +193,9 @@ export class SnapshotDatabaseService implements Database.DatabaseService { throw err; } finally { - await blocksWorker?.terminate(); - await transactionsWorker?.terminate(); - await roundsWorker?.terminate(); + await blocksWorker.terminate(); + await transactionsWorker.terminate(); + await roundsWorker.terminate(); } } @@ -212,18 +206,18 @@ export class SnapshotDatabaseService implements Database.DatabaseService { throw new Error("Database is empty"); } - const firstHeight = start || 1; - const lastHeight = end || lastBlock?.height || 1; + const firstRound = Utils.roundCalculator.calculateRound(start || 1); + const lastRound = Utils.roundCalculator.calculateRound(end || lastBlock.height); - const firstRound = Utils.roundCalculator.calculateRound(firstHeight); - const lastRound = Utils.roundCalculator.calculateRound(lastHeight); + const startHeight = firstRound.roundHeight; + const endHeight = lastRound.roundHeight - 1; - if (firstRound.roundHeight >= lastRound.roundHeight) { + if (startHeight >= endHeight) { throw new Error("Start round is greater or equal to end round"); } - const firstBlock = await this.blockRepository.findByHeight(firstRound.roundHeight); - lastBlock = await this.blockRepository.findByHeight(lastRound.roundHeight); + const firstBlock = await this.blockRepository.findByHeight(startHeight); + lastBlock = await this.blockRepository.findByHeight(endHeight); Utils.assert.defined(firstBlock); Utils.assert.defined(lastBlock); @@ -235,7 +229,7 @@ export class SnapshotDatabaseService implements Database.DatabaseService { firstRoundRound: firstRound.round, lastRoundRound: lastRound.round, - roundsCount: await this.roundRepository.countInRange(firstRound.round, lastRound.round), + roundsCount: await this.roundRepository.countInRange(firstRound.round, lastRound.round - 1), firstTransactionTimestamp: firstBlock.timestamp, lastTransactionTimestamp: lastBlock.timestamp, diff --git a/packages/core-snapshots/src/filesystem/filesystem.ts b/packages/core-snapshots/src/filesystem/filesystem.ts index bbd9e5d13f..2c6fa0b0aa 100644 --- a/packages/core-snapshots/src/filesystem/filesystem.ts +++ b/packages/core-snapshots/src/filesystem/filesystem.ts @@ -42,11 +42,11 @@ export class Filesystem { } private validateMetaData(meta: Meta.MetaData): void { - Utils.assert.defined(meta?.codec); - Utils.assert.defined(meta?.skipCompression); + Utils.assert.defined(meta.codec); + Utils.assert.defined(meta.skipCompression); - Utils.assert.defined(meta?.blocks?.count); - Utils.assert.defined(meta?.transactions?.count); - Utils.assert.defined(meta?.rounds?.count); + Utils.assert.defined(meta.blocks?.count); + Utils.assert.defined(meta.transactions?.count); + Utils.assert.defined(meta.rounds?.count); } } diff --git a/packages/core-snapshots/src/filesystem/stream-reader.ts b/packages/core-snapshots/src/filesystem/stream-reader.ts index cfc6cb9e42..78cec7e149 100644 --- a/packages/core-snapshots/src/filesystem/stream-reader.ts +++ b/packages/core-snapshots/src/filesystem/stream-reader.ts @@ -138,6 +138,7 @@ export class StreamReader { } let copyLength = 0; + /* istanbul ignore next */ if (this.offset + remaining <= this.length) { copyLength = remaining; } else { diff --git a/packages/core-snapshots/src/repositories/block-repository.ts b/packages/core-snapshots/src/repositories/block-repository.ts index 740548127a..34f93e82bd 100644 --- a/packages/core-snapshots/src/repositories/block-repository.ts +++ b/packages/core-snapshots/src/repositories/block-repository.ts @@ -8,17 +8,17 @@ import { Repository } from "../contracts"; export class BlockRepository extends Repositories.AbstractRepository implements Repository { public async getReadStream(start: number, end: number): Promise { return this.createQueryBuilder() - .where("height >= :start AND height < :end", { start, end }) + .where("height >= :start AND height <= :end", { start, end }) .orderBy("height", "ASC") .stream(); } public async countInRange(start: number, end: number): Promise { - return this.createQueryBuilder().where("height >= :start AND height < :end", { start, end }).getCount(); + return this.createQueryBuilder().where("height >= :start AND height <= :end", { start, end }).getCount(); } public async rollback(roundInfo: Contracts.Shared.RoundInfo): Promise { - const block = await this.findByHeight(roundInfo.roundHeight); + const block = await this.findByHeight(roundInfo.roundHeight - 1); if (!block) { throw new Error("Cannot find block on height " + roundInfo.roundHeight); @@ -43,7 +43,7 @@ export class BlockRepository extends Repositories.AbstractRepository :round", { round: roundInfo.round }) + .where("round > :round", { round: roundInfo.round - 1 }) .execute(); }); } diff --git a/packages/core-snapshots/src/repositories/round-repository.ts b/packages/core-snapshots/src/repositories/round-repository.ts index 778195653d..9caeab3cb4 100644 --- a/packages/core-snapshots/src/repositories/round-repository.ts +++ b/packages/core-snapshots/src/repositories/round-repository.ts @@ -7,12 +7,12 @@ import { Repository } from "../contracts"; export class RoundRepository extends Repositories.AbstractRepository implements Repository { public async getReadStream(start: number, end: number): Promise { return this.createQueryBuilder() - .where("round >= :start AND round < :end", { start, end }) + .where("round >= :start AND round <= :end", { start, end }) .orderBy("round", "ASC") .stream(); } public async countInRange(start: number, end: number): Promise { - return this.createQueryBuilder().where("round >= :start AND round < :end", { start, end }).getCount(); + return this.createQueryBuilder().where("round >= :start AND round <= :end", { start, end }).getCount(); } } diff --git a/packages/core-snapshots/src/repositories/transaction-repository.ts b/packages/core-snapshots/src/repositories/transaction-repository.ts index 60e0d60e88..9a0a62a506 100644 --- a/packages/core-snapshots/src/repositories/transaction-repository.ts +++ b/packages/core-snapshots/src/repositories/transaction-repository.ts @@ -7,13 +7,13 @@ import { Repository } from "../contracts"; export class TransactionRepository extends Repositories.AbstractRepository implements Repository { public async getReadStream(start: number, end: number): Promise { return this.createQueryBuilder() - .where("timestamp >= :start AND timestamp < :end", { start, end }) + .where("timestamp >= :start AND timestamp <= :end", { start, end }) .orderBy("timestamp", "ASC") .addOrderBy("sequence", "ASC") .stream(); } public async countInRange(start: number, end: number): Promise { - return this.createQueryBuilder().where("timestamp >= :start AND timestamp < :end", { start, end }).getCount(); + return this.createQueryBuilder().where("timestamp >= :start AND timestamp <= :end", { start, end }).getCount(); } } diff --git a/packages/core-snapshots/src/workers/actions/read-processor.ts b/packages/core-snapshots/src/workers/actions/read-processor.ts index 6a60310ee7..ea5c44b1eb 100644 --- a/packages/core-snapshots/src/workers/actions/read-processor.ts +++ b/packages/core-snapshots/src/workers/actions/read-processor.ts @@ -24,31 +24,24 @@ export class ReadProcessor { ) {} public sync(data: any): void { - // console.log("Sync", data); - this.nextField = data.nextField; this.nextValue = data.nextValue; this.nextCount = data.nextCount; if (data.height) { - /* istanbul ignore next */ Managers.configManager.setHeight(data.height); } + /* istanbul ignore next */ if (this.onResume) { - /* istanbul ignore next */ this.onResume(); } this.isRunning = true; this.emitCount(); - // parentPort?.postMessage({ - // action: "log", - // data: "Resume: " + JSON.stringify(data) - // }) - // On first message is not defined + /* istanbul ignore next */ if (this.callOnMessage) { this.callOnMessage(); } @@ -57,15 +50,15 @@ export class ReadProcessor { public async start() { await this.streamReader.open(); - parentPort?.postMessage({ + parentPort!.postMessage({ action: "started", }); await this.waitForSynchronization(false); const interval = setInterval(() => { + /* istanbul ignore next */ if (this.isRunning) { - /* istanbul ignore next */ this.emitCount(); } }, 500); @@ -76,11 +69,6 @@ export class ReadProcessor { while ((entity = await this.streamReader.readNext())) { this.count++; - // parentPort?.postMessage({ - // action: "log", - // data: "count: " + this.count - // }) - if (this.nextValue && entity[this.nextField] > this.nextValue!) { await this.waitOrContinue(this.count, previousEntity, entity); } @@ -110,7 +98,7 @@ export class ReadProcessor { } private emitCount(): void { - parentPort?.postMessage({ + parentPort!.postMessage({ action: "count", data: this.count, }); @@ -137,11 +125,6 @@ export class ReadProcessor { private waitForSynchronization(emit: boolean = true): Promise { return new Promise(async (resolve) => { - // parentPort?.postMessage({ - // action: "log", - // data: "Wait: " - // }) - if (this.onWait) { await this.onWait(); } @@ -159,11 +142,6 @@ export class ReadProcessor { } private emitSynchronized() { - // parentPort?.postMessage({ - // action: "log", - // data: "wait" - // }) - parentPort!.postMessage({ action: "synchronized", data: { diff --git a/packages/core-snapshots/src/workers/actions/restore-worker-action.ts b/packages/core-snapshots/src/workers/actions/restore-worker-action.ts index 64a5c67604..fa41df149c 100644 --- a/packages/core-snapshots/src/workers/actions/restore-worker-action.ts +++ b/packages/core-snapshots/src/workers/actions/restore-worker-action.ts @@ -9,7 +9,7 @@ export class RestoreWorkerAction extends AbstractWorkerAction { private entities = [] as any[]; public sync(data: any): void { - this.readProcessor?.sync(data); + this.readProcessor!.sync(data); } public async start() { @@ -26,7 +26,6 @@ export class RestoreWorkerAction extends AbstractWorkerAction { } if (this.options!.verify) { - /* istanbul ignore next */ verify(entity, previousEntity); } diff --git a/packages/core-snapshots/src/workers/actions/test-worker-action.ts b/packages/core-snapshots/src/workers/actions/test-worker-action.ts index 2a2c03fa9b..fff275685f 100644 --- a/packages/core-snapshots/src/workers/actions/test-worker-action.ts +++ b/packages/core-snapshots/src/workers/actions/test-worker-action.ts @@ -13,6 +13,7 @@ export class TestWorkerAction implements WorkerAction { } public sync(data: any): void { + /* istanbul ignore next */ if (this.resume) { this.resume(); } @@ -28,7 +29,7 @@ export class TestWorkerAction implements WorkerAction { } if (this.options.table === "wait") { - parentPort?.postMessage({ + parentPort!.postMessage({ action: "started", }); diff --git a/packages/core-snapshots/src/workers/actions/verify-worker-action.ts b/packages/core-snapshots/src/workers/actions/verify-worker-action.ts index 679223af68..fa1578783f 100644 --- a/packages/core-snapshots/src/workers/actions/verify-worker-action.ts +++ b/packages/core-snapshots/src/workers/actions/verify-worker-action.ts @@ -8,7 +8,7 @@ export class VerifyWorkerAction extends AbstractWorkerAction { private readProcessor: ReadProcessor | undefined = undefined; public sync(data: any): void { - this.readProcessor?.sync(data); + this.readProcessor!.sync(data); } public async start() { diff --git a/packages/core-snapshots/src/workers/worker-wrapper.ts b/packages/core-snapshots/src/workers/worker-wrapper.ts index ec40c19dda..031dd6d26a 100644 --- a/packages/core-snapshots/src/workers/worker-wrapper.ts +++ b/packages/core-snapshots/src/workers/worker-wrapper.ts @@ -45,9 +45,7 @@ export class WorkerWrapper extends EventEmitter { public sync(data: any): Promise { return new Promise((resolve, reject) => { if (this.isDone) { - /* istanbul ignore next */ resolve(); - /* istanbul ignore next */ return; } @@ -77,6 +75,7 @@ export class WorkerWrapper extends EventEmitter { private handleMessage(data) { // Actions: count, started, synced, exit, error this.emit(data.action, data.data); + /* istanbul ignore next */ if (data.action !== "count" && data.action !== "log") { this.emit("*", { name: data.action, data: data.data }); } diff --git a/packages/core-snapshots/src/workers/worker.ts b/packages/core-snapshots/src/workers/worker.ts index 1137677550..aa51ae593a 100644 --- a/packages/core-snapshots/src/workers/worker.ts +++ b/packages/core-snapshots/src/workers/worker.ts @@ -36,8 +36,8 @@ export const init = async () => { app = new Application(new Container.Container()); + /* istanbul ignore next */ if (_workerData.connection) { - /* istanbul ignore next */ app.bind(Identifiers.SnapshotDatabaseConnection).toConstantValue( await connect({ connection: _workerData.connection }), );