Skip to content

Commit

Permalink
feat(core-kernel): implement pipeline service
Browse files Browse the repository at this point in the history
  • Loading branch information
faustbrian authored and Brian Faust committed Nov 15, 2019
1 parent 292d9e8 commit be25226
Show file tree
Hide file tree
Showing 6 changed files with 227 additions and 3 deletions.
134 changes: 134 additions & 0 deletions __tests__/unit/core-kernel/services/pipeline/pipeline.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import "jest-extended";

import { Application, Container } from "@packages/core-kernel";
import { Pipeline, Stage } from "@packages/core-kernel/src/services/pipeline";

let app: Application;
let pipeline: Pipeline;
beforeEach(() => {
app = new Application(new Container.Container());

pipeline = new Pipeline();
});

describe("Pipeline", () => {
describe("Class", () => {
describe("instantiated", () => {
it("should apply all stages (async)", async () => {
class RemoveDash implements Stage {
async process(payload: string) {
return payload.replace("_", "");
}
}

class RemoveUnderscore implements Stage {
async process(payload: string) {
return payload.replace("-", " ");
}
}

const actual: string = await pipeline
.pipe(new RemoveDash())
.pipe(new RemoveUnderscore())
.process("_Hello-World");

expect(actual).toBe("Hello World");
});

it("should apply all stages (sync)", () => {
class RemoveDash implements Stage {
process(payload: string) {
return payload.replace("_", "");
}
}

class RemoveUnderscore implements Stage {
process(payload: string) {
return payload.replace("-", " ");
}
}

const actual: string = pipeline
.pipe(new RemoveDash())
.pipe(new RemoveUnderscore())
.processSync("_Hello-World");

expect(actual).toBe("Hello World");
});
});

describe("resolved", () => {
it("should apply all stages (async)", async () => {
@Container.injectable()
class RemoveDash implements Stage {
async process(payload: string) {
return payload.replace("_", "");
}
}

@Container.injectable()
class RemoveUnderscore implements Stage {
async process(payload: string) {
return payload.replace("-", " ");
}
}

const actual: string = await pipeline
.pipe(app.resolve(RemoveDash))
.pipe(app.resolve(RemoveUnderscore))
.process("_Hello-World");

expect(actual).toBe("Hello World");
});

it("should apply all stages (sync)", () => {
@Container.injectable()
class RemoveDash implements Stage {
process(payload: string) {
return payload.replace("_", "");
}
}

@Container.injectable()
class RemoveUnderscore implements Stage {
process(payload: string) {
return payload.replace("-", " ");
}
}

const actual: string = pipeline
.pipe(app.resolve(RemoveDash))
.pipe(app.resolve(RemoveUnderscore))
.processSync("_Hello-World");

expect(actual).toBe("Hello World");
});
});
});

describe("Function", () => {
it("should apply all stages (async)", async () => {
const removeDash = async (payload: string) => payload.replace("_", "");
const removeUnderscore = async (payload: string) => payload.replace("-", " ");

const actual: string = await pipeline
.pipe(removeDash)
.pipe(removeUnderscore)
.process("_Hello-World");

expect(actual).toBe("Hello World");
});

it("should apply all stages (sync)", () => {
const removeDash = (payload: string) => payload.replace("_", "");
const removeUnderscore = (payload: string) => payload.replace("-", " ");

const actual: string = pipeline
.pipe(removeDash)
.pipe(removeUnderscore)
.processSync("_Hello-World");

expect(actual).toBe("Hello World");
});
});
});
4 changes: 1 addition & 3 deletions packages/core-forger/src/forger-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,6 @@ export class ForgerManager {

return this.checkLater(Crypto.Slots.getTimeInMsUntilNextSlot());
} catch (error) {
AppUtils.assert.defined<Contracts.P2P.CurrentRound>(this.round);

if (error instanceof HostNoResponseError || error instanceof RelayCommunicationError) {
if (error.message.includes("blockchain isn't ready")) {
this.logger.info("Waiting for relay to become ready.");
Expand All @@ -127,7 +125,7 @@ export class ForgerManager {
} else {
this.logger.error(error.stack);

if (!AppUtils.isEmpty(this.round)) {
if (this.round) {
this.logger.info(
`Round: ${this.round.current.toLocaleString()}, height: ${this.round.lastBlock.height.toLocaleString()}`,
);
Expand Down
1 change: 1 addition & 0 deletions packages/core-kernel/src/ioc/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export const Identifiers: Record<string, symbol> = {
FilesystemService: Symbol.for("Service<Filesystem>"),
LogService: Symbol.for("Service<Log>"),
MixinService: Symbol.for("Service<Mixin>"),
PipelineService: Symbol.for("Service<Pipeline>"),
QueueService: Symbol.for("Service<Queue>"),
ScheduleService: Symbol.for("Service<Schedule>"),
SnapshotService: Symbol.for("Service<Snapshot>"),
Expand Down
17 changes: 17 additions & 0 deletions packages/core-kernel/src/services/pipeline/contracts.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/**
* @export
* @interface Stage
*/
export interface Stage {
/**
* Process the payload.
*
* @remarks
* We generally avoid the use of any but with pipeline stages the payload could be any of
* that type until it hits the end of the pipeline where it is returned in its final form.
*
* @param {*} payload
* @memberof Stage
*/
process(payload: any);
}
2 changes: 2 additions & 0 deletions packages/core-kernel/src/services/pipeline/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from "./contracts";
export * from "./pipeline";
72 changes: 72 additions & 0 deletions packages/core-kernel/src/services/pipeline/pipeline.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { injectable } from "../../ioc";
import { Stage } from "./contracts";

/**
* @export
* @class Pipeline
*/
@injectable()
export class Pipeline {
/**
* Creates an instance of Pipeline.
*
* @param {(Array<Function | Stage>)} stages
* @memberof Pipeline
*/
public constructor(private readonly stages: Array<Function | Stage> = []) {}

/**
* Create a new pipeline with an appended stage.
*
* @param {Function} stage
* @returns {Pipeline}
* @memberof Pipeline
*/
public pipe(stage: Function | Stage): Pipeline {
const stages: Array<Function | Stage> = [...this.stages];

stages.push(stage);

return new Pipeline(stages);
}

/**
* Process the payload. (Asynchronous)
*
* @template T
* @param {T} payload
* @returns {(Promise<T | undefined>)}
* @memberof Pipeline
*/
public async process<T>(payload: T): Promise<T | undefined> {
for (const stage of this.stages) {
if ("process" in stage) {
payload = await stage.process(payload);
} else if (typeof stage === "function") {
payload = await stage(payload);
}
}

return payload;
}

/**
* Process the payload. (Synchronous)
*
* @template T
* @param {T} payload
* @returns {(T | undefined)}
* @memberof Pipeline
*/
public processSync<T>(payload: T): T | undefined {
for (const stage of this.stages) {
if ("process" in stage) {
payload = stage.process(payload);
} else if (typeof stage === "function") {
payload = stage(payload);
}
}

return payload;
}
}

0 comments on commit be25226

Please sign in to comment.