From 84c478849f2eebb6549fa299d2b19c0a69914037 Mon Sep 17 00:00:00 2001 From: Brian Faust Date: Thu, 28 Nov 2019 15:20:25 +0200 Subject: [PATCH] refactor(core-kernel): expect actions, jobs and event listeners to be classes --- .../service-provider-repository.test.ts | 27 +- .../services/events/drivers/memory.test.ts | 568 ++++++++---------- .../services/queue/drivers/memory.test.ts | 172 +----- .../services/triggers/triggers.test.ts | 97 +-- packages/core-blockchain/src/blockchain.ts | 2 - packages/core-forger/src/delegate-tracker.ts | 22 +- packages/core-forger/src/service-provider.ts | 5 +- packages/core-kernel/package.json | 1 - .../boot-service-providers.ts | 36 +- .../bootstrap/service-providers/listeners.ts | 81 +++ .../src/contracts/kernel/events.ts | 9 +- .../core-kernel/src/contracts/kernel/queue.ts | 83 +-- .../src/services/events/drivers/memory.ts | 46 +- .../src/services/events/drivers/null.ts | 3 +- .../src/services/queue/drivers/memory.ts | 175 +++--- .../src/services/queue/drivers/null.ts | 105 +--- .../src/services/schedule/block-job.ts | 9 +- .../src/services/schedule/listeners.ts | 44 ++ .../src/services/triggers/action.ts | 19 +- .../src/services/triggers/triggers.ts | 10 +- packages/core-kernel/src/types/events.ts | 3 - packages/core-kernel/src/types/index.ts | 5 +- packages/core-p2p/src/event-listener.ts | 17 +- packages/core-p2p/src/listeners.ts | 83 +++ packages/core-p2p/src/peer-processor.ts | 14 +- .../core-transaction-pool/src/connection.ts | 3 +- .../core-transaction-pool/src/listeners.ts | 23 + packages/core-webhooks/src/listener.ts | 6 +- .../core-webhooks/src/service-provider.ts | 6 +- yarn.lock | 20 - 30 files changed, 824 insertions(+), 870 deletions(-) create mode 100644 packages/core-kernel/src/bootstrap/service-providers/listeners.ts create mode 100644 packages/core-kernel/src/services/schedule/listeners.ts delete mode 100644 packages/core-kernel/src/types/events.ts create mode 100644 packages/core-p2p/src/listeners.ts create mode 100644 packages/core-transaction-pool/src/listeners.ts diff --git a/__tests__/unit/core-kernel/providers/service-provider-repository.test.ts b/__tests__/unit/core-kernel/providers/service-provider-repository.test.ts index 6eccd29536..ec6d93096a 100644 --- a/__tests__/unit/core-kernel/providers/service-provider-repository.test.ts +++ b/__tests__/unit/core-kernel/providers/service-provider-repository.test.ts @@ -1,11 +1,20 @@ import "jest-extended"; +import { Contracts } from "@packages/core-kernel/src"; import { Application } from "@packages/core-kernel/src/application"; import { KernelEvent } from "@packages/core-kernel/src/enums/events"; import { Container, Identifiers, interfaces } from "@packages/core-kernel/src/ioc"; import { ServiceProvider, ServiceProviderRepository } from "@packages/core-kernel/src/providers"; import { MemoryEventDispatcher } from "@packages/core-kernel/src/services/events/drivers/memory"; +class StubListener implements Contracts.Kernel.EventListener { + public constructor(private readonly method?) {} + + public handle(): void { + this.method(); + } +} + class StubServiceProvider extends ServiceProvider { public async register(): Promise {} @@ -121,15 +130,15 @@ describe("ServiceProviderRepository", () => { const spyRegister = jest.spyOn(serviceProvider, "register"); serviceProviderRepository.set("stub", serviceProvider); - let fired: boolean = false; + let fired: jest.Mock = jest.fn(); app.get(Identifiers.EventDispatcherService).listenOnce( KernelEvent.ServiceProviderRegistered, - () => (fired = true), + new StubListener(fired), ); await serviceProviderRepository.register("stub"); - expect(fired).toBeTrue(); + expect(fired).toHaveBeenCalled(); expect(spyRegister).toHaveBeenCalled(); }); @@ -138,15 +147,15 @@ describe("ServiceProviderRepository", () => { const spyBoot = jest.spyOn(serviceProvider, "boot"); serviceProviderRepository.set("stub", serviceProvider); - let fired: boolean = false; + let fired: jest.Mock = jest.fn(); app.get(Identifiers.EventDispatcherService).listenOnce( KernelEvent.ServiceProviderBooted, - () => (fired = true), + new StubListener(fired), ); await serviceProviderRepository.boot("stub"); - expect(fired).toBeTrue(); + expect(fired).toHaveBeenCalled(); expect(spyBoot).toHaveBeenCalled(); expect(serviceProviderRepository.loaded("stub")).toBeTrue(); expect(serviceProviderRepository.failed("stub")).toBeFalse(); @@ -158,15 +167,15 @@ describe("ServiceProviderRepository", () => { const spyDispose = jest.spyOn(serviceProvider, "dispose"); serviceProviderRepository.set("stub", serviceProvider); - let fired: boolean = false; + let fired: jest.Mock = jest.fn(); app.get(Identifiers.EventDispatcherService).listenOnce( KernelEvent.ServiceProviderDisposed, - () => (fired = true), + new StubListener(fired), ); await serviceProviderRepository.dispose("stub"); - expect(fired).toBeTrue(); + expect(fired).toHaveBeenCalled(); expect(spyDispose).toHaveBeenCalled(); expect(serviceProviderRepository.loaded("stub")).toBeFalse(); expect(serviceProviderRepository.failed("stub")).toBeFalse(); diff --git a/__tests__/unit/core-kernel/services/events/drivers/memory.test.ts b/__tests__/unit/core-kernel/services/events/drivers/memory.test.ts index c67f5879fa..4bf81e6c10 100644 --- a/__tests__/unit/core-kernel/services/events/drivers/memory.test.ts +++ b/__tests__/unit/core-kernel/services/events/drivers/memory.test.ts @@ -1,438 +1,382 @@ import "jest-extended"; -import { MemoryEventDispatcher } from "@packages/core-kernel/src/services/events/drivers/memory"; -let emitter: MemoryEventDispatcher; -beforeEach(() => (emitter = new MemoryEventDispatcher())); +import { Contracts } from "@packages/core-kernel/src"; +import { MemoryEventDispatcher } from "@packages/core-kernel/src/services/events/drivers/memory"; -describe(".listen", () => { - it("should add an event listener", async () => { - const calls: number[] = []; - emitter.listen("firstEvent", () => calls.push(1)); - emitter.listen("firstEvent", () => calls.push(2)); +class DummyClass implements Contracts.Kernel.EventListener { + public constructor(private readonly method?) {} - await emitter.dispatch("firstEvent"); + public handle(): void { + this.method(); + } +} - expect(calls).toEqual([1, 2]); - }); +let emitter: MemoryEventDispatcher; - it("should return an unsubcribe method for an event listener", async () => { - const calls: number[] = []; - const listener = () => calls.push(1); +beforeEach(() => (emitter = new MemoryEventDispatcher())); - const off = emitter.listen("firstEvent", listener); - await emitter.dispatch("firstEvent"); - expect(calls).toEqual([1]); +describe("MemoryEventDispatcher", () => { + let dummyCaller: jest.Mock; + let dummyListener: any; - off(); - await emitter.dispatch("firstEvent"); - expect(calls).toEqual([1]); + beforeEach(() => { + dummyCaller = jest.fn(); + dummyListener = new DummyClass(dummyCaller); }); - it("should prevent duplicate listeners", async () => { - const calls: number[] = []; - const listener = () => calls.push(1); - - emitter.listen("firstEvent", listener); - emitter.listen("firstEvent", listener); - emitter.listen("firstEvent", listener); + describe(".listen", () => { + it("should add an event listener", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); - expect(calls).toEqual([1]); - }); + expect(dummyCaller).toHaveBeenCalledTimes(2); + }); - describe("Wildcard", () => { - it("should add a wildcard listener", async () => { - emitter.listen("*", ({ name, data }) => { - expect(name).toBe("firstEvent"); - expect(data).toEqual(true); - }); + it("should return an unsubcribe method for an event listener", async () => { + const off = emitter.listen("firstEvent", dummyListener); + await emitter.dispatch("firstEvent"); + expect(dummyCaller).toHaveBeenCalled(); - await emitter.dispatch("firstEvent", true); - await emitter.dispatchSeq("firstEvent", true); + off(); + await emitter.dispatch("firstEvent"); + expect(dummyCaller).toHaveBeenCalledTimes(1); }); - }); -}); -describe(".listenMany", () => { - it("should add many event listeners", async () => { - const calls: number[] = []; - emitter.listenMany([ - ["firstEvent", () => calls.push(1)], - ["firstEvent", () => calls.push(2)], - ]); + it("should prevent duplicate listeners", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("firstEvent", dummyListener); + emitter.listen("firstEvent", dummyListener); - await emitter.dispatch("firstEvent"); - - expect(calls).toEqual([1, 2]); - }); + await emitter.dispatch("firstEvent"); - it("should prevent duplicate listeners", async () => { - const calls: number[] = []; - const listener = () => calls.push(1); + expect(dummyCaller).toHaveBeenCalled(); + }); - emitter.listenMany(new Array(5).fill(["firstEvent", listener])); + describe("Wildcard", () => { + it("should add a wildcard listener", async () => { + emitter.listen("*", dummyListener); - await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatchSeq("firstEvent"); - expect(calls).toEqual([1]); + expect(dummyCaller).toHaveBeenCalledTimes(2); + }); + }); }); -}); - -describe(".listenOnce", () => { - it("should listen once", async () => { - let unicorn: boolean = false; - expect(unicorn).toBeFalse(); + describe(".listenMany", () => { + it("should add many event listeners", async () => { + emitter.listenMany([ + ["firstEvent", dummyListener], + ["firstEvent", new DummyClass(dummyCaller)], + ]); - emitter.listenOnce("firstEvent", ({ data }) => (unicorn = data)); + await emitter.dispatch("firstEvent"); - emitter.dispatchSync("firstEvent", true); + expect(dummyCaller).toHaveBeenCalledTimes(2); + }); - expect(unicorn).toBeTrue(); + it("should prevent duplicate listeners", async () => { + emitter.listenMany(new Array(5).fill(["firstEvent", dummyListener])); - emitter.dispatchSync("firstEvent", false); + await emitter.dispatch("firstEvent"); - expect(unicorn).toBeTrue(); + expect(dummyCaller).toHaveBeenCalled(); + }); }); -}); -describe(".forget", () => { - it("should remove an event listener", () => { - const calls: number[] = []; - const listener = () => calls.push(1); + describe(".listenOnce", () => { + it("should listen once", async () => { + emitter.listenOnce("firstEvent", dummyListener); - emitter.listen("firstEvent", listener); + emitter.dispatchSync("firstEvent"); + emitter.dispatchSync("firstEvent"); + emitter.dispatchSync("firstEvent"); - emitter.dispatchSync("firstEvent"); + expect(dummyCaller).toHaveBeenCalled(); - expect(calls).toEqual([1]); + emitter.dispatchSync("firstEvent"); + emitter.dispatchSync("firstEvent"); + emitter.dispatchSync("firstEvent"); - emitter.forget("firstEvent", listener); + expect(dummyCaller).toHaveBeenCalledTimes(1); + }); + }); - emitter.dispatchSync("firstEvent"); + describe(".forget", () => { + it("should remove an event listener", () => { + emitter.listen("firstEvent", dummyListener); - expect(calls).toEqual([1]); - }); -}); + emitter.dispatchSync("firstEvent"); -describe(".dispatch", () => { - it("should emit one event", async () => { - emitter.listen("firstEvent", ({ data }) => expect(data).toEqual(true)); + expect(dummyCaller).toHaveBeenCalled(); - await emitter.dispatch("firstEvent", true); - }); + emitter.forget("firstEvent", dummyListener); - it("should emit multiple events", async () => { - let count = 0; + emitter.dispatchSync("firstEvent"); - emitter.listen("firstEvent", () => { - if (++count >= 5) { - expect(count).toBe(5); - } + expect(dummyCaller).toHaveBeenCalledTimes(1); }); - - await emitter.dispatch("firstEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("firstEvent"); }); - it("should not execute an event listener without await", async () => { - let unicorn: boolean = false; - - emitter.listen("firstEvent", () => (unicorn = true)); + describe(".dispatch", () => { + it("should emit one event", async () => { + emitter.listen("firstEvent", dummyListener); - emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); - expect(unicorn).toBeFalse(); - }); -}); - -describe(".dispatchSeq", () => { - it("should execute a wildcard listener with await", async () => { - let unicorn: boolean = false; - - emitter.listen("*", () => (unicorn = true)); + expect(dummyCaller).toHaveBeenCalled(); + }); - await emitter.dispatchSeq("firstEvent"); + it("should emit multiple events", async () => { + emitter.listen("firstEvent", dummyListener); - expect(unicorn).toBeTrue(); - }); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("firstEvent"); - it("should not execute an event listener without await (async behaviour)", async () => { - let unicorn: boolean = false; + expect(dummyCaller).toHaveBeenCalledTimes(5); + }); - emitter.listen("firstEvent", () => (unicorn = true)); + it("should not execute an event listener without await", async () => { + emitter.listen("firstEvent", dummyListener); - emitter.dispatchSeq("firstEvent"); + emitter.dispatch("firstEvent"); - expect(unicorn).toBeFalse(); + expect(dummyCaller).not.toHaveBeenCalled(); + }); }); - it("should emit all events in sequence", async () => { - const events: number[] = []; + describe(".dispatchSeq", () => { + it("should execute a wildcard listener with await", async () => { + emitter.listen("*", dummyListener); - const listener = async (data: any) => { - events.push(data); + await emitter.dispatchSeq("firstEvent"); - if (events.length >= 3) { - expect(events).toEqual([1, 2, 3]); - } - }; + expect(dummyCaller).toHaveBeenCalled(); + }); - emitter.listen("firstEvent", () => listener(1)); - emitter.listen("firstEvent", () => listener(2)); - emitter.listen("firstEvent", () => listener(3)); + it("should not execute an event listener without await (async behaviour)", async () => { + emitter.listen("firstEvent", dummyListener); - await emitter.dispatchSeq("firstEvent"); - }); -}); + emitter.dispatchSeq("firstEvent"); -describe(".dispatchSync", () => { - it("should execute an event listener without await", () => { - let unicorn: boolean = false; + expect(dummyCaller).not.toHaveBeenCalled(); + }); - emitter.listen("firstEvent", () => (unicorn = true)); + it("should emit all events in sequence", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - emitter.dispatchSync("firstEvent"); + await emitter.dispatchSeq("firstEvent"); - expect(unicorn).toBeTrue(); + expect(dummyCaller).toHaveBeenCalledTimes(3); + }); }); - it("should execute a wildcard listener without await", () => { - let unicorn: boolean = false; + describe(".dispatchSync", () => { + it("should execute an event listener without await", () => { + emitter.listen("firstEvent", dummyListener); - emitter.listen("*", () => (unicorn = true)); + emitter.dispatchSync("firstEvent"); - emitter.dispatchSync("firstEvent"); + expect(dummyCaller).toHaveBeenCalled(); + }); - expect(unicorn).toBeTrue(); - }); + it("should execute a wildcard listener without await", () => { + emitter.listen("*", dummyListener); - it("should emit all events in sequence", () => { - const events: number[] = []; + emitter.dispatchSync("firstEvent"); - const listener = async (data: any) => { - events.push(data); + expect(dummyCaller).toHaveBeenCalled(); + }); - if (events.length >= 3) { - expect(events).toEqual([1, 2, 3]); - } - }; + it("should emit all events in sequence", () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - emitter.listen("firstEvent", () => listener(1)); - emitter.listen("firstEvent", () => listener(2)); - emitter.listen("firstEvent", () => listener(3)); + emitter.dispatchSync("firstEvent"); - emitter.dispatchSync("firstEvent"); + expect(dummyCaller).toHaveBeenCalledTimes(3); + }); }); -}); - -describe(".dispatchMany", () => { - it("should emit all events", async () => { - const events: number[] = []; - - const listener = async (data: any) => { - events.push(data); - if (events.length >= 6) { - expect(events).toEqual([1, 2, 3, 4, 5, 6]); - } - }; + describe(".dispatchMany", () => { + it("should emit all events", async () => { + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - emitter.listen("firstEvent", () => listener(1)); - emitter.listen("firstEvent", () => listener(2)); - emitter.listen("firstEvent", () => listener(3)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); - emitter.listen("secondEvent", () => listener(4)); - emitter.listen("secondEvent", () => listener(5)); - emitter.listen("secondEvent", () => listener(6)); + await emitter.dispatchMany([ + ["firstEvent", undefined], + ["secondEvent", undefined], + ]); - await emitter.dispatchMany([ - ["firstEvent", undefined], - ["secondEvent", undefined], - ]); + expect(dummyCaller).toHaveBeenCalledTimes(6); + }); }); -}); - -describe(".dispatchManySeq", () => { - it("should emit all events", async () => { - const events: number[] = []; - const listener = async (data: any) => { - events.push(data); + describe(".dispatchManySeq", () => { + it("should emit all events", async () => { + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - if (events.length >= 6) { - expect(events).toEqual([1, 2, 3, 4, 5, 6]); - } - }; + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); - emitter.listen("firstEvent", () => listener(1)); - emitter.listen("firstEvent", () => listener(2)); - emitter.listen("firstEvent", () => listener(3)); + await emitter.dispatchManySeq([ + ["firstEvent", undefined], + ["secondEvent", undefined], + ]); - emitter.listen("secondEvent", () => listener(4)); - emitter.listen("secondEvent", () => listener(5)); - emitter.listen("secondEvent", () => listener(6)); - - await emitter.dispatchManySeq([ - ["firstEvent", undefined], - ["secondEvent", undefined], - ]); + expect(dummyCaller).toHaveBeenCalledTimes(6); + }); }); -}); - -describe(".dispatchManySync", () => { - it("should emit all events", async () => { - const events: number[] = []; - - const listener = async (data: any) => { - events.push(data); - if (events.length >= 6) { - expect(events).toEqual([1, 2, 3, 4, 5, 6]); - } - }; + describe(".dispatchManySync", () => { + it("should emit all events", async () => { + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); + emitter.listen("firstEvent", new DummyClass(dummyCaller)); - emitter.listen("firstEvent", () => listener(1)); - emitter.listen("firstEvent", () => listener(2)); - emitter.listen("firstEvent", () => listener(3)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); + emitter.listen("secondEvent", new DummyClass(dummyCaller)); - emitter.listen("secondEvent", () => listener(4)); - emitter.listen("secondEvent", () => listener(5)); - emitter.listen("secondEvent", () => listener(6)); + emitter.dispatchManySync([ + ["firstEvent", undefined], + ["secondEvent", undefined], + ]); - emitter.dispatchManySync([ - ["firstEvent", undefined], - ["secondEvent", undefined], - ]); + expect(dummyCaller).toHaveBeenCalledTimes(6); + }); }); -}); -describe(".flush", () => { - it("should clear all listeners", async () => { - const calls: string[] = []; + describe(".flush", () => { + it("should clear all listeners", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("secondEvent", dummyListener); + emitter.listen("*", dummyListener); - emitter.listen("firstEvent", () => calls.push("firstEvent")); - emitter.listen("secondEvent", () => calls.push("secondEvent")); - emitter.listen("*", () => calls.push("any")); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + expect(dummyCaller).toHaveBeenCalledTimes(4); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent"]); + emitter.flush(); - emitter.flush(); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); - - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent"]); + expect(dummyCaller).toHaveBeenCalledTimes(4); + }); }); -}); -describe(".forget", () => { - it("should clear all listeners for an event", async () => { - const calls: string[] = []; + describe(".forget", () => { + it("should clear all listeners for an event", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("secondEvent", dummyListener); + emitter.listen("*", dummyListener); - emitter.listen("firstEvent", () => calls.push("firstEvent")); - emitter.listen("secondEvent", () => calls.push("secondEvent")); - emitter.listen("*", () => calls.push("any")); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + expect(dummyCaller).toHaveBeenCalledTimes(4); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent"]); + emitter.forget("firstEvent"); - emitter.forget("firstEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); - - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent", "any", "any", "secondEvent"]); + expect(dummyCaller).toHaveBeenCalledTimes(7); + }); }); -}); - -describe(".forgetMany", () => { - it("should forget the given listeners by name", async () => { - const calls: string[] = []; - emitter.listen("firstEvent", () => calls.push("firstEvent")); - emitter.listen("secondEvent", () => calls.push("secondEvent")); - emitter.listen("*", () => calls.push("any")); + describe(".forgetMany", () => { + it("should forget the given listeners by name", async () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("secondEvent", dummyListener); + emitter.listen("*", dummyListener); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent"]); + expect(dummyCaller).toHaveBeenCalledTimes(4); - emitter.forgetMany(["firstEvent", "secondEvent"]); + emitter.forgetMany(["firstEvent", "secondEvent"]); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent", "any", "any"]); - }); - - it("should forget the given listeners by name and function signature", async () => { - const calls: string[] = []; + expect(dummyCaller).toHaveBeenCalledTimes(6); + }); - const firstEvent = () => calls.push("firstEvent"); - const secondEvent = () => calls.push("secondEvent"); + it("should forget the given listeners by name and function signature", async () => { + const firstEvent = new DummyClass(dummyCaller); + const secondEvent = new DummyClass(dummyCaller); - emitter.listen("firstEvent", firstEvent); - emitter.listen("secondEvent", secondEvent); - emitter.listen("*", () => calls.push("any")); + emitter.listen("firstEvent", firstEvent); + emitter.listen("secondEvent", secondEvent); + emitter.listen("*", new DummyClass(dummyCaller)); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent"]); + expect(dummyCaller).toHaveBeenCalledTimes(4); - emitter.forgetMany([ - ["firstEvent", firstEvent], - ["secondEvent", secondEvent], - ]); + emitter.forgetMany([ + ["firstEvent", firstEvent], + ["secondEvent", secondEvent], + ]); - await emitter.dispatch("firstEvent"); - await emitter.dispatch("secondEvent"); + await emitter.dispatch("firstEvent"); + await emitter.dispatch("secondEvent"); - expect(calls).toEqual(["any", "firstEvent", "any", "secondEvent", "any", "any"]); + expect(dummyCaller).toHaveBeenCalledTimes(6); + }); }); -}); - -describe(".getListeners", () => { - it("should return all listeners for the given event", () => { - const listener = (): null => null; - emitter.listen("firstEvent", listener); + describe(".getListeners", () => { + it("should return all listeners for the given event", () => { + emitter.listen("firstEvent", dummyListener); - expect(emitter.getListeners("firstEvent")).toEqual([listener]); + expect(emitter.getListeners("firstEvent")).toEqual([dummyListener]); + }); }); -}); -describe(".hasListeners", () => { - it("should return true if a listener is registered", () => { - emitter.listen("firstEvent", () => {}); + describe(".hasListeners", () => { + it("should return true if a listener is registered", () => { + emitter.listen("firstEvent", dummyListener); - expect(emitter.hasListeners("firstEvent")).toBeTrue(); - }); + expect(emitter.hasListeners("firstEvent")).toBeTrue(); + }); - it("should return false if no listener is registered", () => { - expect(emitter.hasListeners("firstEvent")).toBeFalse(); + it("should return false if no listener is registered", () => { + expect(emitter.hasListeners("firstEvent")).toBeFalse(); + }); }); -}); -describe(".countListeners", () => { - it("should return the total listener count", () => { - emitter.listen("firstEvent", () => null); - emitter.listen("secondEvent", () => null); - emitter.listen("*", () => null); + describe(".countListeners", () => { + it("should return the total listener count", () => { + emitter.listen("firstEvent", dummyListener); + emitter.listen("secondEvent", dummyListener); + emitter.listen("*", dummyListener); - expect(emitter.countListeners("firstEvent")).toBe(2); - expect(emitter.countListeners("secondEvent")).toBe(2); - expect(emitter.countListeners()).toBe(3); + expect(emitter.countListeners("firstEvent")).toBe(2); + expect(emitter.countListeners("secondEvent")).toBe(2); + expect(emitter.countListeners()).toBe(3); + }); }); }); diff --git a/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts b/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts index 71ae70b2cd..c8392ee5e4 100644 --- a/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts +++ b/__tests__/unit/core-kernel/services/queue/drivers/memory.test.ts @@ -1,124 +1,72 @@ -import "jest-extended"; - import { sleep } from "@arkecosystem/utils"; +import { Contracts } from "@packages/core-kernel/src"; import { MemoryQueue } from "@packages/core-kernel/src/services/queue/drivers/memory"; -const dummyFunction = async () => {}; +class DummyClass implements Contracts.Kernel.QueueJob { + public constructor(private readonly method?) {} + + public handle(): void { + this.method(); + } +} let driver: MemoryQueue; beforeEach(() => (driver = new MemoryQueue())); describe("MemoryQueue", () => { - it("should start the default queue and process jobs", async () => { - let fnValue: number = 0; + it("should start queue and process jobs", async () => { + let dummy: jest.Mock = jest.fn(); expect(driver.size()).toBe(0); - expect(fnValue).toBe(0); - await driver.push(async () => fnValue++); + await driver.push(new DummyClass(dummy)); await driver.start(); - expect(fnValue).toBe(1); - }); - - it("should start the given queue and process jobs", async () => { - let fnValue: number = 0; - - expect(driver.size("balance")).toBe(0); - expect(fnValue).toBe(0); - - await driver.pushOn("balance", async () => fnValue++); - - await driver.start("balance"); - - expect(fnValue).toBe(1); + expect(dummy).toHaveBeenCalled(); }); - it("should stop the default queue and not process new jobs", async () => { - let fnValue: number = 0; + it("should stop queue and not process new jobs", async () => { + let dummy: jest.Mock = jest.fn(); expect(driver.size()).toBe(0); - expect(fnValue).toBe(0); - await driver.push(async () => fnValue++); + await driver.push(new DummyClass(dummy)); await driver.start(); await driver.stop(); - await driver.push(async () => fnValue++); - - expect(fnValue).toBe(1); - }); - - it("should stop the given queue and not process new jobs", async () => { - let fnValue: number = 0; - const fnIncrement = async () => fnValue++; - - expect(driver.size("balance")).toBe(0); - expect(fnValue).toBe(0); - - await driver.pushOn("balance", async () => fnValue++); - - await driver.start("balance"); - - await driver.stop("balance"); + await driver.push(new DummyClass(dummy)); - await driver.bulkOn("balance", [fnIncrement, fnIncrement]); - - expect(fnValue).toBe(1); + expect(dummy).toHaveBeenCalled(); }); - it("should pause and resume the default queue", async () => { - let fnValue: number = 0; - const fnIncrement = async () => fnValue++; + it("should pause and resume queue", async () => { + let dummy: jest.Mock = jest.fn(); expect(driver.size()).toBe(0); - expect(fnValue).toBe(0); - await driver.push(fnIncrement); + await driver.push(new DummyClass(dummy)); await driver.start(); - expect(fnValue).toBe(1); + expect(dummy).toHaveBeenCalled(); await driver.pause(); - await driver.bulk([fnIncrement, fnIncrement]); + await driver.bulk([new DummyClass(dummy), new DummyClass(dummy)]); - await driver.start(); + await driver.resume(); - expect(fnValue).toBe(3); + expect(dummy).toHaveBeenCalledTimes(3); }); - it("should pause and resume the given queue", async () => { - let fnValue: number = 0; - const fnIncrement = async () => fnValue++; - - expect(driver.size("balance")).toBe(0); - expect(fnValue).toBe(0); - - await driver.pushOn("balance", fnIncrement); - - await driver.start("balance"); - - expect(fnValue).toBe(1); - - await driver.pause("balance"); - - await driver.bulkOn("balance", [fnIncrement, fnIncrement]); - - await driver.start("balance"); - - expect(fnValue).toBe(3); - }); - - it("should clear the default queue", async () => { + it("should clear queue", async () => { expect(driver.size()).toBe(0); - await driver.push(dummyFunction); + await driver.push(new DummyClass()); expect(driver.size()).toBe(1); @@ -127,85 +75,29 @@ describe("MemoryQueue", () => { expect(driver.size()).toBe(0); }); - it("should clear the given queue", async () => { - expect(driver.size("balance")).toBe(0); - - await driver.pushOn("balance", dummyFunction); - - expect(driver.size("balance")).toBe(1); - - await driver.clear("balance"); - - expect(driver.size("balance")).toBe(0); - }); - - it("should push the job onto the default queue", async () => { + it("should push the job onto queue", async () => { expect(driver.size()).toBe(0); - await driver.push(dummyFunction); + await driver.push(new DummyClass()); expect(driver.size()).toBe(1); }); - it("should push the job onto the given queue", () => { - expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(0); - - driver.pushOn("balance", dummyFunction); - + it("should push the job onto queue with a 2 second delay", async () => { expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(1); - }); - it("should push the job onto the default queue with a 2 second delay", async () => { - expect(driver.size()).toBe(0); - - await driver.later(2000, dummyFunction); + await driver.later(2000, new DummyClass()); await sleep(2000); expect(driver.size()).toBe(1); }); - it("should push the job onto the default queue with a 2 second delay", async () => { + it("should push the job onto queue", async () => { expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(0); - - await driver.laterOn("balance", 2000, dummyFunction); - await sleep(2000); - - expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(1); - }); - - it("should push the job onto the default queue", async () => { - expect(driver.size()).toBe(0); - - await driver.bulk([dummyFunction, dummyFunction]); + await driver.bulk([new DummyClass(), new DummyClass()]); expect(driver.size()).toBe(2); }); - - it("should push the job onto the given queue", async () => { - expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(0); - - await driver.bulkOn("balance", [dummyFunction, dummyFunction]); - - expect(driver.size()).toBe(0); - expect(driver.size("balance")).toBe(2); - }); - - it("should return the name of the default queue", () => { - expect(driver.getDefaultQueue()).toBe("default"); - }); - - it("should set the name of the default queue", () => { - expect(driver.getDefaultQueue()).toBe("default"); - - driver.setDefaultQueue("new-default"); - - expect(driver.getDefaultQueue()).toBe("new-default"); - }); }); diff --git a/__tests__/unit/core-kernel/services/triggers/triggers.test.ts b/__tests__/unit/core-kernel/services/triggers/triggers.test.ts index 7a0718ff8f..31ef3b373c 100644 --- a/__tests__/unit/core-kernel/services/triggers/triggers.test.ts +++ b/__tests__/unit/core-kernel/services/triggers/triggers.test.ts @@ -1,61 +1,78 @@ import { InvalidArgumentException } from "@packages/core-kernel/src/exceptions/logic"; import { Triggers, Action } from "@packages/core-kernel/src/services/triggers"; +import { ActionArguments } from "@packages/core-kernel/src/types"; + +class DummyAction extends Action { + public execute(args: ActionArguments): T { + return args.returnValue || true; + } +} + +class DummyActionWithException extends Action { + public execute(): T { + throw new Error("Hello World"); + } +} let triggers: Triggers; -beforeEach(() => (triggers = new Triggers())); -test("binds a trigger with a hook and executes them", async () => { - let fnValue = 0; +beforeEach(() => (triggers = new Triggers())); - triggers.bind("count", () => fnValue++).before(() => fnValue++); +test("binds a trigger and accepts arguments for calls", async () => { + let before: jest.Mock = jest.fn(); - await triggers.call("count"); + triggers.bind("count", new DummyAction()).before(before); - expect(fnValue).toBe(2); + await expect( + triggers.call("count", { + returnValue: "Hello World", + }), + ).resolves.toBe("Hello World"); + expect(before).toHaveBeenCalled(); }); -test("binds a trigger with an hook and executes them", async () => { - let fnValue = 0; +test("binds a trigger with a hook and executes them", async () => { + let before: jest.Mock = jest.fn(); - triggers - .bind("count", () => { - fnValue++; + triggers.bind("count", new DummyAction()).before(before); - throw new Error("Hello World"); - }) - .error(() => fnValue++); + await expect(triggers.call("count")).resolves.toBe(true); + expect(before).toHaveBeenCalled(); +}); + +test("binds a trigger with an hook and executes them", async () => { + let error: jest.Mock = jest.fn(); - await triggers.call("count"); + triggers.bind("count", new DummyActionWithException()).error(error); - expect(fnValue).toBe(2); + await expect(triggers.call("count")).resolves.toBeUndefined(); + expect(error).toHaveBeenCalled(); }); test("binds a trigger with an hook and executes them", async () => { - let fnValue = 0; - - triggers.bind("count", () => fnValue++).after(() => fnValue++); + let after: jest.Mock = jest.fn(); - await triggers.call("count"); + triggers.bind("count", new DummyAction()).after(after); - expect(fnValue).toBe(2); + await expect(triggers.call("count")).resolves.toBe(true); + expect(after).toHaveBeenCalled(); }); test("binds a trigger with hooks and executes them", async () => { - let fnValue = 0; + let before: jest.Mock = jest.fn(); + let error: jest.Mock = jest.fn(); + let after: jest.Mock = jest.fn(); triggers - .bind("count", () => { - fnValue++; - - throw new Error("Hello World"); - }) - .before(() => fnValue++) - .error(() => fnValue++) - .after(() => fnValue++); - - await triggers.call("count"); - - expect(fnValue).toBe(4); + .bind("count", new DummyActionWithException()) + .before(before) + .error(error) + .after(after); + + await expect(triggers.call("count")).resolves.toBeUndefined(); + expect(before).toHaveBeenCalled(); + expect(error).toHaveBeenCalled(); + expect(after).toHaveBeenCalled(); }); test("throws an error if a trigger is not registered", async () => { @@ -65,26 +82,24 @@ test("throws an error if a trigger is not registered", async () => { }); test("throws an error if a trigger is already registered", async () => { - triggers.bind("duplicate", Function); + triggers.bind("duplicate", new DummyAction()); expect(() => { - triggers.bind("duplicate", Function); + triggers.bind("duplicate", new DummyAction()); }).toThrowError(new InvalidArgumentException("The given trigger [duplicate] is already registered.")); }); test("throws an error if a trigger is reserved", async () => { expect(() => { - triggers.bind("internal.trigger", Function); + triggers.bind("internal.trigger", new DummyAction()); }).toThrowError(new InvalidArgumentException("The given trigger [internal.trigger] is reserved.")); }); describe("get", () => { test("returns a trigger for the given trigger", async () => { - const fn = () => {}; - - triggers.bind("count", fn); + triggers.bind("count", new DummyAction()); - expect(triggers.get("count")).toEqual(new Action(fn)); + expect(triggers.get("count")).toBeInstanceOf(Action); }); test("throws an error if a trigger is not registered", async () => { diff --git a/packages/core-blockchain/src/blockchain.ts b/packages/core-blockchain/src/blockchain.ts index cb46f79fb8..9cb851b0d2 100644 --- a/packages/core-blockchain/src/blockchain.ts +++ b/packages/core-blockchain/src/blockchain.ts @@ -137,8 +137,6 @@ export class Blockchain implements Contracts.Blockchain.Blockchain { this.dispatch("START"); - this.app.events.listenOnce("shutdown", () => this.stop()); - if (skipStartedCheck || process.env.CORE_SKIP_BLOCKCHAIN_STARTED_CHECK) { return true; } diff --git a/packages/core-forger/src/delegate-tracker.ts b/packages/core-forger/src/delegate-tracker.ts index db53e44f9f..2774d25e7f 100644 --- a/packages/core-forger/src/delegate-tracker.ts +++ b/packages/core-forger/src/delegate-tracker.ts @@ -58,11 +58,29 @@ export class DelegateTracker { @Container.inject(Container.Identifiers.WalletRepository) protected readonly walletRepository!: Contracts.State.WalletRepository; + /** + * @private + * @type {Delegate[]} + * @memberof DelegateTracker + */ + private delegates: Delegate[] = []; + + /** + * @param {Delegate[]} delegates + * @returns {this} + * @memberof DelegateTracker + */ + public initialize(delegates: Delegate[]): this { + this.delegates = delegates; + + return this; + } + /** * @returns {Promise} * @memberof DelegateTracker */ - public async execute(delegates: Delegate[]): Promise { + public async handle(): Promise { // Arrange... const { height, timestamp } = this.blockchainService.getLastBlock().data; const delegatesCount = Managers.configManager.getMilestone(height).activeDelegates; @@ -92,7 +110,7 @@ export class DelegateTracker { ); let secondsToNextRound: number | undefined; - for (const delegate of delegates) { + for (const delegate of this.delegates) { let secondsToForge: number = 0; for (let i = 0; i < nextForgers.length; i++) { if (nextForgers[i] === delegate.publicKey) { diff --git a/packages/core-forger/src/service-provider.ts b/packages/core-forger/src/service-provider.ts index 6d391a9774..656b9dc252 100644 --- a/packages/core-forger/src/service-provider.ts +++ b/packages/core-forger/src/service-provider.ts @@ -74,8 +74,9 @@ export class ServiceProvider extends Providers.ServiceProvider { if (this.config().get("tracker") === true) { this.app .get(Container.Identifiers.EventDispatcherService) - .listen(Enums.BlockEvent.Applied, async () => - this.app.resolve(DelegateTracker).execute(delegates), + .listen( + Enums.BlockEvent.Applied, + this.app.resolve(DelegateTracker).initialize(delegates), ); } } diff --git a/packages/core-kernel/package.json b/packages/core-kernel/package.json index ce4e62e91a..a704230a4e 100644 --- a/packages/core-kernel/package.json +++ b/packages/core-kernel/package.json @@ -34,7 +34,6 @@ "log-process-errors": "^5.0.2", "nanomatch": "^1.2.13", "nsfw": "^1.2.5", - "p-queue": "^6.2.0", "reflect-metadata": "^0.1.13", "semver": "^6.3.0", "type-fest": "^0.8.1" diff --git a/packages/core-kernel/src/bootstrap/service-providers/boot-service-providers.ts b/packages/core-kernel/src/bootstrap/service-providers/boot-service-providers.ts index 043c8ebf62..773c70d557 100644 --- a/packages/core-kernel/src/bootstrap/service-providers/boot-service-providers.ts +++ b/packages/core-kernel/src/bootstrap/service-providers/boot-service-providers.ts @@ -1,3 +1,4 @@ +import { Contracts } from "../.."; import { Application } from "../../contracts/kernel"; // @ts-ignore import { BlockEvent, KernelEvent, StateEvent } from "../../enums"; @@ -7,6 +8,7 @@ import { Identifiers, inject, injectable } from "../../ioc"; import { ServiceProvider, ServiceProviderRepository } from "../../providers"; import { assert } from "../../utils"; import { Bootstrapper } from "../interfaces"; +import { ChangeServiceProviderState } from "./listeners"; // todo: review the implementation /** @@ -61,37 +63,15 @@ export class BootServiceProviders implements Bootstrapper { this.serviceProviders.defer(name); } + const eventListener: Contracts.Kernel.EventListener = this.app + .resolve(ChangeServiceProviderState) + .initialize(serviceProviderName, serviceProvider); + // Register the "enable/disposeWhen" listeners to be triggered on every block. Use with care! - this.app.events.listen(BlockEvent.Applied, async () => await this.changeState(name, serviceProvider)); + this.app.events.listen(BlockEvent.Applied, eventListener); // We only want to trigger this if another service provider has been booted to avoid an infinite loop. - this.app.events.listen(KernelEvent.ServiceProviderBooted, async ({ data }) => { - if (data.name !== name) { - await this.changeState(name, serviceProvider, data.name); - } - }); - } - } - - /** - * @private - * @param {string} name - * @param {ServiceProvider} serviceProvider - * @param {string} [previous] - * @returns {Promise} - * @memberof BootServiceProviders - */ - private async changeState(name: string, serviceProvider: ServiceProvider, previous?: string): Promise { - if (this.serviceProviders.failed(name)) { - return; - } - - if (this.serviceProviders.loaded(name) && (await serviceProvider.disposeWhen(previous))) { - await this.serviceProviders.dispose(name); - } - - if (this.serviceProviders.deferred(name) && (await serviceProvider.bootWhen(previous))) { - await this.serviceProviders.boot(name); + this.app.events.listen(KernelEvent.ServiceProviderBooted, eventListener); } } } diff --git a/packages/core-kernel/src/bootstrap/service-providers/listeners.ts b/packages/core-kernel/src/bootstrap/service-providers/listeners.ts new file mode 100644 index 0000000000..bd1c6c1acd --- /dev/null +++ b/packages/core-kernel/src/bootstrap/service-providers/listeners.ts @@ -0,0 +1,81 @@ +import { EventListener } from "../../contracts/kernel"; +import { BlockEvent, KernelEvent } from "../../enums"; +import { Identifiers, inject, injectable } from "../../ioc"; +import { ServiceProvider, ServiceProviderRepository } from "../../providers"; + +/** + * @class Disconnect + * @implements {EventListener} + */ +@injectable() +export class ChangeServiceProviderState implements EventListener { + /** + * @private + * @type {ServiceProviderRepository} + * @memberof BootServiceProviders + */ + @inject(Identifiers.ServiceProviderRepository) + private readonly serviceProviders!: ServiceProviderRepository; + + /** + * @private + * @type {string} + * @memberof ChangeServiceProviderState + */ + private name!: string; + + /** + * @private + * @type {ServiceProvider} + * @memberof ChangeServiceProviderState + */ + private serviceProvider!: ServiceProvider; + + /** + * @param {string} name + * @param {ServiceProvider} serviceProvider + * @returns {this} + * @memberof ChangeServiceProviderState + */ + public initialize(name: string, serviceProvider: ServiceProvider): this { + this.name = name; + this.serviceProvider = serviceProvider; + + return this; + } + + /** + * @param {*} {name,data} + * @returns {Promise} + * @memberof ChangeServiceProviderState + */ + public async handle({ name, data }): Promise { + if (name === BlockEvent.Applied) { + return this.changeState(); + } + + if (name === KernelEvent.ServiceProviderBooted && data.name !== this.name) { + return this.changeState(data.name); + } + } + + /** + * @private + * @param {string} [previous] + * @returns {Promise} + * @memberof BootServiceProviders + */ + private async changeState(previous?: string): Promise { + if (this.serviceProviders.failed(this.name)) { + return; + } + + if (this.serviceProviders.loaded(this.name) && (await this.serviceProvider.disposeWhen(previous))) { + await this.serviceProviders.dispose(this.name); + } + + if (this.serviceProviders.deferred(this.name) && (await this.serviceProvider.bootWhen(previous))) { + await this.serviceProviders.boot(this.name); + } + } +} diff --git a/packages/core-kernel/src/contracts/kernel/events.ts b/packages/core-kernel/src/contracts/kernel/events.ts index a8199d1ad7..f026e4b85d 100644 --- a/packages/core-kernel/src/contracts/kernel/events.ts +++ b/packages/core-kernel/src/contracts/kernel/events.ts @@ -1,4 +1,11 @@ -import { EventListener, EventName } from "../../types/events"; +export type EventName = string | symbol; + +/** + * @interface EventListener + */ +export interface EventListener { + handle(payload: { name: EventName; data: any }): void; +} /** * @export diff --git a/packages/core-kernel/src/contracts/kernel/queue.ts b/packages/core-kernel/src/contracts/kernel/queue.ts index 7ae44d447c..25f8950e5f 100644 --- a/packages/core-kernel/src/contracts/kernel/queue.ts +++ b/packages/core-kernel/src/contracts/kernel/queue.ts @@ -1,3 +1,10 @@ +/** + * @interface QueueJob + */ +export interface QueueJob { + handle(): void; +} + /** * @export * @interface Queue @@ -10,7 +17,7 @@ export interface Queue { * @returns {Promise} * @memberof Queue */ - start(queue?: string): Promise; + start(): Promise; /** * Stop the queue. @@ -19,7 +26,7 @@ export interface Queue { * @returns {Promise} * @memberof Queue */ - stop(queue?: string): Promise; + stop(): Promise; /** * Pause the queue. @@ -28,81 +35,53 @@ export interface Queue { * @returns {Promise} * @memberof Queue */ - pause(queue?: string): Promise; + pause(): Promise; /** - * Clear the queue. + * Resume the queue. * * @param {string} [queue] * @returns {Promise} * @memberof Queue */ - clear(queue?: string): Promise; + resume(): Promise; /** - * Push a new job onto the default queue. + * Clear the queue. * - * @template T - * @param {() => PromiseLike} fn + * @param {string} [queue] * @returns {Promise} * @memberof Queue */ - push(fn: () => PromiseLike): Promise; + clear(): Promise; /** - * Push a new job onto the given queue. + * Push a new job onto the default queue. * - * @template T - * @param {string} queue - * @param {() => PromiseLike} fn + * @param {Function|QueueJob} job * @returns {Promise} * @memberof Queue */ - pushOn(queue: string, fn: () => PromiseLike): Promise; + push(job: Function | QueueJob): Promise; /** * Push a new job onto the default queue after a delay. * - * @template T - * @param {number} delay - * @param {() => PromiseLike} fn - * @returns {Promise} - * @memberof Queue - */ - later(delay: number, fn: () => PromiseLike): Promise; - - /** - * Push a new job onto the given queue after a delay. - * - * @template T - * @param {string} queue * @param {number} delay - * @param {() => PromiseLike} fn + * @param {Function|QueueJob} job * @returns {Promise} * @memberof Queue */ - laterOn(queue: string, delay: number, fn: () => PromiseLike): Promise; + later(delay: number, job: Function | QueueJob): Promise; /** * Push an array of jobs onto the default queue. * - * @template T - * @param {(() => PromiseLike)[]} functions + * @param {(Function|QueueJob)[]} jobs * @returns {Promise} * @memberof Queue */ - bulk(functions: (() => PromiseLike)[]): Promise; - - /** - * Push an array of jobs onto the given queue. - * - * @template T - * @param {string} queue - * @param {(() => PromiseLike)[]} functions - * @returns {Promise} - * @memberof Queue - */ - bulkOn(queue: string, functions: (() => PromiseLike)[]): Promise; + bulk(jobs: Function[] | QueueJob[]): Promise; /** * Get the size of the given queue. @@ -111,21 +90,5 @@ export interface Queue { * @returns {number} * @memberof Queue */ - size(queue?: string): number; - - /** - * Get the connection name for the queue. - * - * @returns {string} - * @memberof Queue - */ - getDefaultQueue(): string; - - /** - * Set the connection name for the queue. - * - * @param {string} name - * @memberof Queue - */ - setDefaultQueue(name: string): void; + size(): number; } diff --git a/packages/core-kernel/src/services/events/drivers/memory.ts b/packages/core-kernel/src/services/events/drivers/memory.ts index 919258f2b3..da1b8bf7e0 100644 --- a/packages/core-kernel/src/services/events/drivers/memory.ts +++ b/packages/core-kernel/src/services/events/drivers/memory.ts @@ -1,10 +1,34 @@ import mm from "micromatch"; -import { EventDispatcher as EventDispatcherContract } from "../../../contracts/kernel/events"; +import { EventDispatcher as EventDispatcherContract, EventListener, EventName } from "../../../contracts/kernel/events"; import { injectable } from "../../../ioc"; -import { EventListener, EventName } from "../../../types/events"; import { assert } from "../../../utils"; +/** + * @class OnceListener + * @implements {EventListener} + */ +class OnceListener implements EventListener { + /** + * @param {EventDispatcherContract} dispatcher + * @param {EventListener} listener + * @memberof OnceListener + */ + public constructor( + private readonly dispatcher: EventDispatcherContract, + private readonly listener: EventListener, + ) {} + + /** + * @param {*} {name} + * @returns {Promise} + * @memberof OnceListener + */ + public async handle({ name }): Promise { + this.dispatcher.forget(name, this.listener); + } +} + /** * @export * @class MemoryEventDispatcher @@ -52,11 +76,9 @@ export class MemoryEventDispatcher implements EventDispatcherContract { * @memberof MemoryEventDispatcher */ public listenOnce(name: EventName, listener: EventListener): void { - const off: () => void = this.listen(name, data => { - off(); + this.listen(name, listener); - listener(data); - }); + this.listen(name, new OnceListener(this, listener)); } /** @@ -138,7 +160,7 @@ export class MemoryEventDispatcher implements EventDispatcherContract { const resolvers: Array> = []; for (const listener of this.getListenersByPattern(event)) { - resolvers.push(new Promise(resolve => resolve(listener({ name: event, data })))); + resolvers.push(new Promise(resolve => resolve(listener.handle({ name: event, data })))); } await Promise.all(resolvers); @@ -155,10 +177,7 @@ export class MemoryEventDispatcher implements EventDispatcherContract { await Promise.resolve(); for (const listener of this.getListenersByPattern(event)) { - await listener({ - name: event, - data, - }); + await listener.handle({ name: event, data }); } } @@ -170,10 +189,7 @@ export class MemoryEventDispatcher implements EventDispatcherContract { */ public dispatchSync(event: EventName, data?: T): void { for (const listener of this.getListenersByPattern(event)) { - listener({ - name: event, - data, - }); + listener.handle({ name: event, data }); } } diff --git a/packages/core-kernel/src/services/events/drivers/null.ts b/packages/core-kernel/src/services/events/drivers/null.ts index ec7e4589a4..d977ffa720 100644 --- a/packages/core-kernel/src/services/events/drivers/null.ts +++ b/packages/core-kernel/src/services/events/drivers/null.ts @@ -1,6 +1,5 @@ -import { EventDispatcher as EventDispatcherContract } from "../../../contracts/kernel/events"; +import { EventDispatcher as EventDispatcherContract, EventListener, EventName } from "../../../contracts/kernel/events"; import { injectable } from "../../../ioc"; -import { EventListener, EventName } from "../../../types/events"; /** * @export diff --git a/packages/core-kernel/src/services/queue/drivers/memory.ts b/packages/core-kernel/src/services/queue/drivers/memory.ts index 6cc3d521b5..4672e7b8c3 100644 --- a/packages/core-kernel/src/services/queue/drivers/memory.ts +++ b/packages/core-kernel/src/services/queue/drivers/memory.ts @@ -1,8 +1,5 @@ -import PQueue from "p-queue"; - -import { Queue } from "../../../contracts/kernel/queue"; +import { Queue, QueueJob } from "../../../contracts/kernel/queue"; import { injectable } from "../../../ioc"; -import { assert } from "../../../utils"; /** * @export @@ -13,187 +10,175 @@ import { assert } from "../../../utils"; export class MemoryQueue implements Queue { /** * @private - * @type {Map} + * @type {(QueueJob[])} * @memberof MemoryQueue */ - private readonly queues: Map = new Map(); + private readonly jobs: QueueJob[] = []; /** * @private - * @type {string} + * @type {Promise} * @memberof MemoryQueue */ - private defaultQueue: string = "default"; + private lastQueue?: Promise; /** - * Start the queue. - * - * @param {string} [queue] - * @returns {Promise} + * @private + * @type {any[]} * @memberof MemoryQueue */ - public async start(queue?: string): Promise { - await this.firstOrCreate(queue).start(); - } + private lastResults: any[] = []; /** - * Stop the queue. - * - * @param {string} [queue] - * @returns {Promise} + * @private + * @type {boolean} * @memberof MemoryQueue */ - public async stop(queue?: string): Promise { - await this.queues.delete(queue || this.defaultQueue); - } + private isRunning: boolean = false; /** - * Pause the queue. - * - * @param {string} [queue] - * @returns {Promise} + * @private + * @type {number} * @memberof MemoryQueue */ - public async pause(queue?: string): Promise { - await this.firstOrCreate(queue).pause(); - } + private index: number = -1; /** - * Clear the queue. + * Start the queue. * - * @param {string} [queue] * @returns {Promise} * @memberof MemoryQueue */ - public async clear(queue?: string): Promise { - await this.firstOrCreate(queue).clear(); + public async start(): Promise { + this.lastQueue = this.lastQueue || this.processFromIndex(0); } /** - * Push a new job onto the default queue. + * Stop the queue. * - * @template T - * @param {() => PromiseLike} fn * @returns {Promise} * @memberof MemoryQueue */ - public async push(fn: () => PromiseLike): Promise { - this.firstOrCreate(this.defaultQueue).add(fn); + public async stop(): Promise { + await this.pause(); + + this.clear(); } /** - * Push a new job onto the given queue. + * Pause the queue. * - * @template T - * @param {string} queue - * @param {() => PromiseLike} fn * @returns {Promise} * @memberof MemoryQueue */ - public async pushOn(queue: string, fn: () => PromiseLike): Promise { - this.firstOrCreate(queue).add(fn); + public async pause(): Promise { + this.isRunning = false; } /** - * Push a new job onto the default queue after a delay. + * Resume the queue. * - * @template T - * @param {number} delay - * @param {() => PromiseLike} fn * @returns {Promise} * @memberof MemoryQueue */ - public async later(delay: number, fn: () => PromiseLike): Promise { - setTimeout(() => this.push(fn), delay); + public async resume(): Promise { + this.lastQueue = this.processFromIndex(this.index, this.lastResults); } /** - * Push a new job onto the given queue after a delay. + * Clear the queue. * - * @template T - * @param {string} queue - * @param {number} delay - * @param {() => PromiseLike} fn * @returns {Promise} * @memberof MemoryQueue */ - public async laterOn(queue: string, delay: number, fn: () => PromiseLike): Promise { - setTimeout(() => this.pushOn(queue, fn), delay); + public async clear(): Promise { + this.index = -1; + this.isRunning = false; + this.lastQueue = undefined; + this.jobs.splice(0); } /** - * Push an array of jobs onto the default queue. + * Push a new job onto the queue. * * @template T - * @param {(() => PromiseLike)[]} functions + * @param {QueueJob} job * @returns {Promise} * @memberof MemoryQueue */ - public async bulk(functions: (() => PromiseLike)[]): Promise { - this.firstOrCreate(this.defaultQueue).addAll(functions); + public async push(job: QueueJob): Promise { + this.jobs.push(job); } /** - * Push an array of jobs onto the given queue. + * Push a new job onto the queue after a delay. * * @template T - * @param {string} queue - * @param {(() => PromiseLike)[]} functions + * @param {number} delay + * @param {QueueJob} job * @returns {Promise} * @memberof MemoryQueue */ - public async bulkOn(queue: string, functions: (() => PromiseLike)[]): Promise { - this.firstOrCreate(queue).addAll(functions); + public async later(delay: number, job: QueueJob): Promise { + setTimeout(() => this.push(job), delay); } /** - * Get the size of the given queue. + * Push an array of jobs onto the queue. * - * @param {string} queue - * @returns {number} - * @memberof MemoryQueue - */ - public size(queue?: string): number { - return this.firstOrCreate(queue).size; - } - - /** - * Get the connection name for the queue. - * - * @returns {string} + * @param {QueueJob[]} jobs + * @returns {Promise} * @memberof MemoryQueue */ - public getDefaultQueue(): string { - return this.defaultQueue; + public async bulk(jobs: QueueJob[]): Promise { + for (const job of jobs) { + this.jobs.push(job); + } } /** - * Set the connection name for the queue. + * Get the size of the queue. * - * @param {string} name + * @returns {number} * @memberof MemoryQueue */ - public setDefaultQueue(name: string): void { - this.defaultQueue = name; + public size(): number { + return this.jobs.length; } /** * @private - * @param {string} name - * @returns {PQueue} + * @param {number} from + * @param {any[]} [lastResults=[]] + * @param {boolean} [isRunning=true] + * @returns {Promise} * @memberof MemoryQueue */ - private firstOrCreate(name?: string): PQueue { - name = name || this.defaultQueue; + private async processFromIndex(from: number, lastResults: any[] = [], isRunning: boolean = true): Promise { + this.lastResults = lastResults; - if (!this.queues.has(name)) { - this.queues.set(name, new PQueue({ autoStart: false })); - } + if (from < this.jobs.length) { + this.index = from; + + if (isRunning) { + this.isRunning = isRunning; - const queue: PQueue | undefined = this.queues.get(name); + try { + lastResults.push(await this.jobs[from].handle()); - assert.defined(queue); + return this.processFromIndex(from + 1, lastResults, this.isRunning); + } catch (error) { + this.isRunning = false; + + throw new Error( + `Queue halted at job #${from + 1} due to error in handler ${this.jobs[this.index]}.`, + ); + } + } + } else { + this.isRunning = false; + } - return queue; + return this.lastResults; } } diff --git a/packages/core-kernel/src/services/queue/drivers/null.ts b/packages/core-kernel/src/services/queue/drivers/null.ts index aa6225b299..931ef58f2d 100644 --- a/packages/core-kernel/src/services/queue/drivers/null.ts +++ b/packages/core-kernel/src/services/queue/drivers/null.ts @@ -1,4 +1,4 @@ -import { Queue } from "../../../contracts/kernel/queue"; +import { Queue, QueueJob } from "../../../contracts/kernel/queue"; import { injectable } from "../../../ioc"; /** @@ -11,122 +11,85 @@ export class NullQueue implements Queue { /** * Start the queue. * - * @param {string} [queue] * @returns {Promise} * @memberof MemoryQueue */ - public async start(queue?: string): Promise { - // + public async start(): Promise { + return; } /** * Stop the queue. * - * @param {string} [queue] * @returns {Promise} * @memberof MemoryQueue */ - public async stop(queue?: string): Promise { - // + public async stop(): Promise { + return; } /** * Pause the queue. * - * @param {string} [queue] * @returns {Promise} * @memberof MemoryQueue */ - public async pause(queue?: string): Promise { - // + public async pause(): Promise { + return; } /** - * Clear the queue. + * Resume the queue. * - * @param {string} [queue] * @returns {Promise} * @memberof MemoryQueue */ - public async clear(queue?: string): Promise { - // + public async resume(): Promise { + return; } /** - * Push a new job onto the default queue. + * Clear the queue. * - * @template T - * @param {() => PromiseLike} fn * @returns {Promise} * @memberof MemoryQueue */ - public async push(fn: () => PromiseLike): Promise { - // + public async clear(): Promise { + return; } /** - * Push a new job onto the given queue. + * Push a new job onto the default queue. * - * @template T - * @param {string} queue - * @param {() => PromiseLike} fn + * @param {QueueJob} job * @returns {Promise} * @memberof MemoryQueue */ - public async pushOn(queue: string, fn: () => PromiseLike): Promise { - // + public async push(job: QueueJob): Promise { + return; } /** * Push a new job onto the default queue after a delay. * - * @template T * @param {number} delay - * @param {() => PromiseLike} fn + * @param {QueueJob} job * @returns {Promise} * @memberof MemoryQueue */ - public async later(delay: number, fn: () => PromiseLike): Promise { - // - } - - /** - * Push a new job onto the given queue after a delay. - * - * @template T - * @param {string} queue - * @param {number} delay - * @param {() => PromiseLike} fn - * @returns {Promise} - * @memberof MemoryQueue - */ - public async laterOn(queue: string, delay: number, fn: () => PromiseLike): Promise { - // + public async later(delay: number, job: QueueJob): Promise { + return; } /** * Push an array of jobs onto the default queue. * - * @template T - * @param {(() => PromiseLike)[]} functions - * @returns {Promise} - * @memberof MemoryQueue - */ - public async bulk(functions: (() => PromiseLike)[]): Promise { - // - } - - /** - * Push an array of jobs onto the given queue. - * - * @template T - * @param {string} queue - * @param {(() => PromiseLike)[]} functions + * @param {(QueueJob)[]} jobs * @returns {Promise} * @memberof MemoryQueue */ - public async bulkOn(queue: string, functions: (() => PromiseLike)[]): Promise { - // + public async bulk(jobs: QueueJob[]): Promise { + return; } /** @@ -136,27 +99,7 @@ export class NullQueue implements Queue { * @returns {number} * @memberof MemoryQueue */ - public size(queue?: string): number { + public size(): number { return 0; } - - /** - * Get the connection name for the queue. - * - * @returns {string} - * @memberof MemoryQueue - */ - public getDefaultQueue(): string { - return ""; - } - - /** - * Set the connection name for the queue. - * - * @param {string} name - * @memberof MemoryQueue - */ - public setDefaultQueue(name: string): void { - // - } } diff --git a/packages/core-kernel/src/services/schedule/block-job.ts b/packages/core-kernel/src/services/schedule/block-job.ts index dbb84815bd..a00e485a7e 100644 --- a/packages/core-kernel/src/services/schedule/block-job.ts +++ b/packages/core-kernel/src/services/schedule/block-job.ts @@ -1,9 +1,10 @@ -import { Interfaces, Managers } from "@arkecosystem/crypto"; +import { Managers } from "@arkecosystem/crypto"; import { EventDispatcher } from "../../contracts/kernel/events"; import { BlockEvent } from "../../enums"; import { Identifiers, inject, injectable } from "../../ioc"; import { Job } from "./interfaces"; +import { ExecuteCallbackWhenReady } from "./listeners"; /** * @export @@ -32,11 +33,7 @@ export class BlockJob implements Job { * @memberof BlockJob */ public execute(callback: Function): void { - this.events.listen(BlockEvent.Received, async ({ data }: { data: Interfaces.IBlockData }) => { - if (data.height % this.blockCount === 0) { - await callback(); - } - }); + this.events.listen(BlockEvent.Received, new ExecuteCallbackWhenReady(callback, this.blockCount)); } /** diff --git a/packages/core-kernel/src/services/schedule/listeners.ts b/packages/core-kernel/src/services/schedule/listeners.ts new file mode 100644 index 0000000000..750c6fc9b8 --- /dev/null +++ b/packages/core-kernel/src/services/schedule/listeners.ts @@ -0,0 +1,44 @@ +import { EventListener } from "../../contracts/kernel"; + +/** + * @export + * @class ExecuteCallbackWhenReady + * @implements {EventListener} + */ +export class ExecuteCallbackWhenReady implements EventListener { + /** + * @private + * @type {number} + * @memberof ExecuteCallbackWhenReady + */ + private blockCount!: number; + + /** + * @private + * @type {Function} + * @memberof ExecuteCallbackWhenReady + */ + private callback!: Function; + + /** + * @param {Function} callback + * @param {number} blockCount + * @returns {this} + * @memberof ExecuteCallbackWhenReady + */ + public constructor(callback: Function, blockCount: number) { + this.blockCount = blockCount; + this.callback = callback; + } + + /** + * @param {*} {data} + * @returns {Promise} + * @memberof ExecuteCallbackWhenReady + */ + public async handle({ data }): Promise { + if (data.height % this.blockCount === 0) { + await this.callback(); + } + } +} diff --git a/packages/core-kernel/src/services/triggers/action.ts b/packages/core-kernel/src/services/triggers/action.ts index a329d12515..9a79b70d78 100644 --- a/packages/core-kernel/src/services/triggers/action.ts +++ b/packages/core-kernel/src/services/triggers/action.ts @@ -1,4 +1,6 @@ -export class Action { +import { ActionArguments } from "../../types"; + +export abstract class Action { /** * @private * @type {Set} @@ -21,18 +23,13 @@ export class Action { private readonly afterHooks: Set = new Set(); /** - * @param {Function} fn + * @abstract + * @template T + * @param {ActionArguments} args + * @returns {T} * @memberof Action */ - public constructor(private readonly fn: Function) {} - - /** - * @returns {Function} - * @memberof Action - */ - public execute(...args: any[]): T { - return this.fn(args); - } + public abstract execute(args: ActionArguments): T; /** * @param {Function} fn diff --git a/packages/core-kernel/src/services/triggers/triggers.ts b/packages/core-kernel/src/services/triggers/triggers.ts index edad0a2df0..3d40f58c53 100644 --- a/packages/core-kernel/src/services/triggers/triggers.ts +++ b/packages/core-kernel/src/services/triggers/triggers.ts @@ -1,5 +1,6 @@ import { InvalidArgumentException } from "../../exceptions/logic"; import { injectable } from "../../ioc"; +import { ActionArguments } from "../../types"; import { assert } from "../../utils"; import { Action } from "./action"; @@ -26,7 +27,7 @@ export class Triggers { * @returns {Action} * @memberof Actions */ - public bind(name: string, fn: Function): Action { + public bind(name: string, action: Action): Action { if (this.triggers.has(name)) { throw new InvalidArgumentException(`The given trigger [${name}] is already registered.`); } @@ -35,10 +36,9 @@ export class Triggers { throw new InvalidArgumentException(`The given trigger [${name}] is reserved.`); } - const trigger: Action = new Action(fn); - this.triggers.set(name, trigger); + this.triggers.set(name, action); - return trigger; + return action; } /** @@ -67,7 +67,7 @@ export class Triggers { * @returns {(Promise)} * @memberof Actions */ - public async call(name: string, ...args: Array): Promise { + public async call(name: string, args: ActionArguments = {}): Promise { this.throwIfActionIsMissing(name); await this.callHooks("before", name); diff --git a/packages/core-kernel/src/types/events.ts b/packages/core-kernel/src/types/events.ts deleted file mode 100644 index 44c9cb6546..0000000000 --- a/packages/core-kernel/src/types/events.ts +++ /dev/null @@ -1,3 +0,0 @@ -export type EventName = string | symbol; - -export type EventListener = (data: { name: EventName; data: any }) => void; diff --git a/packages/core-kernel/src/types/index.ts b/packages/core-kernel/src/types/index.ts index 82f25bb3a7..12c8a16de9 100644 --- a/packages/core-kernel/src/types/index.ts +++ b/packages/core-kernel/src/types/index.ts @@ -1,14 +1,13 @@ import { CacheStore as CacheStoreContract, Pipeline, Queue } from "../contracts/kernel"; -import * as Events from "./events"; export * from "type-fest"; export type KeyValuePair = Record; +export type ActionArguments = Record; + export type CacheStore = () => CacheStoreContract; export type PipelineFactory = () => Pipeline; export type QueueFactory = () => Queue; - -export { Events }; diff --git a/packages/core-p2p/src/event-listener.ts b/packages/core-p2p/src/event-listener.ts index da66b62144..cb661d4770 100644 --- a/packages/core-p2p/src/event-listener.ts +++ b/packages/core-p2p/src/event-listener.ts @@ -1,28 +1,21 @@ import { Container, Contracts, Enums } from "@arkecosystem/core-kernel"; -import { PeerConnector } from "./peer-connector"; +import { DisconnectPeer } from "./listeners"; // todo: review the implementation @Container.injectable() export class EventListener { + @Container.inject(Container.Identifiers.Application) + protected readonly app!: Contracts.Kernel.Application; + @Container.inject(Container.Identifiers.EventDispatcherService) private readonly emitter!: Contracts.Kernel.EventDispatcher; - @Container.inject(Container.Identifiers.PeerConnector) - private readonly connector!: PeerConnector; - - @Container.inject(Container.Identifiers.PeerStorage) - private readonly storage!: Contracts.P2P.PeerStorage; - @Container.inject(Container.Identifiers.PeerNetworkMonitor) private readonly networkMonitor!: Contracts.P2P.INetworkMonitor; public init() { - this.emitter.listen(Enums.PeerEvent.Disconnect, ({ data }) => { - this.connector.disconnect(data); - - this.storage.forgetPeer(data); - }); + this.emitter.listen(Enums.PeerEvent.Disconnect, this.app.resolve(DisconnectPeer)); const exitHandler = () => this.networkMonitor.stopServer(); diff --git a/packages/core-p2p/src/listeners.ts b/packages/core-p2p/src/listeners.ts new file mode 100644 index 0000000000..1a48b03679 --- /dev/null +++ b/packages/core-p2p/src/listeners.ts @@ -0,0 +1,83 @@ +import { Container, Contracts } from "@arkecosystem/core-kernel"; + +import { PeerConnector } from "./peer-connector"; +import { isValidVersion } from "./utils"; + +/** + * @class DisconnectInvalidPeers + * @implements {EventListener} + */ +@Container.injectable() +export class DisconnectInvalidPeers implements Contracts.Kernel.EventListener { + /** + * @private + * @type {Contracts.Kernel.Application} + * @memberof DisconnectInvalidPeers + */ + @Container.inject(Container.Identifiers.Application) + private readonly app!: Contracts.Kernel.Application; + + /** + * @private + * @type {Contracts.Kernel.EventDispatcher} + * @memberof DisconnectInvalidPeers + */ + @Container.inject(Container.Identifiers.EventDispatcherService) + private readonly emitter!: Contracts.Kernel.EventDispatcher; + + /** + * @private + * @type {Contracts.P2P.PeerStorage} + * @memberof DisconnectInvalidPeers + */ + @Container.inject(Container.Identifiers.PeerStorage) + private readonly storage!: Contracts.P2P.PeerStorage; + + /** + * @returns {Promise} + * @memberof DisconnectInvalidPeers + */ + public async handle(): Promise { + const peers: Contracts.P2P.Peer[] = this.storage.getPeers(); + + for (const peer of peers) { + if (!isValidVersion(this.app, peer)) { + this.emitter.dispatch("internal.p2p.disconnectPeer", { peer }); + } + } + } +} + +/** + * @class DisconnectPeer + * @implements {EventListener} + */ +@Container.injectable() +export class DisconnectPeer implements Contracts.Kernel.EventListener { + /** + * @private + * @type {PeerConnector} + * @memberof DisconnectPeer + */ + @Container.inject(Container.Identifiers.PeerConnector) + private readonly connector!: PeerConnector; + + /** + * @private + * @type {Contracts.P2P.PeerStorage} + * @memberof DisconnectPeer + */ + @Container.inject(Container.Identifiers.PeerStorage) + private readonly storage!: Contracts.P2P.PeerStorage; + + /** + * @param {*} {data} + * @returns {Promise} + * @memberof DisconnectPeer + */ + public async handle({ data }): Promise { + this.connector.disconnect(data); + + this.storage.forgetPeer(data); + } +} diff --git a/packages/core-p2p/src/peer-processor.ts b/packages/core-p2p/src/peer-processor.ts index 2cd7a80f55..9d76c8b26b 100644 --- a/packages/core-p2p/src/peer-processor.ts +++ b/packages/core-p2p/src/peer-processor.ts @@ -1,10 +1,11 @@ import { Container, Contracts, Enums, Providers, Utils as AppUtils } from "@arkecosystem/core-kernel"; import { Utils } from "@arkecosystem/crypto"; +import { DisconnectInvalidPeers } from "./listeners"; import { PeerCommunicator } from "./peer-communicator"; import { PeerConnector } from "./peer-connector"; import { AcceptNewPeerOptions, PeerFactory } from "./types"; -import { isValidVersion, isWhitelisted } from "./utils"; +import { isWhitelisted } from "./utils"; // todo: review the implementation @Container.injectable() @@ -34,7 +35,7 @@ export class PeerProcessor { private readonly serviceProviderRepository!: Providers.ServiceProviderRepository; public init() { - this.emitter.listen(Enums.CryptoEvent.MilestoneChanged, () => this.updatePeersAfterMilestoneChange()); + this.emitter.listen(Enums.CryptoEvent.MilestoneChanged, this.app.resolve(DisconnectInvalidPeers)); } public async validateAndAcceptPeer(peer: Contracts.P2P.Peer, options: AcceptNewPeerOptions = {}): Promise { @@ -76,15 +77,6 @@ export class PeerProcessor { return true; } - private updatePeersAfterMilestoneChange(): void { - const peers: Contracts.P2P.Peer[] = this.storage.getPeers(); - for (const peer of peers) { - if (!isValidVersion(this.app, peer)) { - this.emitter.dispatch("internal.p2p.disconnectPeer", { peer }); - } - } - } - private async acceptNewPeer(peer, options: AcceptNewPeerOptions = {}): Promise { if (this.storage.getPeer(peer.ip)) { return; diff --git a/packages/core-transaction-pool/src/connection.ts b/packages/core-transaction-pool/src/connection.ts index e52ca2c6d7..8ca98e3182 100644 --- a/packages/core-transaction-pool/src/connection.ts +++ b/packages/core-transaction-pool/src/connection.ts @@ -7,6 +7,7 @@ import { strictEqual } from "assert"; import differenceWith from "lodash.differencewith"; import { TransactionsProcessed } from "./interfaces"; +import { PurgeInvalidTransactions } from "./listeners"; import { Memory } from "./memory"; import { PoolWalletRepository } from "./pool-wallet-repository"; import { Processor } from "./processor"; @@ -75,7 +76,7 @@ export class Connection implements Contracts.TransactionPool.Connection { this.syncToPersistentStorage(); - this.emitter.listen(AppEnums.CryptoEvent.MilestoneChanged, () => this.purgeInvalidTransactions()); + this.emitter.listen(AppEnums.CryptoEvent.MilestoneChanged, new PurgeInvalidTransactions(this)); return this; } diff --git a/packages/core-transaction-pool/src/listeners.ts b/packages/core-transaction-pool/src/listeners.ts new file mode 100644 index 0000000000..0347b64d44 --- /dev/null +++ b/packages/core-transaction-pool/src/listeners.ts @@ -0,0 +1,23 @@ +import { Contracts } from "@arkecosystem/core-kernel"; + +/** + * @class PurgeInvalidTransactions + * @implements {EventListener} + */ +export class PurgeInvalidTransactions implements Contracts.Kernel.EventListener { + /** + * @param {Contracts.TransactionPool.Connection} pool + * @memberof PurgeInvalidTransactions + */ + // todo: inject + public constructor(private readonly pool: Contracts.TransactionPool.Connection) {} + + /** + * @returns {Promise} + * @memberof PurgeInvalidTransactions + */ + public async handle(): Promise { + // @ts-ignore - the interface is outdated + await this.pool.purgeInvalidTransactions(); + } +} diff --git a/packages/core-webhooks/src/listener.ts b/packages/core-webhooks/src/listener.ts index 8166acc1e7..33ddcb91c6 100644 --- a/packages/core-webhooks/src/listener.ts +++ b/packages/core-webhooks/src/listener.ts @@ -30,11 +30,11 @@ export class Listener { * @param {string} event * @memberof Listener */ - public async listen(event: string, payload: object): Promise { - const webhooks: Webhook[] = this.getWebhooks(event, payload); + public async handle({ name, data }): Promise { + const webhooks: Webhook[] = this.getWebhooks(name, data); for (const webhook of webhooks) { - await this.broadcast(webhook, payload); + await this.broadcast(webhook, data); } } diff --git a/packages/core-webhooks/src/service-provider.ts b/packages/core-webhooks/src/service-provider.ts index 0fa5bc36b0..9f77da4e77 100644 --- a/packages/core-webhooks/src/service-provider.ts +++ b/packages/core-webhooks/src/service-provider.ts @@ -1,4 +1,4 @@ -import { Enums, Providers, Types } from "@arkecosystem/core-kernel"; +import { Providers, Types } from "@arkecosystem/core-kernel"; import { Database } from "./database"; import { Identifiers } from "./identifiers"; @@ -65,8 +65,6 @@ export class ServiceProvider extends Providers.ServiceProvider { * @memberof ServiceProvider */ private startListeners(): void { - for (const event of Object.values(Enums.StateEvent)) { - this.app.events.listen(event, async payload => this.app.resolve(Listener).listen(event, payload)); - } + this.app.events.listen("*", this.app.resolve(Listener)); } } diff --git a/yarn.lock b/yarn.lock index 4412f43095..e6beb341ff 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5522,11 +5522,6 @@ eventemitter3@^3.1.0: resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-3.1.2.tgz#2d3d48f9c346698fce83a85d7d664e98535df6e7" integrity sha512-tvtQIeLVHjDkJYnzf2dgVMxfuSGJeM/7UCG17TT4EumTfNtF+0nebF/4zWOIkCreAbtNqhGEboB6BWrwqNaw4Q== -eventemitter3@^4.0.0: - version "4.0.0" - resolved "https://registry.yarnpkg.com/eventemitter3/-/eventemitter3-4.0.0.tgz#d65176163887ee59f386d64c82610b696a4a74eb" - integrity sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg== - evp_bytestokey@^1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/evp_bytestokey/-/evp_bytestokey-1.0.3.tgz#7fcbdb198dc71959432efe13842684e0525acb02" @@ -9291,26 +9286,11 @@ p-queue@^4.0.0: dependencies: eventemitter3 "^3.1.0" -p-queue@^6.2.0: - version "6.2.0" - resolved "https://registry.yarnpkg.com/p-queue/-/p-queue-6.2.0.tgz#c8122b9514d2bbe5f16d8a47f17dc2f9a8ac7235" - integrity sha512-B2LXNONcyn/G6uz2UBFsGjmSa0e/br3jznlzhEyCXg56c7VhEpiT2pZxGOfv32Q3FSyugAdys9KGpsv3kV+Sbg== - dependencies: - eventemitter3 "^4.0.0" - p-timeout "^3.1.0" - p-reduce@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/p-reduce/-/p-reduce-1.0.0.tgz#18c2b0dd936a4690a529f8231f58a0fdb6a47dfa" integrity sha1-GMKw3ZNqRpClKfgjH1ig/bakffo= -p-timeout@^3.1.0: - version "3.2.0" - resolved "https://registry.yarnpkg.com/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe" - integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg== - dependencies: - p-finally "^1.0.0" - p-try@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/p-try/-/p-try-1.0.0.tgz#cbc79cdbaf8fd4228e13f621f2b1a237c1b207b3"