Skip to content

Commit

Permalink
feat(core-snapshots): support custom network configurations (#4305)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastijankuzner authored Feb 9, 2021
1 parent 9279275 commit 4570d9f
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import { StreamReader, StreamWriter } from "@packages/core-snapshots/src/codecs"
import { Identifiers } from "@packages/core-snapshots/src/ioc";
import * as Actions from "@packages/core-snapshots/src/workers/actions";
import { Sandbox } from "@packages/core-test-framework";
import { Types } from "@packages/crypto";
import { Managers } from "@packages/crypto";
import { dirSync, setGracefulCleanup } from "tmp";
import { Connection } from "typeorm";

import { Assets } from "../../__fixtures__";
import { ReadableStream, waitForMessage } from "./__support__";

jest.mock("worker_threads", () => {
Expand Down Expand Up @@ -47,7 +46,6 @@ const roundsStream = new ReadableStream("Round_", "rounds");
class Repository {
public constructor(private table: string) {}

// public getReadStream = jest.fn().mockResolvedValue(stream);
public getReadStream() {
if (this.table === "blocks") {
return blocksStream;
Expand All @@ -62,6 +60,8 @@ class Repository {
}

beforeEach(() => {
Managers.configManager.setFromPreset("testnet");

connection = {
isConnected: true,
};
Expand Down Expand Up @@ -173,7 +173,6 @@ describe("WorkerAction", () => {

describe.each(cases)("Table [%s] with codec [%s] and compression: [%s]", (table, codec, skipCompression) => {
let dir: string;
const genesisBlockId = Assets.blocks[1].previousBlock;

it(`should DUMP with [${codec}] codec`, async () => {
dir = dirSync({ mode: 0o777 }).name;
Expand All @@ -186,10 +185,8 @@ describe("WorkerAction", () => {
end: 100,
skipCompression: skipCompression as boolean,
filePath: dir + "/" + table,
genesisBlockId: genesisBlockId,
updateStep: 1,
verify: true,
network: "testnet" as Types.NetworkName,
};

dumpWorkerAction.init(options);
Expand All @@ -206,10 +203,8 @@ describe("WorkerAction", () => {
end: 100,
skipCompression: skipCompression as boolean,
filePath: dir + "/" + table,
genesisBlockId: genesisBlockId,
updateStep: 1,
verify: true,
network: "testnet" as Types.NetworkName,
};

verifyWorkerAction.init(options);
Expand Down Expand Up @@ -238,10 +233,8 @@ describe("WorkerAction", () => {
end: 100,
skipCompression: skipCompression as boolean,
filePath: dir + "/" + table,
genesisBlockId: genesisBlockId,
updateStep: 1,
verify: true,
network: "testnet" as Types.NetworkName,
};

restoreWorkerAction.init(options);
Expand Down Expand Up @@ -270,10 +263,8 @@ describe("WorkerAction", () => {
end: 100,
skipCompression: skipCompression as boolean,
filePath: dir + "/" + table,
genesisBlockId: genesisBlockId,
updateStep: 2,
verify: false,
network: "testnet" as Types.NetworkName,
};

restoreWorkerAction.init(options);
Expand Down
36 changes: 18 additions & 18 deletions __tests__/unit/core-snapshots/workers/worker-thread.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import "jest-extended";

import { Worker } from "worker_threads";
import { resolve } from "path";
import * as Contracts from "@packages/core-snapshots/src/contracts";
import { Managers } from "@packages/crypto";
import { resolve } from "path";
import { Worker } from "worker_threads";

let _workerData: Contracts.Worker.WorkerData = {
const _workerData: Contracts.Worker.WorkerData = {
actionOptions: {
action: "test",
table: "blocks",
Expand All @@ -13,14 +14,13 @@ let _workerData: Contracts.Worker.WorkerData = {
end: 100,
skipCompression: false,
filePath: "",
genesisBlockId: "123",
updateStep: 1000,
verify: true,
network: "testnet",
},
networkConfig: Managers.configManager.all()!,
};

let eventListener = {
const eventListener = {
onExit: (data: any) => {},
onError: (data: any) => {},
onMessage: (data: any) => {},
Expand All @@ -42,7 +42,7 @@ const appendListeners = (worker: Worker) => {

const waitForEvent = (worker: Worker, message?: any): Promise<void> => {
return new Promise<void>((resolve) => {
let onEvent = (event, data) => {
const onEvent = (event, data) => {
worker.removeAllListeners();

resolve();
Expand All @@ -68,7 +68,7 @@ let spyOnExit;
let spyOnError;
let spyOnMessage;

let workerPath = resolve("packages/core-snapshots/dist/workers/worker.js");
const workerPath = resolve("packages/core-snapshots/dist/workers/worker.js");

beforeEach(() => {
spyOnExit = jest.spyOn(eventListener, "onExit");
Expand All @@ -82,7 +82,7 @@ afterEach(() => {

describe("Worker", () => {
it("should exit without error", async () => {
let worker = new Worker(workerPath, { workerData: _workerData });
const worker = new Worker(workerPath, { workerData: _workerData });

appendListeners(worker);

Expand All @@ -94,11 +94,11 @@ describe("Worker", () => {
});

it("should run init and start Action", async () => {
let worker = new Worker(workerPath, { workerData: _workerData });
const worker = new Worker(workerPath, { workerData: _workerData });

appendListeners(worker);

let message: Contracts.Worker.WorkerMessage = {
const message: Contracts.Worker.WorkerMessage = {
action: "start",
data: {},
};
Expand All @@ -113,14 +113,14 @@ describe("Worker", () => {
});

it("should catch unhandled rejection and pass it in message", async () => {
let tmpWorkerData = { ..._workerData };
const tmpWorkerData = { ..._workerData };
tmpWorkerData.actionOptions.table = "throwError";

let worker = new Worker(workerPath, { workerData: _workerData });
const worker = new Worker(workerPath, { workerData: _workerData });

appendListeners(worker);

let message: Contracts.Worker.WorkerMessage = {
const message: Contracts.Worker.WorkerMessage = {
action: "start",
data: {},
};
Expand All @@ -138,12 +138,12 @@ describe("Worker", () => {
});

it("should throw exception", async () => {
let tmpWorkerData = { ..._workerData };
const tmpWorkerData = { ..._workerData };
tmpWorkerData.actionOptions.table = "wait";

let worker = new Worker(workerPath, { workerData: _workerData });
const worker = new Worker(workerPath, { workerData: _workerData });

let message: Contracts.Worker.WorkerMessage = {
const message: Contracts.Worker.WorkerMessage = {
action: "start",
data: {},
};
Expand All @@ -158,7 +158,7 @@ describe("Worker", () => {

appendListeners(worker);

let message2 = {
const message2 = {
action: "sync",
data: {
execute: "throwError",
Expand Down
12 changes: 4 additions & 8 deletions __tests__/unit/core-snapshots/workers/worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import "jest-extended";

// @ts-ignore
import { workerData } from "worker_threads";

import { init, dispose } from "@packages/core-snapshots/src/workers/worker";
import { DumpWorkerAction } from "@packages/core-snapshots/src/workers/actions/dump-worker-action";
import { dispose, init } from "@packages/core-snapshots/src/workers/worker";

jest.mock("worker_threads", () => {
return {
Expand All @@ -17,10 +14,9 @@ jest.mock("worker_threads", () => {
codec: "default",
skipCompression: false,
filePath: "",
genesisBlockId: "123",
updateStep: 1000,
network: "testnet",
},
networkConfig: require("@packages/crypto").Managers.configManager.all(),
},
};
});
Expand All @@ -33,7 +29,7 @@ describe("Worker", () => {
it("should run worker", async () => {
DumpWorkerAction.prototype.start = jest.fn();

await init();
await dispose();
await expect(init()).toResolve();
await expect(dispose()).toResolve();
});
});
8 changes: 2 additions & 6 deletions packages/core-snapshots/src/contracts/worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Types } from "@arkecosystem/crypto";
import { Interfaces } from "@arkecosystem/crypto";

export interface WorkerData {
actionOptions: ActionOptions;
networkConfig: Interfaces.NetworkConfig;
connection?: any;
}

Expand All @@ -13,13 +14,8 @@ export interface ActionOptions {
codec: string;
skipCompression: boolean;
verify: boolean;

filePath: string;

genesisBlockId?: string;
updateStep?: number;

network: Types.NetworkName;
}

export interface WorkerMessage {
Expand Down
12 changes: 4 additions & 8 deletions packages/core-snapshots/src/database-service.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Models } from "@arkecosystem/core-database";
import { Container, Contracts, Providers, Utils } from "@arkecosystem/core-kernel";
import { Blocks, Interfaces, Managers, Types } from "@arkecosystem/crypto";
import { Blocks, Interfaces, Managers } from "@arkecosystem/crypto";

import { Database, Meta, Options, Worker } from "./contracts";
import { Database, Meta, Options } from "./contracts";
import { Filesystem } from "./filesystem/filesystem";
import { Identifiers } from "./ioc";
import { ProgressDispatcher } from "./progress-dispatcher";
Expand Down Expand Up @@ -265,9 +265,8 @@ export class SnapshotDatabaseService implements Database.DatabaseService {
}

private prepareWorkerData(action: string, table: string, meta: Meta.MetaData): any {
const result: Worker.WorkerData = {
return {
actionOptions: {
network: this.app.network() as Types.NetworkName,
action: action,
table: table,
start: meta[table].start,
Expand All @@ -276,16 +275,13 @@ export class SnapshotDatabaseService implements Database.DatabaseService {
skipCompression: this.skipCompression,
verify: this.verifyData,
filePath: `${this.filesystem.getSnapshotPath()}${table}`,
genesisBlockId: Managers.configManager.get("genesisBlock").id,
updateStep: this.configuration.getOptional("updateStep", 1000),
},
networkConfig: Managers.configManager.all()!,
connection: this.configuration.get("connection"),
};

return result;
}

// @ts-ignore
private async prepareProgressDispatcher(worker: WorkerWrapper, table: string, count: number): Promise<Function> {
const progressDispatcher = this.app.get<ProgressDispatcher>(Identifiers.ProgressDispatcher);

Expand Down
2 changes: 0 additions & 2 deletions packages/core-snapshots/src/snapshot-service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ export class SnapshotService implements Contracts.Snapshot.SnapshotService {
Utils.assert.defined<string>(options.network);
Utils.assert.defined<string>(options.blocks);

// this.filesystem.setNetwork(options.network);
this.filesystem.setSnapshot(options.blocks);

if (!(await this.filesystem.snapshotExists())) {
Expand Down Expand Up @@ -79,7 +78,6 @@ export class SnapshotService implements Contracts.Snapshot.SnapshotService {
Utils.assert.defined<string>(options.network);
Utils.assert.defined<string>(options.blocks);

// this.filesystem.setNetwork(options.network);
this.filesystem.setSnapshot(options.blocks);

if (!(await this.filesystem.snapshotExists())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ export abstract class AbstractWorkerAction implements WorkerAction {
protected codec?: string;
protected skipCompression?: boolean;
protected filePath?: string;
protected genesisBlockId?: string;
protected updateStep?: number;

protected options?: Worker.ActionOptions;
Expand All @@ -28,12 +27,9 @@ export abstract class AbstractWorkerAction implements WorkerAction {
this.codec = options.codec;
this.skipCompression = options.skipCompression;
this.filePath = options.filePath;
this.genesisBlockId = options.genesisBlockId;
this.updateStep = options.updateStep;

this.options = options;

Managers.configManager.setFromPreset(options.network);
}

protected getRepository(): Repository {
Expand Down Expand Up @@ -80,7 +76,7 @@ export abstract class AbstractWorkerAction implements WorkerAction {

protected applyGenesisBlockFix(block: Models.Block): void {
if (block.height === 1) {
block.id = this.genesisBlockId!;
block.id = Managers.configManager.get<string>("genesisBlock.id")!;
}
}

Expand Down
4 changes: 3 additions & 1 deletion packages/core-snapshots/src/workers/worker.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Models, Utils } from "@arkecosystem/core-database";
import { Container } from "@arkecosystem/core-kernel";
import { Transactions as MagistrateTransactions } from "@arkecosystem/core-magistrate-crypto";
import { Transactions } from "@arkecosystem/crypto";
import { Managers, Transactions } from "@arkecosystem/crypto";
import { Connection, createConnection, getCustomRepository } from "typeorm";
import { parentPort, workerData } from "worker_threads";

Expand All @@ -27,6 +27,8 @@ const connect = async (options: any): Promise<Connection> => {
};

export const init = async () => {
Managers.configManager.setConfig(_workerData.networkConfig);

Transactions.TransactionRegistry.registerTransactionType(MagistrateTransactions.BridgechainRegistrationTransaction);
Transactions.TransactionRegistry.registerTransactionType(MagistrateTransactions.BridgechainResignationTransaction);
Transactions.TransactionRegistry.registerTransactionType(MagistrateTransactions.BridgechainUpdateTransaction);
Expand Down

0 comments on commit 4570d9f

Please sign in to comment.