From 9796d138a0be797f3964bf36545e96a2a493fdd9 Mon Sep 17 00:00:00 2001 From: air1one <36802613+air1one@users.noreply.github.com> Date: Fri, 17 Jul 2020 07:54:21 +0200 Subject: [PATCH] feat(core-p2p): split into 3 ports for blocks / transactions / others (#3896) --- __tests__/unit/core-forger/client.test.ts | 42 ++- .../controllers/entities.test.ts | 6 +- __tests__/unit/core-p2p/listeners.test.ts | 6 +- .../unit/core-p2p/network-monitor.test.ts | 7 +- .../unit/core-p2p/peer-communicator.test.ts | 87 +++-- .../unit/core-p2p/peer-connector.test.ts | 40 +-- .../unit/core-p2p/peer-processor.test.ts | 2 +- .../socket-server/controllers/blocks.test.ts | 306 ++++++++++++++++++ .../socket-server/controllers/peer.test.ts | 254 +-------------- .../controllers/transactions.test.ts | 83 +++++ .../core-p2p/socket-server/server.test.ts | 16 +- packages/core-forger/src/client.ts | 42 ++- packages/core-forger/src/interfaces.ts | 8 +- .../src/contracts/p2p/peer-connector.ts | 8 +- packages/core-p2p/src/enums.ts | 6 + packages/core-p2p/src/listeners.ts | 7 +- packages/core-p2p/src/network-monitor.ts | 6 +- packages/core-p2p/src/peer-communicator.ts | 58 ++-- packages/core-p2p/src/peer-connector.ts | 24 +- packages/core-p2p/src/peer-processor.ts | 5 +- packages/core-p2p/src/schemas.ts | 18 +- .../src/socket-server/controllers/blocks.ts | 107 ++++++ .../src/socket-server/controllers/peer.ts | 110 +------ .../socket-server/controllers/transactions.ts | 15 + .../src/socket-server/plugins/validate.ts | 4 + .../src/socket-server/routes/blocks.ts | 27 ++ .../core-p2p/src/socket-server/routes/peer.ts | 16 - .../src/socket-server/routes/transactions.ts | 22 ++ .../src/socket-server/schemas/blocks.ts | 17 + .../src/socket-server/schemas/peer.ts | 18 -- .../src/socket-server/schemas/transactions.ts | 7 + packages/core-p2p/src/socket-server/server.ts | 77 ++++- .../src/socket-server/utils/get-peer-port.ts | 22 ++ 33 files changed, 881 insertions(+), 592 deletions(-) create mode 100644 __tests__/unit/core-p2p/socket-server/controllers/blocks.test.ts create mode 100644 __tests__/unit/core-p2p/socket-server/controllers/transactions.test.ts create mode 100644 packages/core-p2p/src/socket-server/controllers/blocks.ts create mode 100644 packages/core-p2p/src/socket-server/controllers/transactions.ts create mode 100644 packages/core-p2p/src/socket-server/routes/blocks.ts create mode 100644 packages/core-p2p/src/socket-server/routes/transactions.ts create mode 100644 packages/core-p2p/src/socket-server/schemas/blocks.ts create mode 100644 packages/core-p2p/src/socket-server/schemas/transactions.ts create mode 100644 packages/core-p2p/src/socket-server/utils/get-peer-port.ts diff --git a/__tests__/unit/core-forger/client.test.ts b/__tests__/unit/core-forger/client.test.ts index 593a31193a..e9f9e7f3bd 100644 --- a/__tests__/unit/core-forger/client.test.ts +++ b/__tests__/unit/core-forger/client.test.ts @@ -28,7 +28,7 @@ afterEach(() => { describe("Client", () => { let client: Client; - const host = { hostname: "127.0.0.1", port: 4000, socket: undefined }; + const host = { hostname: "127.0.0.1", port: 4000, blocksSocket: undefined, internalSocket: undefined }; const hosts = [host]; beforeEach(() => { @@ -42,14 +42,19 @@ describe("Client", () => { it("should register hosts", async () => { client.register(hosts); expect(Nes.Client).toHaveBeenCalledWith(`ws://${host.hostname}:${host.port}`); - expect(client.hosts).toEqual([{ ...host, socket: expect.anything() }]); + expect(client.hosts).toEqual([{ ...host, blocksSocket: expect.anything(), internalSocket: expect.anything() }]); }); it("on error the socket should call logger", () => { client.register(hosts); const fakeError = { message: "Fake Error" }; - client.hosts[0].socket.onError(fakeError); + client.hosts[0].blocksSocket.onError(fakeError); + + expect(logger.error).toHaveBeenCalledWith("Fake Error"); + + logger.error.mockReset(); + client.hosts[0].internalSocket.onError(fakeError); expect(logger.error).toHaveBeenCalledWith("Fake Error"); }); @@ -59,8 +64,10 @@ describe("Client", () => { it("should call disconnect on all sockets", () => { client.register([host, { hostname: "127.0.0.5", port: 4000 }]); client.dispose(); - expect(client.hosts[0].socket.disconnect).toHaveBeenCalled(); - expect(client.hosts[1].socket.disconnect).toHaveBeenCalled(); + expect(client.hosts[0].blocksSocket.disconnect).toHaveBeenCalled(); + expect(client.hosts[0].internalSocket.disconnect).toHaveBeenCalled(); + expect(client.hosts[1].blocksSocket.disconnect).toHaveBeenCalled(); + expect(client.hosts[1].internalSocket.disconnect).toHaveBeenCalled(); expect(nesClient.disconnect).toBeCalled(); }); }); @@ -80,11 +87,11 @@ describe("Client", () => { it("should not broadcast block when there is an issue with socket", async () => { client.register(hosts); - host.socket = {}; + host.blocksSocket = {}; await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve(); expect(logger.error).toHaveBeenCalledWith( - `Broadcast block failed: Request to ${host.hostname}:${host.port} failed, because of 'this.host.socket.request is not a function'.`, + `Broadcast block failed: Request to ${host.hostname}:${host.port} failed, because of 'socket.request is not a function'.`, ); }); @@ -93,7 +100,7 @@ describe("Client", () => { await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve(); expect(nesClient.request).toHaveBeenCalledWith({ - path: "p2p.peer.postBlock", + path: "p2p.blocks.postBlock", headers: {}, method: "POST", payload: { block: expect.anything() }, @@ -108,7 +115,7 @@ describe("Client", () => { await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve(); expect(logger.error).toHaveBeenCalledWith( - `Broadcast block failed: Request to ${host.hostname}:${host.port} failed, because of 'oops'.`, + `Broadcast block failed: Request to ${host.hostname}:${host.port} failed, because of 'oops'.`, ); }); }); @@ -121,7 +128,8 @@ describe("Client", () => { }); it("should select the first open socket", async () => { - hosts[4].socket._isReady = () => true; + hosts[4].blocksSocket._isReady = () => true; + hosts[4].internalSocket._isReady = () => true; client.register(hosts); client.selectHost(); @@ -129,7 +137,10 @@ describe("Client", () => { }); it("should log debug message when no sockets are open", async () => { - hosts.forEach((host) => (host.socket._isReady = () => false)); + hosts.forEach((host) => { + host.internalSocket._isReady = () => false; + host.blocksSocket._isReady = () => false; + }); client.register(hosts); await expect(client.selectHost()).rejects.toThrow( @@ -159,7 +170,8 @@ describe("Client", () => { describe("getRound", () => { it("should broadcast internal getRound transaction", async () => { client.register([host]); - host.socket._isReady = () => true; + host.blocksSocket._isReady = () => true; + host.internalSocket._isReady = () => true; await client.getRound(); @@ -175,7 +187,8 @@ describe("Client", () => { describe("syncWithNetwork", () => { it("should broadcast internal getRound transaction", async () => { client.register([host]); - host.socket._isReady = () => true; + host.blocksSocket._isReady = () => true; + host.internalSocket._isReady = () => true; await client.syncWithNetwork(); expect(nesClient.request).toHaveBeenCalledWith({ @@ -190,7 +203,8 @@ describe("Client", () => { it("should log error message if syncing fails", async () => { const errorMessage = "Fake Error"; nesClient.request.mockRejectedValueOnce(new Error(errorMessage)); - host.socket._isReady = () => true; + host.blocksSocket._isReady = () => true; + host.internalSocket._isReady = () => true; client.register([host]); await expect(client.syncWithNetwork()).toResolve(); expect(logger.error).toHaveBeenCalledWith( diff --git a/__tests__/unit/core-magistrate-api/controllers/entities.test.ts b/__tests__/unit/core-magistrate-api/controllers/entities.test.ts index 9080a2c219..b57c015c01 100644 --- a/__tests__/unit/core-magistrate-api/controllers/entities.test.ts +++ b/__tests__/unit/core-magistrate-api/controllers/entities.test.ts @@ -102,7 +102,7 @@ describe("EntityController", () => { const request: Hapi.Request = { params: { - id: senderWallet.publicKey, + id: registrationTxId, }, }; @@ -112,7 +112,7 @@ describe("EntityController", () => { it("should return error if entity is not found", async () => { const request: Hapi.Request = { params: { - id: senderWallet.publicKey, + id: registrationTxId, }, }; @@ -134,7 +134,7 @@ describe("EntityController", () => { const request: Hapi.Request = { params: { - id: senderWallet.address, + id: registrationTxId, }, }; diff --git a/__tests__/unit/core-p2p/listeners.test.ts b/__tests__/unit/core-p2p/listeners.test.ts index 634112d756..20d8321de1 100644 --- a/__tests__/unit/core-p2p/listeners.test.ts +++ b/__tests__/unit/core-p2p/listeners.test.ts @@ -48,7 +48,7 @@ describe("DisconnectInvalidPeers", () => { it("should emit 'internal.p2p.disconnectPeer' for invalid version peers", async () => { await disconnectInvalidPeers.handle(); - expect(emitter.dispatch).toBeCalledTimes(2); // 2 invalid peers version + expect(emitter.dispatch).toBeCalledTimes(2 * 3); // 2 invalid peers version * 3 sockets (ports) per peer }); }); }); @@ -76,12 +76,12 @@ describe("DisconnectPeer", () => { describe("handle", () => { it("should disconnect the peer provided", async () => { const peer = new Peer("187.176.1.1", 4000); - await disconnectPeer.handle({ data: { peer: peer } }); + await disconnectPeer.handle({ data: { peer: peer, port: 4000 } }); expect(storage.forgetPeer).toBeCalledTimes(1); expect(storage.forgetPeer).toBeCalledWith(peer); expect(connector.disconnect).toBeCalledTimes(1); - expect(connector.disconnect).toBeCalledWith(peer); + expect(connector.disconnect).toBeCalledWith(peer, 4000); }); }); }); diff --git a/__tests__/unit/core-p2p/network-monitor.test.ts b/__tests__/unit/core-p2p/network-monitor.test.ts index 0cc4316373..e02d590924 100644 --- a/__tests__/unit/core-p2p/network-monitor.test.ts +++ b/__tests__/unit/core-p2p/network-monitor.test.ts @@ -7,6 +7,7 @@ import { Blocks } from "@arkecosystem/crypto"; import delay from "delay"; import { cloneDeep } from "lodash"; import path from "path"; +import { PortsOffset } from "@arkecosystem/core-p2p/src/enums"; describe("NetworkMonitor", () => { let networkMonitor: NetworkMonitor; @@ -394,8 +395,10 @@ describe("NetworkMonitor", () => { await networkMonitor.cleansePeers({ peerCount: 5 }); expect(communicator.ping).toBeCalledTimes(peers.length); - expect(emitter.dispatch).toBeCalledTimes(2); - expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers) }); + expect(emitter.dispatch).toBeCalledTimes(4); // 3 for disconnecting each peer port + 1 for peer removed event + for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) { + expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers), port }); + } expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Removed, expect.toBeOneOf(peers)); }); diff --git a/__tests__/unit/core-p2p/peer-communicator.test.ts b/__tests__/unit/core-p2p/peer-communicator.test.ts index 97f74af671..c79ea3a708 100644 --- a/__tests__/unit/core-p2p/peer-communicator.test.ts +++ b/__tests__/unit/core-p2p/peer-communicator.test.ts @@ -13,11 +13,22 @@ import { PeerVerificationResult } from "@arkecosystem/core-p2p/src/peer-verifier import { replySchemas } from "@arkecosystem/core-p2p/src/schemas"; import { Blocks, Identities, Managers, Transactions, Utils } from "@arkecosystem/crypto"; import delay from "delay"; +import { PortsOffset } from "@arkecosystem/core-p2p/src/enums"; Managers.configManager.getMilestone().aip11 = true; const cloneObject = (obj) => JSON.parse(JSON.stringify(obj)); +const basePort = 4000; +const mapEventToPort = { + "p2p.blocks.postBlock": basePort + PortsOffset.Blocks, + "p2p.blocks.getBlocks": basePort + PortsOffset.Blocks, + "p2p.transactions.postTransactions": basePort + PortsOffset.Transactions, + "p2p.peer.getStatus": basePort + PortsOffset.Peer, + "p2p.peer.getPeers": basePort + PortsOffset.Peer, + "p2p.peer.getCommonBlocks": basePort + PortsOffset.Peer, +}; + describe("PeerCommunicator", () => { let peerCommunicator: PeerCommunicator; @@ -51,8 +62,8 @@ describe("PeerCommunicator", () => { }); describe("postBlock", () => { - it("should use connector to emit p2p.peer.postBlock", async () => { - const event = "p2p.peer.postBlock"; + it("should use connector to emit p2p.blocks.postBlock", async () => { + const event = "p2p.blocks.postBlock"; const block = { data: { id: "17882607875259085966", @@ -87,20 +98,20 @@ describe("PeerCommunicator", () => { await peerCommunicator.postBlock(peer, payload.block); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, { block: expect.any(Buffer) }); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, { block: expect.any(Buffer) }); }); }); describe("postTransactions", () => { - it("should use connector to emit p2p.peer.postTransactions", async () => { - const event = "p2p.peer.postTransactions"; + it("should use connector to emit p2p.transactions.postTransactions", async () => { + const event = "p2p.transactions.postTransactions"; const payload = { transactions: [] }; const peer = new Peer("187.168.65.65", 4000); await peerCommunicator.postTransactions(peer, payload.transactions); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); }); }); describe("ping", () => { @@ -143,7 +154,7 @@ describe("PeerCommunicator", () => { await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerStatusResponseError); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); }); it("should throw PeerStatusResponseError when there is no reply schema for getStatus", async () => { @@ -160,7 +171,7 @@ describe("PeerCommunicator", () => { await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerStatusResponseError); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); replySchemas["p2p.peer.getStatus"] = getStatusReplySchema; }); @@ -185,7 +196,7 @@ describe("PeerCommunicator", () => { await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerVerificationFailedError); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); }, ); @@ -203,7 +214,7 @@ describe("PeerCommunicator", () => { await expect(peerCommunicator.ping(peer, timeout)).rejects.toThrow(PeerPingTimeoutError); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); }); it("should throw PeerVerificationFailedError when verification fails", async () => { @@ -216,7 +227,7 @@ describe("PeerCommunicator", () => { await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerVerificationFailedError); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); }); it("should not throw otherwise", async () => { @@ -230,7 +241,7 @@ describe("PeerCommunicator", () => { const pingResult = await peerCommunicator.ping(peer, 6000); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); expect(pingResult).toEqual(baseGetStatusResponse.state); }); }); @@ -250,7 +261,7 @@ describe("PeerCommunicator", () => { const pingResult = await peerCommunicator.ping(peer, 1000); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, {}); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, {}); expect(pingResult).toEqual(baseGetStatusResponse.state); expect(peer.state).toEqual(baseGetStatusResponse.state); expect(peer.plugins).toEqual(baseGetStatusResponse.config.plugins); @@ -326,8 +337,10 @@ describe("PeerCommunicator", () => { "network.nethash", )}, his=${wrongNethash}.`, ); - expect(emitter.dispatch).toBeCalledTimes(1); - expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer }); + expect(emitter.dispatch).toBeCalledTimes(3); + for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) { + expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer, port }); + } }); it("should set peer ports = -1 when pinging the port fails", async () => { @@ -358,34 +371,10 @@ describe("PeerCommunicator", () => { const getPeersResult = await peerCommunicator.getPeers(peer); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); expect(getPeersResult).toEqual(mockConnectorResponse.payload); }); - it("should throttle outgoing requests when passing rate limit", async () => { - configuration.getOptional = jest.fn().mockReturnValueOnce(2); // global rate limit = 2 - peerCommunicator.initialize(); // to make rate limit kick in - - const event = "p2p.peer.getPeers"; - const payload = {}; - const peer = new Peer("187.168.65.65", 4000); - - const mockConnectorResponse = { payload: [{ ip: "177.176.1.1", port: 4000 }] }; - connector.emit = jest.fn().mockReturnValue(mockConnectorResponse); - const getPeersResult1 = await peerCommunicator.getPeers(peer); - const getPeersResult2 = await peerCommunicator.getPeers(peer); - - expect(connector.emit).toBeCalledTimes(2); - expect(connector.emit).toBeCalledWith(peer, event, payload); - expect(getPeersResult1).toEqual(mockConnectorResponse.payload); - expect(getPeersResult2).toEqual(mockConnectorResponse.payload); - expect(logger.debug).toBeCalledWith( - `Throttling outgoing requests to ${peer.ip}/${event} to avoid triggering their rate limit`, - ); - - connector.emit = jest.fn(); - }); - it.each([[true], [false]])("should return undefined when emit fails", async (throwErrorInstance) => { const event = "p2p.peer.getPeers"; const payload = {}; @@ -396,7 +385,7 @@ describe("PeerCommunicator", () => { const getPeersResult = await peerCommunicator.getPeers(peer); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); expect(getPeersResult).toBeUndefined(); }); @@ -410,7 +399,7 @@ describe("PeerCommunicator", () => { const getPeersResult = await peerCommunicator.getPeers(peer); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); expect(getPeersResult).toBeUndefined(); expect(logger.debug).toBeCalledWith(expect.stringContaining("Got unexpected reply from")); }); @@ -427,7 +416,7 @@ describe("PeerCommunicator", () => { const hasCommonBlocksResult = await peerCommunicator.hasCommonBlocks(peer, payload.ids, 1000); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); expect(hasCommonBlocksResult).toEqual(mockConnectorResponse.payload.common); }); @@ -441,7 +430,7 @@ describe("PeerCommunicator", () => { const hasCommonBlocksResult = await peerCommunicator.hasCommonBlocks(peer, payload.ids, 6000); expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, payload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, payload); expect(hasCommonBlocksResult).toBe(false); }); }); @@ -476,8 +465,8 @@ describe("PeerCommunicator", () => { ], }; - it.each([[true], [false]])("should use connector to emit p2p.peer.getBlocks", async (withTransactions) => { - const event = "p2p.peer.getBlocks"; + it.each([[true], [false]])("should use connector to emit p2p.blocks.getBlocks", async (withTransactions) => { + const event = "p2p.blocks.getBlocks"; const options = { fromBlockHeight: 1, blockLimit: 1, @@ -500,12 +489,12 @@ describe("PeerCommunicator", () => { serialized: true, }; expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, expectedEmitPayload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, expectedEmitPayload); expect(getPeerBlocksResult).toEqual(mockConnectorResponse.payload); }); it("should log a debug message when peer did not return any block", async () => { - const event = "p2p.peer.getBlocks"; + const event = "p2p.blocks.getBlocks"; const options = { fromBlockHeight: 1, headersOnly: false, @@ -523,7 +512,7 @@ describe("PeerCommunicator", () => { serialized: true, }; expect(connector.emit).toBeCalledTimes(1); - expect(connector.emit).toBeCalledWith(peer, event, expectedEmitPayload); + expect(connector.emit).toBeCalledWith(peer, mapEventToPort[event], event, expectedEmitPayload); expect(getPeerBlocksResult).toEqual([]); expect(logger.debug).toBeCalledWith( `Peer ${peer.ip} did not return any blocks via height ${options.fromBlockHeight}.`, diff --git a/__tests__/unit/core-p2p/peer-connector.test.ts b/__tests__/unit/core-p2p/peer-connector.test.ts index 8c3df1da1c..1c353e2478 100644 --- a/__tests__/unit/core-p2p/peer-connector.test.ts +++ b/__tests__/unit/core-p2p/peer-connector.test.ts @@ -14,6 +14,8 @@ describe("PeerConnector", () => { let peerConnector: PeerConnector; let logger; + const port = 4000; + beforeEach(() => { logger = { warning: jest.fn(), debug: jest.fn(), error: jest.fn(), info: jest.fn() }; @@ -32,8 +34,8 @@ describe("PeerConnector", () => { it("should return the connections", async () => { const peers = [new Peer("178.165.55.44", 4000), new Peer("178.165.55.33", 4000)]; - await peerConnector.connect(peers[0]); - await peerConnector.connect(peers[1]); + await peerConnector.connect(peers[0], port); + await peerConnector.connect(peers[1], port); expect(peerConnector.all()).toBeArrayOfSize(2); }); @@ -42,31 +44,31 @@ describe("PeerConnector", () => { describe("connection", () => { it("should return the connection", async () => { const peers = [new Peer("178.165.55.44", 4000), new Peer("178.165.55.33", 4000)]; - await peerConnector.connect(peers[0]); - await peerConnector.connect(peers[1]); + await peerConnector.connect(peers[0], port); + await peerConnector.connect(peers[1], port); - expect(peerConnector.connection(peers[0])).toBeInstanceOf(NesClient); - expect(peerConnector.connection(peers[1])).toBeInstanceOf(NesClient); + expect(peerConnector.connection(peers[0], port)).toBeInstanceOf(NesClient); + expect(peerConnector.connection(peers[1], port)).toBeInstanceOf(NesClient); }); it("should return undefined if there is no connection", async () => { const peerNotAdded = new Peer("178.0.0.0", 4000); - expect(peerConnector.connection(peerNotAdded)).toBeUndefined(); + expect(peerConnector.connection(peerNotAdded, port)).toBeUndefined(); }); }); describe("connect", () => { it("should set the connection in the connections and return it", async () => { const peer = new Peer("178.165.55.11", 4000); - const peerConnection = await peerConnector.connect(peer); + const peerConnection = await peerConnector.connect(peer, port); expect(peerConnection).toBeInstanceOf(NesClient); - expect(peerConnection).toBe(peerConnector.connection(peer)); + expect(peerConnection).toBe(peerConnector.connection(peer, port)); }); it("should log if error on connection", async () => { const peer = new Peer("178.165.55.11", 4000); - const peerConnection = await peerConnector.connect(peer); + const peerConnection = await peerConnector.connect(peer, port); peerConnection.onError(new Error("dummy")); @@ -78,23 +80,23 @@ describe("PeerConnector", () => { describe("disconnect", () => { it("should call disconnect on the connection and forget it", async () => { const peer = new Peer("178.165.55.11", 4000); - const peerConnection = await peerConnector.connect(peer); + const peerConnection = await peerConnector.connect(peer, port); const spyDisconnect = jest.spyOn(peerConnection, "disconnect"); - expect(peerConnector.connection(peer)).toBeInstanceOf(NesClient); + expect(peerConnector.connection(peer, port)).toBeInstanceOf(NesClient); - peerConnector.disconnect(peer); - expect(peerConnector.connection(peer)).toBeUndefined(); + peerConnector.disconnect(peer, port); + expect(peerConnector.connection(peer, port)).toBeUndefined(); expect(spyDisconnect).toBeCalledTimes(1); }); it("should not do anything if the peer is not defined", async () => { const peer = new Peer("178.165.0.0", 4000); - expect(peerConnector.connection(peer)).toBeUndefined(); + expect(peerConnector.connection(peer, port)).toBeUndefined(); - peerConnector.disconnect(peer); - expect(peerConnector.connection(peer)).toBeUndefined(); + peerConnector.disconnect(peer, port); + expect(peerConnector.connection(peer, port)).toBeUndefined(); }); }); @@ -102,13 +104,13 @@ describe("PeerConnector", () => { it("should connect to the peer and call connection.request", async () => { const peer = new Peer("178.165.11.12", 4000); - const peerConnection = await peerConnector.connect(peer); + const peerConnection = await peerConnector.connect(peer, port); const mockResponse = { payload: "mock payload" }; // @ts-ignore const spyRequest = jest.spyOn(peerConnection, "request").mockReturnValue(mockResponse); - const response = await peerConnector.emit(peer, "p2p.peer.getStatus", {}); + const response = await peerConnector.emit(peer, port, "p2p.peer.getStatus", {}); expect(spyRequest).toBeCalledTimes(1); expect(response).toEqual(mockResponse); diff --git a/__tests__/unit/core-p2p/peer-processor.test.ts b/__tests__/unit/core-p2p/peer-processor.test.ts index 98238b26c9..035e86632d 100644 --- a/__tests__/unit/core-p2p/peer-processor.test.ts +++ b/__tests__/unit/core-p2p/peer-processor.test.ts @@ -99,7 +99,7 @@ describe("PeerProcessor", () => { expect(peerStorage.setPendingPeer).toBeCalledTimes(1); expect(peerCommunicator.ping).toBeCalledTimes(1); expect(peerStorage.setPeer).toBeCalledTimes(0); - expect(peerConnector.disconnect).toBeCalledTimes(1); + expect(peerConnector.disconnect).toBeCalledTimes(3); // disconnect on the 3 sockets (3 ports) for the peer }); it("should not do anything if peer is already added", async () => { diff --git a/__tests__/unit/core-p2p/socket-server/controllers/blocks.test.ts b/__tests__/unit/core-p2p/socket-server/controllers/blocks.test.ts new file mode 100644 index 0000000000..f56d5d3ee9 --- /dev/null +++ b/__tests__/unit/core-p2p/socket-server/controllers/blocks.test.ts @@ -0,0 +1,306 @@ +import { Container } from "@arkecosystem/core-kernel"; +import { BlocksController } from "@arkecosystem/core-p2p/src/socket-server/controllers/blocks"; +import { TooManyTransactionsError, UnchainedBlockError } from "@arkecosystem/core-p2p/src/socket-server/errors"; +import { Blocks, Identities, Managers, Networks, Transactions, Utils, Interfaces } from "@arkecosystem/crypto"; + +Managers.configManager.getMilestone().aip11 = true; // for creating aip11 v2 transactions + +describe("BlocksController", () => { + let blocksController: BlocksController; + + const container = new Container.Container(); + + const logger = { warning: jest.fn(), debug: jest.fn(), info: jest.fn() }; + const peerStorage = { getPeers: jest.fn() }; + const database = { getCommonBlocks: jest.fn(), getBlocksForDownload: jest.fn() }; + const blockchain = { + getLastBlock: jest.fn(), + handleIncomingBlock: jest.fn(), + pingBlock: jest.fn(), + getLastDownloadedBlock: jest.fn(), + }; + const createProcessor = jest.fn(); + const appPlugins = [{ package: "@arkecosystem/core-api", options: {} }]; + const coreApiServiceProvider = { + name: () => "core-api", + configDefaults: () => ({ + server: { http: { port: 4003 } }, + }), + }; + const serviceProviders = { "@arkecosystem/core-api": coreApiServiceProvider }; + const configRepository = { get: () => appPlugins }; // get("app.plugins") + const serviceProviderRepository = { get: (plugin) => serviceProviders[plugin] }; + const appGet = { + [Container.Identifiers.BlockchainService]: blockchain, + [Container.Identifiers.TransactionPoolProcessorFactory]: createProcessor, + [Container.Identifiers.ConfigRepository]: configRepository, + [Container.Identifiers.ServiceProviderRepository]: serviceProviderRepository, + }; + const config = { getOptional: jest.fn().mockReturnValue(["127.0.0.1"]) }; // remoteAccess + const app = { + get: (key) => appGet[key], + getTagged: () => config, + version: () => "3.0.9", + resolve: () => ({ + from: () => ({ + merge: () => ({ + all: () => ({ + server: { http: { port: "4003" } }, + options: { + estimateTotalCount: true, + }, + }), + }), + }), + }), + }; + + beforeAll(() => { + container.unbindAll(); + container.bind(Container.Identifiers.LogService).toConstantValue(logger); + container.bind(Container.Identifiers.PeerStorage).toConstantValue(peerStorage); + container.bind(Container.Identifiers.DatabaseService).toConstantValue(database); + container.bind(Container.Identifiers.Application).toConstantValue(app); + }); + + beforeEach(() => { + blocksController = container.resolve(BlocksController); + }); + + describe("postBlock", () => { + const block = { + data: { + id: "3863292773792902701", + version: 0, + timestamp: 46583330, + height: 2, + reward: Utils.BigNumber.make("0"), + previousBlock: "17184958558311101492", + numberOfTransactions: 1, + totalAmount: Utils.BigNumber.make("0"), + totalFee: Utils.BigNumber.make("0"), + payloadLength: 0, + payloadHash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", + generatorPublicKey: "026c598170201caf0357f202ff14f365a3b09322071e347873869f58d776bfc565", + blockSignature: + "3045022100e7385c6ea42bd950f7f6ab8c8619cf2f66a41d8f8f185b0bc99af032cb25f30d02200b6210176a6cedfdcbe483167fd91c21d740e0e4011d24d679c601fdd46b0de9", + }, + transactions: [ + Transactions.BuilderFactory.transfer() + .amount("100") + .recipientId(Identities.Address.fromPassphrase("recipient's secret")) + .fee("100") + .sign("sender's secret") + .build(), + ], + } as Blocks.Block; + const deepClone = (obj) => JSON.parse(JSON.stringify(obj)); + + describe("when block contains too many transactions", () => { + it("should throw TooManyTransactionsError when numberOfTransactions is too much", async () => { + const blockTooManyTxs = deepClone(block); + blockTooManyTxs.data.numberOfTransactions = 350; + const blockSerialized = Blocks.Serializer.serializeWithTransactions({ + ...blockTooManyTxs.data, + transactions: blockTooManyTxs.transactions.map((tx) => tx.data), + }); + + await expect( + blocksController.postBlock({ payload: { block: { data: blockSerialized } } }, {}), + ).rejects.toBeInstanceOf(TooManyTransactionsError); + }); + + it("should throw TooManyTransactionsError when transactions.length is too much", async () => { + blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); + const blockTooManyTxs = deepClone(block); + + const transactions: Interfaces.ITransaction[] = []; + for (let i = 0; i < 2; i++) { + transactions.push( + Transactions.BuilderFactory.transfer() + .version(2) + .amount("100") + .recipientId(Identities.Address.fromPassphrase(`recipient secret ${i}`)) + .fee("100") + .nonce(`${i + 1}`) + .sign(`sender secret ${i}`) + .build(), + ); + } + blockTooManyTxs.transactions = transactions; + blockTooManyTxs.data.numberOfTransactions = 2; + + const blockSerialized = Blocks.Serializer.serializeWithTransactions({ + ...blockTooManyTxs.data, + transactions: transactions.map((tx) => tx.data), + }); + + // this is a trick to make the first numberOfTransactions check pass + // but then transactions.length fail + // probably some unreachable code though... + const milestone = Managers.configManager.getMilestone(); + const spyGetMilestone = jest.spyOn(Managers.configManager, "getMilestone"); + for (let i = 0; i < 113; i++) { + // yeah 113 times :wtf: before the one we are interested to mock kicks in + spyGetMilestone.mockReturnValueOnce({ + ...milestone, + block: { + maxTransactions: 150, + }, + }); + } + spyGetMilestone.mockReturnValueOnce({ + ...milestone, + block: { + maxTransactions: 1, + }, + }); + + + await expect( + blocksController.postBlock( + { + payload: { block: { data: blockSerialized } }, + info: { remoteAddress: "187.55.33.22" }, + }, + {}, + ), + ).rejects.toBeInstanceOf(TooManyTransactionsError); + + spyGetMilestone.mockRestore(); + }); + }); + + describe("when block is not chained", () => { + it.each([[true], [false]])( + "should throw UnchainedBlockError only if block is not known", + async (blockPing) => { + blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); + const blockUnchained = deepClone(block); + blockUnchained.data.height = 9; + const blockSerialized = Blocks.Serializer.serializeWithTransactions({ + ...blockUnchained.data, + transactions: blockUnchained.transactions.map((tx) => tx.data), + }); + + if (blockPing) { + blockchain.pingBlock = jest.fn().mockReturnValueOnce(true); + await expect( + blocksController.postBlock( + { + payload: { block: { data: blockSerialized } }, + info: { remoteAddress: "187.55.33.22" }, + }, + {}, + ), + ).toResolve(); + expect(blockchain.handleIncomingBlock).toBeCalledTimes(0); + } else { + await expect( + blocksController.postBlock( + { + payload: { block: { data: blockSerialized } }, + info: { remoteAddress: "187.55.33.22" }, + }, + {}, + ), + ).rejects.toBeInstanceOf(UnchainedBlockError); + } + }, + ); + }); + + describe("when block comes from forger", () => { + it("should call handleIncomingBlock with the block and fromForger=true", async () => { + blockchain.handleIncomingBlock = jest.fn(); + const ip = "187.55.33.22"; + config.getOptional.mockReturnValueOnce([ip]); + + const blockSerialized = Blocks.Serializer.serializeWithTransactions({ + ...block.data, + transactions: block.transactions.map((tx) => tx.data), + }); + await blocksController.postBlock( + { + payload: { block: { data: blockSerialized } }, + info: { remoteAddress: ip }, + }, + {}, + ); + + expect(blockchain.handleIncomingBlock).toBeCalledTimes(1); + expect(blockchain.handleIncomingBlock).toBeCalledWith(expect.objectContaining(block.data), true); + }); + }); + + describe("when block does not come from forger", () => { + it("should call handleIncomingBlock with the block and fromForger=false", async () => { + blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); + blockchain.handleIncomingBlock = jest.fn(); + const ip = "187.55.33.22"; + config.getOptional.mockReturnValueOnce(["188.66.55.44"]); + + const blockSerialized = Blocks.Serializer.serializeWithTransactions({ + ...block.data, + transactions: block.transactions.map((tx) => tx.data), + }); + await blocksController.postBlock( + { + payload: { block: { data: blockSerialized } }, + info: { remoteAddress: ip }, + }, + {}, + ); + + expect(blockchain.handleIncomingBlock).toBeCalledTimes(1); + expect(blockchain.handleIncomingBlock).toBeCalledWith(expect.objectContaining(block.data), false); + }); + }); + }); + + describe("getBlocks", () => { + it("should use database.getBlocksForDownload to get the blocks according to the request params", async () => { + // request parameters: lastBlockHeight, blockLimit, headersOnly + const mockBlocks = [Networks.testnet.genesisBlock]; + database.getBlocksForDownload = jest.fn().mockReturnValueOnce(mockBlocks); + const payload = { + lastBlockHeight: 1, + blockLimit: 100, + headersOnly: true, + }; + const ip = "187.55.33.22"; + + const blocks = await blocksController.getBlocks({ payload, info: { remoteAddress: ip } }, {}); + + expect(blocks).toEqual(mockBlocks); + expect(database.getBlocksForDownload).toBeCalledTimes(1); + expect(database.getBlocksForDownload).toBeCalledWith( + payload.lastBlockHeight + 1, + payload.blockLimit, + payload.headersOnly, + ); + }); + + it("should use database.getBlocksForDownload to get the blocks according to the request params with default block limit", async () => { + // request parameters: lastBlockHeight, blockLimit, headersOnly + const mockBlocks = [Networks.testnet.genesisBlock]; + database.getBlocksForDownload = jest.fn().mockReturnValueOnce(mockBlocks); + const payload = { + lastBlockHeight: 1, + blockLimit: null, + headersOnly: true, + }; + const ip = "187.55.33.22"; + + const blocks = await blocksController.getBlocks({ payload, info: { remoteAddress: ip } }, {}); + + expect(blocks).toEqual(mockBlocks); + expect(database.getBlocksForDownload).toBeCalledTimes(1); + expect(database.getBlocksForDownload).toBeCalledWith( + payload.lastBlockHeight + 1, + 400, + payload.headersOnly, + ); + }); + }); +}); diff --git a/__tests__/unit/core-p2p/socket-server/controllers/peer.test.ts b/__tests__/unit/core-p2p/socket-server/controllers/peer.test.ts index 28c41e8528..b31057c9d7 100644 --- a/__tests__/unit/core-p2p/socket-server/controllers/peer.test.ts +++ b/__tests__/unit/core-p2p/socket-server/controllers/peer.test.ts @@ -2,9 +2,8 @@ import { Container } from "@arkecosystem/core-kernel"; import { MissingCommonBlockError } from "@arkecosystem/core-p2p/src/errors"; import { Peer } from "@arkecosystem/core-p2p/src/peer"; import { PeerController } from "@arkecosystem/core-p2p/src/socket-server/controllers/peer"; -import { TooManyTransactionsError, UnchainedBlockError } from "@arkecosystem/core-p2p/src/socket-server/errors"; import { getPeerConfig } from "@arkecosystem/core-p2p/src/socket-server/utils/get-peer-config"; -import { Blocks, Crypto, Identities, Managers, Networks, Transactions, Utils } from "@arkecosystem/crypto"; +import { Crypto, Managers } from "@arkecosystem/crypto"; Managers.configManager.getMilestone().aip11 = true; // for creating aip11 v2 transactions @@ -170,255 +169,4 @@ describe("PeerController", () => { }); }); }); - - describe("postBlock", () => { - const block = { - data: { - id: "3863292773792902701", - version: 0, - timestamp: 46583330, - height: 2, - reward: Utils.BigNumber.make("0"), - previousBlock: "17184958558311101492", - numberOfTransactions: 1, - totalAmount: Utils.BigNumber.make("0"), - totalFee: Utils.BigNumber.make("0"), - payloadLength: 0, - payloadHash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", - generatorPublicKey: "026c598170201caf0357f202ff14f365a3b09322071e347873869f58d776bfc565", - blockSignature: - "3045022100e7385c6ea42bd950f7f6ab8c8619cf2f66a41d8f8f185b0bc99af032cb25f30d02200b6210176a6cedfdcbe483167fd91c21d740e0e4011d24d679c601fdd46b0de9", - }, - transactions: [ - Transactions.BuilderFactory.transfer() - .amount("100") - .recipientId(Identities.Address.fromPassphrase("recipient's secret")) - .fee("100") - .sign("sender's secret") - .build(), - ], - } as Blocks.Block; - const deepClone = (obj) => JSON.parse(JSON.stringify(obj)); - - describe("when block contains too many transactions", () => { - it("should throw TooManyTransactionsError when numberOfTransactions is too much", async () => { - const blockTooManyTxs = deepClone(block); - blockTooManyTxs.data.numberOfTransactions = 350; - const blockSerialized = Blocks.Serializer.serializeWithTransactions({ - ...blockTooManyTxs.data, - transactions: blockTooManyTxs.transactions.map((tx) => tx.data), - }); - - await expect( - peerController.postBlock({ payload: { block: { data: blockSerialized } } }, {}), - ).rejects.toBeInstanceOf(TooManyTransactionsError); - }); - - it("should throw TooManyTransactionsError when transactions.length is too much", async () => { - blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); - const blockTooManyTxs = deepClone(block); - - const transactions = []; - for (let i = 0; i < 2; i++) { - transactions.push( - Transactions.BuilderFactory.transfer() - .version(2) - .amount("100") - .recipientId(Identities.Address.fromPassphrase(`recipient secret ${i}`)) - .fee("100") - .nonce(`${i + 1}`) - .sign(`sender secret ${i}`) - .build(), - ); - } - blockTooManyTxs.transactions = transactions; - blockTooManyTxs.data.numberOfTransactions = 2; - - const blockSerialized = Blocks.Serializer.serializeWithTransactions({ - ...blockTooManyTxs.data, - transactions: transactions.map((tx) => tx.data), - }); - - // this is a trick to make the first numberOfTransactions check pass - // but then transactions.length fail - // probably some unreachable code though... - const milestone = Managers.configManager.getMilestone(); - const spyGetMilestone = jest.spyOn(Managers.configManager, "getMilestone"); - for (let i = 0; i < 75; i++) { - // yeah 75 times :wtf: before the one we are interested to mock kicks in - spyGetMilestone.mockReturnValueOnce({ - ...milestone, - block: { - maxTransactions: 150, - }, - }); - } - spyGetMilestone.mockReturnValueOnce({ - ...milestone, - block: { - maxTransactions: 1, - }, - }); - - await expect( - peerController.postBlock( - { - payload: { block: { data: blockSerialized } }, - info: { remoteAddress: "187.55.33.22" }, - }, - {}, - ), - ).rejects.toBeInstanceOf(TooManyTransactionsError); - - spyGetMilestone.mockRestore(); - }); - }); - - describe("when block is not chained", () => { - it.each([[true], [false]])( - "should throw UnchainedBlockError only if block is not known", - async (blockPing) => { - blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); - const blockUnchained = deepClone(block); - blockUnchained.data.height = 9; - const blockSerialized = Blocks.Serializer.serializeWithTransactions({ - ...blockUnchained.data, - transactions: blockUnchained.transactions.map((tx) => tx.data), - }); - - if (blockPing) { - blockchain.pingBlock = jest.fn().mockReturnValueOnce(true); - await expect( - peerController.postBlock( - { - payload: { block: { data: blockSerialized } }, - info: { remoteAddress: "187.55.33.22" }, - }, - {}, - ), - ).toResolve(); - expect(blockchain.handleIncomingBlock).toBeCalledTimes(0); - } else { - await expect( - peerController.postBlock( - { - payload: { block: { data: blockSerialized } }, - info: { remoteAddress: "187.55.33.22" }, - }, - {}, - ), - ).rejects.toBeInstanceOf(UnchainedBlockError); - } - }, - ); - }); - - describe("when block comes from forger", () => { - it("should call handleIncomingBlock with the block and fromForger=true", async () => { - blockchain.handleIncomingBlock = jest.fn(); - const ip = "187.55.33.22"; - config.getOptional.mockReturnValueOnce([ip]); - - const blockSerialized = Blocks.Serializer.serializeWithTransactions({ - ...block.data, - transactions: block.transactions.map((tx) => tx.data), - }); - await peerController.postBlock( - { - payload: { block: { data: blockSerialized } }, - info: { remoteAddress: ip }, - }, - {}, - ); - - expect(blockchain.handleIncomingBlock).toBeCalledTimes(1); - expect(blockchain.handleIncomingBlock).toBeCalledWith(expect.objectContaining(block.data), true); - }); - }); - - describe("when block does not come from forger", () => { - it("should call handleIncomingBlock with the block and fromForger=false", async () => { - blockchain.getLastDownloadedBlock = jest.fn().mockReturnValueOnce(Networks.testnet.genesisBlock); - blockchain.handleIncomingBlock = jest.fn(); - const ip = "187.55.33.22"; - config.getOptional.mockReturnValueOnce(["188.66.55.44"]); - - const blockSerialized = Blocks.Serializer.serializeWithTransactions({ - ...block.data, - transactions: block.transactions.map((tx) => tx.data), - }); - await peerController.postBlock( - { - payload: { block: { data: blockSerialized } }, - info: { remoteAddress: ip }, - }, - {}, - ); - - expect(blockchain.handleIncomingBlock).toBeCalledTimes(1); - expect(blockchain.handleIncomingBlock).toBeCalledWith(expect.objectContaining(block.data), false); - }); - }); - }); - - describe("postTransactions", () => { - it("should create transaction processor and use it to process the transactions", async () => { - const transactions = Networks.testnet.genesisBlock.transactions; - const processor = { process: jest.fn(), accept: [transactions[0].id] }; - createProcessor.mockReturnValueOnce(processor); - - expect(await peerController.postTransactions({ payload: { transactions } }, {})).toEqual([ - transactions[0].id, - ]); - - expect(processor.process).toBeCalledTimes(1); - expect(processor.process).toBeCalledWith(transactions); - }); - }); - - describe("getBlocks", () => { - it("should use database.getBlocksForDownload to get the blocks according to the request params", async () => { - // request parameters: lastBlockHeight, blockLimit, headersOnly - const mockBlocks = [Networks.testnet.genesisBlock]; - database.getBlocksForDownload = jest.fn().mockReturnValueOnce(mockBlocks); - const payload = { - lastBlockHeight: 1, - blockLimit: 100, - headersOnly: true, - }; - const ip = "187.55.33.22"; - - const blocks = await peerController.getBlocks({ payload, info: { remoteAddress: ip } }, {}); - - expect(blocks).toEqual(mockBlocks); - expect(database.getBlocksForDownload).toBeCalledTimes(1); - expect(database.getBlocksForDownload).toBeCalledWith( - payload.lastBlockHeight + 1, - payload.blockLimit, - payload.headersOnly, - ); - }); - - it("should use database.getBlocksForDownload to get the blocks according to the request params with default block limit", async () => { - // request parameters: lastBlockHeight, blockLimit, headersOnly - const mockBlocks = [Networks.testnet.genesisBlock]; - database.getBlocksForDownload = jest.fn().mockReturnValueOnce(mockBlocks); - const payload = { - lastBlockHeight: 1, - blockLimit: null, - headersOnly: true, - }; - const ip = "187.55.33.22"; - - const blocks = await peerController.getBlocks({ payload, info: { remoteAddress: ip } }, {}); - - expect(blocks).toEqual(mockBlocks); - expect(database.getBlocksForDownload).toBeCalledTimes(1); - expect(database.getBlocksForDownload).toBeCalledWith( - payload.lastBlockHeight + 1, - 400, - payload.headersOnly, - ); - }); - }); }); diff --git a/__tests__/unit/core-p2p/socket-server/controllers/transactions.test.ts b/__tests__/unit/core-p2p/socket-server/controllers/transactions.test.ts new file mode 100644 index 0000000000..929d0082d6 --- /dev/null +++ b/__tests__/unit/core-p2p/socket-server/controllers/transactions.test.ts @@ -0,0 +1,83 @@ +import { Container } from "@arkecosystem/core-kernel"; +import { TransactionsController } from "@arkecosystem/core-p2p/src/socket-server/controllers/transactions"; +import { Managers, Networks } from "@arkecosystem/crypto"; + +Managers.configManager.getMilestone().aip11 = true; // for creating aip11 v2 transactions + +describe("TransactionsController", () => { + let transactionsController: TransactionsController; + + const container = new Container.Container(); + + const logger = { warning: jest.fn(), debug: jest.fn(), info: jest.fn() }; + const peerStorage = { getPeers: jest.fn() }; + const database = { getCommonBlocks: jest.fn(), getBlocksForDownload: jest.fn() }; + const blockchain = { + getLastBlock: jest.fn(), + handleIncomingBlock: jest.fn(), + pingBlock: jest.fn(), + getLastDownloadedBlock: jest.fn(), + }; + const createProcessor = jest.fn(); + const appPlugins = [{ package: "@arkecosystem/core-api", options: {} }]; + const coreApiServiceProvider = { + name: () => "core-api", + configDefaults: () => ({ + server: { http: { port: 4003 } }, + }), + }; + const serviceProviders = { "@arkecosystem/core-api": coreApiServiceProvider }; + const configRepository = { get: () => appPlugins }; // get("app.plugins") + const serviceProviderRepository = { get: (plugin) => serviceProviders[plugin] }; + const appGet = { + [Container.Identifiers.BlockchainService]: blockchain, + [Container.Identifiers.TransactionPoolProcessorFactory]: createProcessor, + [Container.Identifiers.ConfigRepository]: configRepository, + [Container.Identifiers.ServiceProviderRepository]: serviceProviderRepository, + }; + const config = { getOptional: jest.fn().mockReturnValue(["127.0.0.1"]) }; // remoteAccess + const app = { + get: (key) => appGet[key], + getTagged: () => config, + version: () => "3.0.9", + resolve: () => ({ + from: () => ({ + merge: () => ({ + all: () => ({ + server: { http: { port: "4003" } }, + options: { + estimateTotalCount: true, + }, + }), + }), + }), + }), + }; + + beforeAll(() => { + container.unbindAll(); + container.bind(Container.Identifiers.LogService).toConstantValue(logger); + container.bind(Container.Identifiers.PeerStorage).toConstantValue(peerStorage); + container.bind(Container.Identifiers.DatabaseService).toConstantValue(database); + container.bind(Container.Identifiers.Application).toConstantValue(app); + }); + + beforeEach(() => { + transactionsController = container.resolve(TransactionsController); + }); + + describe("postTransactions", () => { + it("should create transaction processor and use it to process the transactions", async () => { + const transactions = Networks.testnet.genesisBlock.transactions; + const processor = { process: jest.fn(), accept: [transactions[0].id] }; + createProcessor.mockReturnValueOnce(processor); + + expect(await transactionsController.postTransactions({ payload: { transactions } }, {})).toEqual([ + transactions[0].id, + ]); + + expect(processor.process).toBeCalledTimes(1); + expect(processor.process).toBeCalledWith(transactions); + }); + }); +}); diff --git a/__tests__/unit/core-p2p/socket-server/server.test.ts b/__tests__/unit/core-p2p/socket-server/server.test.ts index c87cfb5590..bb41392e9f 100644 --- a/__tests__/unit/core-p2p/socket-server/server.test.ts +++ b/__tests__/unit/core-p2p/socket-server/server.test.ts @@ -15,7 +15,7 @@ const hapiServer = { route: jest.fn(), register: jest.fn(), app: {}, -}; +} as any; const spyHapiServer = jest.spyOn(hapi, "Server").mockReturnValue(hapiServer); describe("Server", () => { @@ -60,7 +60,7 @@ describe("Server", () => { it("should instantiate a new Hapi server", async () => { await server.initialize(name, options); - expect(spyHapiServer).toBeCalledTimes(1); + expect(spyHapiServer).toBeCalledTimes(3); // 3 servers listening on the 3 ports }); }); @@ -103,40 +103,40 @@ describe("Server", () => { }); describe("register", () => { - it("should call server.register() with the options provided", async () => { + it("should call server.register() with the options provided - for each server", async () => { await server.initialize(name, options); hapiServer.register.mockReset(); const plugin = { name: "my plugin" }; await server.register(plugin); - expect(hapiServer.register).toBeCalledTimes(1); + expect(hapiServer.register).toBeCalledTimes(3); // 3 servers listening on the 3 ports expect(hapiServer.register).toBeCalledWith(plugin); }); }); describe("route", () => { - it("should call server.register() with the options provided", async () => { + it("should call server.register() with the options provided - for each server", async () => { await server.initialize(name, options); hapiServer.route.mockReset(); const route = { method: "POST", path: "/the/path" }; await server.route(route); - expect(hapiServer.route).toBeCalledTimes(1); + expect(hapiServer.route).toBeCalledTimes(3); // 3 servers listening on the 3 ports expect(hapiServer.route).toBeCalledWith(route); }); }); describe("inject", () => { - it("should call server.register() with the options provided", async () => { + it("should call server.register() with the options provided - for each server", async () => { await server.initialize(name, options); hapiServer.inject.mockReset(); const toInject = { name: "thing to inject" }; await server.inject(toInject); - expect(hapiServer.inject).toBeCalledTimes(1); + expect(hapiServer.inject).toBeCalledTimes(3); // 3 servers listening on the 3 ports expect(hapiServer.inject).toBeCalledWith(toInject); }); }); diff --git a/packages/core-forger/src/client.ts b/packages/core-forger/src/client.ts index db14aa9a9d..c30ff2859a 100644 --- a/packages/core-forger/src/client.ts +++ b/packages/core-forger/src/client.ts @@ -1,5 +1,5 @@ import { Container, Contracts, Utils } from "@arkecosystem/core-kernel"; -import { Nes, NetworkState, NetworkStateStatus } from "@arkecosystem/core-p2p"; +import { Nes, NetworkState, NetworkStateStatus, PortsOffset } from "@arkecosystem/core-p2p"; import { Blocks, Interfaces } from "@arkecosystem/crypto"; import { HostNoResponseError, RelayCommunicationError } from "./errors"; @@ -40,14 +40,25 @@ export class Client { */ public register(hosts: RelayHost[]) { this.hosts = hosts.map((host: RelayHost) => { - const connection = new Nes.Client(`ws://${host.hostname}:${host.port}`); - connection.connect().catch((e) => {}); // connect promise can fail when p2p is not ready, it's fine it will retry + const internalPort = Number(host.port) + Number(PortsOffset.Peer); + const internalConnection = new Nes.Client(`ws://${host.hostname}:${internalPort}`); + internalConnection.connect().catch((e) => {}); // connect promise can fail when p2p is not ready, it's fine it will retry - connection.onError = (e) => { + internalConnection.onError = (e) => { this.logger.error(e.message); }; - host.socket = connection; + host.internalSocket = internalConnection; + + const blocksPort = Number(host.port) + Number(PortsOffset.Blocks); + const blocksConnection = new Nes.Client(`ws://${host.hostname}:${blocksPort}`); + blocksConnection.connect().catch((e) => {}); // connect promise can fail when p2p is not ready, it's fine it will retry + + blocksConnection.onError = (e) => { + this.logger.error(e.message); + }; + + host.blocksSocket = blocksConnection; return host; }); @@ -60,10 +71,10 @@ export class Client { */ public dispose(): void { for (const host of this.hosts) { - const socket: Nes.Client | undefined = host.socket; - - if (socket) { - socket.disconnect(); + for (const socket of [host.blocksSocket, host.internalSocket]) { + if (socket) { + socket.disconnect(); + } } } } @@ -81,7 +92,7 @@ export class Client { ); try { - await this.emit("p2p.peer.postBlock", { + await this.emit("p2p.blocks.postBlock", { block: Blocks.Serializer.serializeWithTransactions({ ...block.data, transactions: block.transactions.map((tx) => tx.data), @@ -179,7 +190,10 @@ export class Client { public async selectHost(): Promise { for (let i = 0; i < 10; i++) { for (const host of this.hosts) { - if (host.socket && host.socket._isReady()) { + if ( + host.internalSocket && host.internalSocket._isReady() + && host.blocksSocket && host.blocksSocket._isReady() + ) { this.host = host; return; } @@ -208,7 +222,9 @@ export class Client { */ private async emit(event: string, payload: Record = {}, timeout = 4000): Promise { try { - Utils.assert.defined(this.host.socket); + const socket = event === "p2p.blocks.postBlock" ? this.host.blocksSocket : this.host.internalSocket; + + Utils.assert.defined(socket); const options = { path: event, @@ -217,7 +233,7 @@ export class Client { payload, }; - const response: any = await this.host.socket.request(options); + const response: any = await socket.request(options); return response.payload; } catch (error) { diff --git a/packages/core-forger/src/interfaces.ts b/packages/core-forger/src/interfaces.ts index 1715f5dc61..c1f0ac0ff1 100644 --- a/packages/core-forger/src/interfaces.ts +++ b/packages/core-forger/src/interfaces.ts @@ -22,7 +22,13 @@ export interface RelayHost { * @type {Nes.Client} * @memberof RelayHost */ - socket?: Nes.Client; + blocksSocket?: Nes.Client; + + /** + * @type {Nes.Client} + * @memberof RelayHost + */ + internalSocket?: Nes.Client; } /** diff --git a/packages/core-kernel/src/contracts/p2p/peer-connector.ts b/packages/core-kernel/src/contracts/p2p/peer-connector.ts index 1643d06819..ef6a59602e 100644 --- a/packages/core-kernel/src/contracts/p2p/peer-connector.ts +++ b/packages/core-kernel/src/contracts/p2p/peer-connector.ts @@ -4,13 +4,13 @@ import { Peer } from "./peer"; export interface PeerConnector { all(): Client[]; - connection(peer: Peer): Client | undefined; + connection(peer: Peer, port: number): Client | undefined; - connect(peer: Peer, maxPayload?: number): Promise; + connect(peer: Peer, port: number): Promise; - disconnect(peer: Peer): void; + disconnect(peer: Peer, port: number): void; - emit(peer: Peer, event: string, payload: any): Promise; + emit(peer: Peer, port: number, event: string, payload: any): Promise; getError(peer: Peer): string | undefined; diff --git a/packages/core-p2p/src/enums.ts b/packages/core-p2p/src/enums.ts index da514c81d2..a5f49c319d 100644 --- a/packages/core-p2p/src/enums.ts +++ b/packages/core-p2p/src/enums.ts @@ -26,3 +26,9 @@ export enum SocketErrors { SocketNotOpen = "CoreSocketNotOpenError", Validation = "CoreValidationError", } + +export enum PortsOffset { + Peer = 0, + Blocks = 10, + Transactions = 20, +}; diff --git a/packages/core-p2p/src/listeners.ts b/packages/core-p2p/src/listeners.ts index 82ece540f0..01d1191f19 100644 --- a/packages/core-p2p/src/listeners.ts +++ b/packages/core-p2p/src/listeners.ts @@ -2,6 +2,7 @@ import { Container, Contracts } from "@arkecosystem/core-kernel"; import { PeerConnector } from "./peer-connector"; import { isValidVersion } from "./utils"; +import { getAllPeerPorts } from "./socket-server/utils/get-peer-port"; /** * @class DisconnectInvalidPeers @@ -42,7 +43,9 @@ export class DisconnectInvalidPeers implements Contracts.Kernel.EventListener { for (const peer of peers) { if (!isValidVersion(this.app, peer)) { - this.events.dispatch("internal.p2p.disconnectPeer", { peer }); + for (const port of getAllPeerPorts(peer)) { + this.events.dispatch("internal.p2p.disconnectPeer", { peer, port }); + } } } } @@ -76,7 +79,7 @@ export class DisconnectPeer implements Contracts.Kernel.EventListener { * @memberof DisconnectPeer */ public async handle({ data }): Promise { - this.connector.disconnect(data.peer); + this.connector.disconnect(data.peer, data.port); this.storage.forgetPeer(data.peer); } diff --git a/packages/core-p2p/src/network-monitor.ts b/packages/core-p2p/src/network-monitor.ts index d32cc66282..3e36b0cbcd 100644 --- a/packages/core-p2p/src/network-monitor.ts +++ b/packages/core-p2p/src/network-monitor.ts @@ -7,6 +7,7 @@ import { NetworkState } from "./network-state"; import { Peer } from "./peer"; import { PeerCommunicator } from "./peer-communicator"; import { checkDNS, checkNTP } from "./utils"; +import { getAllPeerPorts } from "./socket-server/utils/get-peer-port"; // todo: review the implementation @Container.injectable() @@ -156,7 +157,10 @@ export class NetworkMonitor implements Contracts.P2P.NetworkMonitor { peerErrors[error] = peerErrors[error] || []; peerErrors[error].push(peer); - this.events.dispatch("internal.p2p.disconnectPeer", { peer }); + for (const port of getAllPeerPorts(peer)) { + this.events.dispatch("internal.p2p.disconnectPeer", { peer, port }); + } + this.events.dispatch(Enums.PeerEvent.Removed, peer); } }), diff --git a/packages/core-p2p/src/peer-communicator.ts b/packages/core-p2p/src/peer-communicator.ts index baf511adfd..e01cd2305a 100644 --- a/packages/core-p2p/src/peer-communicator.ts +++ b/packages/core-p2p/src/peer-communicator.ts @@ -1,15 +1,14 @@ import { Container, Contracts, Enums, Providers, Utils } from "@arkecosystem/core-kernel"; import { Blocks, Interfaces, Managers, Transactions, Validation } from "@arkecosystem/crypto"; import dayjs from "dayjs"; -import delay from "delay"; import { constants } from "./constants"; import { SocketErrors } from "./enums"; import { PeerPingTimeoutError, PeerStatusResponseError, PeerVerificationFailedError } from "./errors"; import { PeerVerifier } from "./peer-verifier"; -import { RateLimiter } from "./rate-limiter"; import { replySchemas } from "./schemas"; -import { buildRateLimiter, isValidVersion } from "./utils"; +import { isValidVersion } from "./utils"; +import { getAllPeerPorts, getPeerPortForEvent } from "./socket-server/utils/get-peer-port"; // todo: review the implementation @Container.injectable() @@ -30,23 +29,14 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { @Container.inject(Container.Identifiers.LogService) private readonly logger!: Contracts.Kernel.Logger; - private outgoingRateLimiter!: RateLimiter; - public initialize(): void { - this.outgoingRateLimiter = buildRateLimiter({ - // White listing anybody here means we would not throttle ourselves when sending - // them requests, ie we could spam them. - whitelist: [], - remoteAccess: [], - rateLimit: this.configuration.getOptional("rateLimit", false), - }); } public async postBlock(peer: Contracts.P2P.Peer, block: Interfaces.IBlock) { const postBlockTimeout = 10000; return this.emit( peer, - "p2p.peer.postBlock", + "p2p.blocks.postBlock", { block: Blocks.Serializer.serializeWithTransactions({ ...block.data, @@ -59,7 +49,7 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { public async postTransactions(peer: Contracts.P2P.Peer, transactions: Interfaces.ITransactionJson[]): Promise { const postTransactionsTimeout = 10000; - return this.emit(peer, "p2p.peer.postTransactions", { transactions }, postTransactionsTimeout); + return this.emit(peer, "p2p.transactions.postTransactions", { transactions }, postTransactionsTimeout); } // ! do not rely on parameter timeoutMsec as guarantee that ping method will resolve within it ! @@ -132,7 +122,9 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { `Disconnecting from ${peerHostPort}: ` + `nethash mismatch: our=${ourNethash}, his=${hisNethash}.`, ); - this.events.dispatch("internal.p2p.disconnectPeer", { peer }); + for (const port of getAllPeerPorts(peer)) { + this.events.dispatch("internal.p2p.disconnectPeer", { peer, port }); + } } } } else { @@ -176,11 +168,9 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { headersOnly, }: { fromBlockHeight: number; blockLimit?: number; headersOnly?: boolean }, ): Promise { - const maxPayload = headersOnly ? blockLimit * constants.KILOBYTE : constants.DEFAULT_MAX_PAYLOAD; - const peerBlocks = await this.emit( peer, - "p2p.peer.getBlocks", + "p2p.blocks.getBlocks", { lastBlockHeight: fromBlockHeight, blockLimit, @@ -188,7 +178,6 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { serialized: true, }, this.configuration.getRequired("getBlocksTimeout"), - maxPayload, ); if (!peerBlocks || !peerBlocks.length) { @@ -253,21 +242,21 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { return true; } - private async emit(peer: Contracts.P2P.Peer, event: string, payload: any, timeout?: number, maxPayload?: number) { - await this.throttle(peer, event); - + private async emit(peer: Contracts.P2P.Peer, event: string, payload: any, timeout?: number) { + const port = getPeerPortForEvent(peer, event); + let response; try { this.connector.forgetError(peer); const timeBeforeSocketCall: number = new Date().getTime(); - maxPayload = maxPayload || 100 * constants.KILOBYTE; // 100KB by default, enough for most requests - await this.connector.connect(peer, maxPayload); + await this.connector.connect(peer, port); - response = await this.connector.emit(peer, event, payload); + peer.sequentialErrorCounter = 0; // only counts connection errors + + response = await this.connector.emit(peer, port, event, payload); - peer.sequentialErrorCounter = 0; peer.latency = new Date().getTime() - timeBeforeSocketCall; this.parseHeaders(peer, response.payload); @@ -286,16 +275,6 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { return response.payload; } - private async throttle(peer: Contracts.P2P.Peer, event: string): Promise { - const msBeforeReCheck = 1000; - while (await this.outgoingRateLimiter.hasExceededRateLimit(peer.ip, event)) { - this.logger.debug( - `Throttling outgoing requests to ${peer.ip}/${event} to avoid triggering their rate limit`, - ); - await delay(msBeforeReCheck); - } - } - private handleSocketError(peer: Contracts.P2P.Peer, event: string, error: Error): void { if (!error.name) { return; @@ -303,6 +282,9 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { this.connector.setError(peer, error.name); peer.sequentialErrorCounter++; + if (peer.sequentialErrorCounter >= this.configuration.getRequired("maxPeerSequentialErrors")) { + this.events.dispatch(Enums.PeerEvent.Disconnect, { peer }); + } switch (error.name) { case SocketErrors.Validation: @@ -313,10 +295,6 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator { if (process.env.CORE_P2P_PEER_VERIFIER_DEBUG_EXTRA) { this.logger.debug(`Response error (peer ${peer.ip}/${event}) : ${error.message}`); } - - if (peer.sequentialErrorCounter >= this.configuration.getRequired("maxPeerSequentialErrors")) { - this.events.dispatch(Enums.PeerEvent.Disconnect, { peer }); - } break; default: /* istanbul ignore else */ diff --git a/packages/core-p2p/src/peer-connector.ts b/packages/core-p2p/src/peer-connector.ts index 5e191a3887..25de42b1e1 100644 --- a/packages/core-p2p/src/peer-connector.ts +++ b/packages/core-p2p/src/peer-connector.ts @@ -15,32 +15,32 @@ export class PeerConnector implements Contracts.P2P.PeerConnector { return Array.from(this.connections, ([key, value]) => value); } - public connection(peer: Contracts.P2P.Peer): Client | undefined { - const connection: Client | undefined = this.connections.get(peer.ip); + public connection(peer: Contracts.P2P.Peer, port: number): Client | undefined { + const connection: Client | undefined = this.connections.get(`${peer.ip}:${port}`); return connection; } - public async connect(peer: Contracts.P2P.Peer, maxPayload?: number): Promise { - const connection = this.connection(peer) || (await this.create(peer)); + public async connect(peer: Contracts.P2P.Peer, port: number): Promise { + const connection = this.connection(peer, port) || (await this.create(peer, port)); - this.connections.set(peer.ip, connection); + this.connections.set(`${peer.ip}:${port}`, connection); return connection; } - public disconnect(peer: Contracts.P2P.Peer): void { - const connection = this.connection(peer); + public disconnect(peer: Contracts.P2P.Peer, port: number): void { + const connection = this.connection(peer, port); if (connection) { connection.disconnect(); - this.connections.delete(peer.ip); + this.connections.delete(`${peer.ip}:${port}`); } } - public async emit(peer: Contracts.P2P.Peer, event: string, payload: any): Promise { - const connection: Client = await this.connect(peer); + public async emit(peer: Contracts.P2P.Peer, port: number, event: string, payload: any): Promise { + const connection: Client = await this.connect(peer, port); const options = { path: event, headers: {}, @@ -67,8 +67,8 @@ export class PeerConnector implements Contracts.P2P.PeerConnector { this.errors.delete(peer.ip); } - private async create(peer: Contracts.P2P.Peer): Promise { - const connection = new Client(`ws://${peer.ip}:${peer.port}`); + private async create(peer: Contracts.P2P.Peer, port: number): Promise { + const connection = new Client(`ws://${peer.ip}:${port}`); connection.onError = (error) => { this.logger.debug(`Socket error (peer ${peer.ip}) : ${error.message}`); diff --git a/packages/core-p2p/src/peer-processor.ts b/packages/core-p2p/src/peer-processor.ts index 3669feea22..f31b6f0c66 100644 --- a/packages/core-p2p/src/peer-processor.ts +++ b/packages/core-p2p/src/peer-processor.ts @@ -3,6 +3,7 @@ import { Utils } from "@arkecosystem/crypto"; import { PeerFactory } from "./contracts"; import { DisconnectInvalidPeers } from "./listeners"; +import { getAllPeerPorts } from "./socket-server/utils/get-peer-port"; // todo: review the implementation @Container.injectable() @@ -103,7 +104,9 @@ export class PeerProcessor implements Contracts.P2P.PeerProcessor { this.events.dispatch(Enums.PeerEvent.Added, newPeer); } catch (error) { - this.connector.disconnect(newPeer); + for (const port of getAllPeerPorts(newPeer)) { + this.connector.disconnect(newPeer, port); + } } finally { this.storage.forgetPendingPeer(peer); } diff --git a/packages/core-p2p/src/schemas.ts b/packages/core-p2p/src/schemas.ts index bccafaaafd..8a573161ce 100644 --- a/packages/core-p2p/src/schemas.ts +++ b/packages/core-p2p/src/schemas.ts @@ -80,13 +80,6 @@ export const requestSchemas = { }; export const replySchemas = { - "p2p.peer.getBlocks": { - type: "array", - maxItems: 400, - items: { - $ref: "blockHeader", - }, - }, "p2p.peer.getCommonBlocks": { type: "object", additionalProperties: false, @@ -263,10 +256,17 @@ export const replySchemas = { }, }, }, - "p2p.peer.postBlock": { + "p2p.blocks.getBlocks": { + type: "array", + maxItems: 400, + items: { + $ref: "blockHeader", + }, + }, + "p2p.blocks.postBlock": { type: "boolean", }, - "p2p.peer.postTransactions": { + "p2p.transactions.postTransactions": { type: "array", }, }; diff --git a/packages/core-p2p/src/socket-server/controllers/blocks.ts b/packages/core-p2p/src/socket-server/controllers/blocks.ts new file mode 100644 index 0000000000..b6a3fc9be7 --- /dev/null +++ b/packages/core-p2p/src/socket-server/controllers/blocks.ts @@ -0,0 +1,107 @@ +import { DatabaseService } from "@arkecosystem/core-database"; +import { Container, Contracts, Providers, Utils } from "@arkecosystem/core-kernel"; +import { Blocks, Interfaces, Managers } from "@arkecosystem/crypto"; +import Hapi from "@hapi/hapi"; + +import { TooManyTransactionsError, UnchainedBlockError } from "../errors"; +import { mapAddr } from "../utils/map-addr"; +import { Controller } from "./controller"; + +export class BlocksController extends Controller { + @Container.inject(Container.Identifiers.DatabaseService) + private readonly database!: DatabaseService; + + public async postBlock(request: Hapi.Request, h: Hapi.ResponseToolkit): Promise { + const configuration = this.app.getTagged( + Container.Identifiers.PluginConfiguration, + "plugin", + "@arkecosystem/core-p2p", + ); + + const blockBuffer = Buffer.from(request.payload.block.data); + const blockHex: string = blockBuffer.toString("hex"); + + const deserializedHeader = Blocks.Deserializer.deserialize(blockHex, true); + + if ( + deserializedHeader.data.numberOfTransactions > Managers.configManager.getMilestone().block.maxTransactions + ) { + throw new TooManyTransactionsError(deserializedHeader.data); + } + + const deserialized: { + data: Interfaces.IBlockData; + transactions: Interfaces.ITransaction[]; + } = Blocks.Deserializer.deserialize(blockHex); + + const block: Interfaces.IBlockData = { + ...deserialized.data, + transactions: deserialized.transactions.map((tx) => tx.data), + }; + + const fromForger: boolean = Utils.isWhitelisted( + configuration.getOptional("remoteAccess", []), + request.info.remoteAddress, + ); + + const blockchain = this.app.get(Container.Identifiers.BlockchainService); + + if (!fromForger) { + if (blockchain.pingBlock(block)) { + return true; + } + + const lastDownloadedBlock: Interfaces.IBlockData = blockchain.getLastDownloadedBlock(); + + const blockTimeLookup = await Utils.forgingInfoCalculator.getBlockTimeLookup(this.app, block.height); + + if (!Utils.isBlockChained(lastDownloadedBlock, block, blockTimeLookup)) { + throw new UnchainedBlockError(lastDownloadedBlock.height, block.height); + } + } + + if ( + block.transactions && + block.transactions.length > Managers.configManager.getMilestone().block.maxTransactions + ) { + throw new TooManyTransactionsError(block); + } + + this.logger.info( + `Received new block at height ${block.height.toLocaleString()} with ${Utils.pluralize( + "transaction", + block.numberOfTransactions, + true, + )} from ${mapAddr(request.info.remoteAddress)}`, + ); + + // TODO: check we don't need to await here (handleIncomingBlock is now an async operation) + blockchain.handleIncomingBlock(block, fromForger); + return true; + } + + public async getBlocks( + request: Hapi.Request, + h: Hapi.ResponseToolkit, + ): Promise { + const reqBlockHeight: number = +(request.payload as any).lastBlockHeight + 1; + const reqBlockLimit: number = +(request.payload as any).blockLimit || 400; + const reqHeadersOnly: boolean = !!(request.payload as any).headersOnly; + + const blocks: Contracts.Shared.DownloadBlock[] = await this.database.getBlocksForDownload( + reqBlockHeight, + reqBlockLimit, + reqHeadersOnly, + ); + + this.logger.info( + `${mapAddr(request.info.remoteAddress)} has downloaded ${Utils.pluralize( + "block", + blocks.length, + true, + )} from height ${reqBlockHeight.toLocaleString()}`, + ); + + return blocks; + } +} diff --git a/packages/core-p2p/src/socket-server/controllers/peer.ts b/packages/core-p2p/src/socket-server/controllers/peer.ts index 5af5e62237..c3eba3d28c 100644 --- a/packages/core-p2p/src/socket-server/controllers/peer.ts +++ b/packages/core-p2p/src/socket-server/controllers/peer.ts @@ -1,12 +1,10 @@ import { DatabaseService } from "@arkecosystem/core-database"; -import { Container, Contracts, Providers, Utils } from "@arkecosystem/core-kernel"; -import { Blocks, Crypto, Interfaces, Managers } from "@arkecosystem/crypto"; +import { Container, Contracts, Utils } from "@arkecosystem/core-kernel"; +import { Crypto, Interfaces } from "@arkecosystem/crypto"; import Hapi from "@hapi/hapi"; import { MissingCommonBlockError } from "../../errors"; -import { TooManyTransactionsError, UnchainedBlockError } from "../errors"; import { getPeerConfig } from "../utils/get-peer-config"; -import { mapAddr } from "../utils/map-addr"; import { Controller } from "./controller"; export class PeerController extends Controller { @@ -19,6 +17,7 @@ export class PeerController extends Controller { public getPeers(request: Hapi.Request, h: Hapi.ResponseToolkit): Contracts.P2P.PeerBroadcast[] { return this.peerStorage .getPeers() + .filter((peer) => peer.port !== -1 ) .map((peer) => peer.toBroadcast()) .sort((a, b) => { Utils.assert.defined(a.latency); @@ -78,107 +77,4 @@ export class PeerController extends Controller { config: getPeerConfig(this.app), }; } - - public async postBlock(request: Hapi.Request, h: Hapi.ResponseToolkit): Promise { - const configuration = this.app.getTagged( - Container.Identifiers.PluginConfiguration, - "plugin", - "@arkecosystem/core-p2p", - ); - - const blockBuffer = Buffer.from(request.payload.block.data); - const blockHex: string = blockBuffer.toString("hex"); - - const deserializedHeader = Blocks.Deserializer.deserialize(blockHex, true); - - if ( - deserializedHeader.data.numberOfTransactions > Managers.configManager.getMilestone().block.maxTransactions - ) { - throw new TooManyTransactionsError(deserializedHeader.data); - } - - const deserialized: { - data: Interfaces.IBlockData; - transactions: Interfaces.ITransaction[]; - } = Blocks.Deserializer.deserialize(blockHex); - - const block: Interfaces.IBlockData = { - ...deserialized.data, - transactions: deserialized.transactions.map((tx) => tx.data), - }; - - const fromForger: boolean = Utils.isWhitelisted( - configuration.getOptional("remoteAccess", []), - request.info.remoteAddress, - ); - - const blockchain = this.app.get(Container.Identifiers.BlockchainService); - - if (!fromForger) { - if (blockchain.pingBlock(block)) { - return true; - } - - const lastDownloadedBlock: Interfaces.IBlockData = blockchain.getLastDownloadedBlock(); - - const blockTimeLookup = await Utils.forgingInfoCalculator.getBlockTimeLookup(this.app, block.height); - - if (!Utils.isBlockChained(lastDownloadedBlock, block, blockTimeLookup)) { - throw new UnchainedBlockError(lastDownloadedBlock.height, block.height); - } - } - - if ( - block.transactions && - block.transactions.length > Managers.configManager.getMilestone().block.maxTransactions - ) { - throw new TooManyTransactionsError(block); - } - - this.logger.info( - `Received new block at height ${block.height.toLocaleString()} with ${Utils.pluralize( - "transaction", - block.numberOfTransactions, - true, - )} from ${mapAddr(request.info.remoteAddress)}`, - ); - - // TODO: check we don't need to await here (handleIncomingBlock is now an async operation) - blockchain.handleIncomingBlock(block, fromForger); - return true; - } - - public async postTransactions(request: Hapi.Request, h: Hapi.ResponseToolkit): Promise { - const createProcessor: Contracts.TransactionPool.ProcessorFactory = this.app.get( - Container.Identifiers.TransactionPoolProcessorFactory, - ); - const processor: Contracts.TransactionPool.Processor = createProcessor(); - await processor.process((request.payload as any).transactions); - return processor.accept; - } - - public async getBlocks( - request: Hapi.Request, - h: Hapi.ResponseToolkit, - ): Promise { - const reqBlockHeight: number = +(request.payload as any).lastBlockHeight + 1; - const reqBlockLimit: number = +(request.payload as any).blockLimit || 400; - const reqHeadersOnly: boolean = !!(request.payload as any).headersOnly; - - const blocks: Contracts.Shared.DownloadBlock[] = await this.database.getBlocksForDownload( - reqBlockHeight, - reqBlockLimit, - reqHeadersOnly, - ); - - this.logger.info( - `${mapAddr(request.info.remoteAddress)} has downloaded ${Utils.pluralize( - "block", - blocks.length, - true, - )} from height ${reqBlockHeight.toLocaleString()}`, - ); - - return blocks; - } } diff --git a/packages/core-p2p/src/socket-server/controllers/transactions.ts b/packages/core-p2p/src/socket-server/controllers/transactions.ts new file mode 100644 index 0000000000..d48a07046a --- /dev/null +++ b/packages/core-p2p/src/socket-server/controllers/transactions.ts @@ -0,0 +1,15 @@ +import { Container, Contracts } from "@arkecosystem/core-kernel"; +import Hapi from "@hapi/hapi"; + +import { Controller } from "./controller"; + +export class TransactionsController extends Controller { + public async postTransactions(request: Hapi.Request, h: Hapi.ResponseToolkit): Promise { + const createProcessor: Contracts.TransactionPool.ProcessorFactory = this.app.get( + Container.Identifiers.TransactionPoolProcessorFactory, + ); + const processor: Contracts.TransactionPool.Processor = createProcessor(); + await processor.process((request.payload as any).transactions); + return processor.accept; + } +} diff --git a/packages/core-p2p/src/socket-server/plugins/validate.ts b/packages/core-p2p/src/socket-server/plugins/validate.ts index 259af1db29..b0866509c7 100644 --- a/packages/core-p2p/src/socket-server/plugins/validate.ts +++ b/packages/core-p2p/src/socket-server/plugins/validate.ts @@ -3,6 +3,8 @@ import Boom from "@hapi/boom"; import { InternalRoute } from "../routes/internal"; import { PeerRoute } from "../routes/peer"; +import { BlocksRoute } from "../routes/blocks"; +import { TransactionsRoute } from "../routes/transactions"; @Container.injectable() export class ValidatePlugin { @@ -13,6 +15,8 @@ export class ValidatePlugin { const allRoutesConfigByPath = { ...this.app.resolve(InternalRoute).getRoutesConfigByPath(), ...this.app.resolve(PeerRoute).getRoutesConfigByPath(), + ...this.app.resolve(BlocksRoute).getRoutesConfigByPath(), + ...this.app.resolve(TransactionsRoute).getRoutesConfigByPath(), }; server.ext({ diff --git a/packages/core-p2p/src/socket-server/routes/blocks.ts b/packages/core-p2p/src/socket-server/routes/blocks.ts new file mode 100644 index 0000000000..da0e75cf65 --- /dev/null +++ b/packages/core-p2p/src/socket-server/routes/blocks.ts @@ -0,0 +1,27 @@ +import { blocksSchemas } from "../schemas/blocks"; +import { Route, RouteConfig } from "./route"; +import { BlocksController } from "../controllers/blocks"; +import { constants } from "../../constants"; + +export class BlocksRoute extends Route { + public getRoutesConfigByPath(): { [path: string]: RouteConfig } { + const controller = this.getController(); + return { + "/p2p/blocks/getBlocks": { + id: "p2p.blocks.getBlocks", + handler: controller.getBlocks, + validation: blocksSchemas.getBlocks, + }, + "/p2p/blocks/postBlock": { + id: "p2p.blocks.postBlock", + handler: controller.postBlock, + validation: blocksSchemas.postBlock, + maxBytes: constants.DEFAULT_MAX_PAYLOAD, + }, + }; + } + + protected getController(): BlocksController { + return this.app.resolve(BlocksController); + } +} diff --git a/packages/core-p2p/src/socket-server/routes/peer.ts b/packages/core-p2p/src/socket-server/routes/peer.ts index aed6965999..1586fe8e43 100644 --- a/packages/core-p2p/src/socket-server/routes/peer.ts +++ b/packages/core-p2p/src/socket-server/routes/peer.ts @@ -11,11 +11,6 @@ export class PeerRoute extends Route { handler: controller.getPeers, validation: peerSchemas.getPeers, }, - "/p2p/peer/getBlocks": { - id: "p2p.peer.getBlocks", - handler: controller.getBlocks, - validation: peerSchemas.getBlocks, - }, "/p2p/peer/getCommonBlocks": { id: "p2p.peer.getCommonBlocks", handler: controller.getCommonBlocks, @@ -26,17 +21,6 @@ export class PeerRoute extends Route { handler: controller.getStatus, validation: peerSchemas.getStatus, }, - "/p2p/peer/postBlock": { - id: "p2p.peer.postBlock", - handler: controller.postBlock, - validation: peerSchemas.postBlock, - maxBytes: 20 * 1024 * 1024, // TODO maxBytes for each route - }, - "/p2p/peer/postTransactions": { - id: "p2p.peer.postTransactions", - handler: controller.postTransactions, - validation: peerSchemas.postTransactions, - }, }; } diff --git a/packages/core-p2p/src/socket-server/routes/transactions.ts b/packages/core-p2p/src/socket-server/routes/transactions.ts new file mode 100644 index 0000000000..a5a1b502d7 --- /dev/null +++ b/packages/core-p2p/src/socket-server/routes/transactions.ts @@ -0,0 +1,22 @@ +import { transactionsSchemas } from "../schemas/transactions"; +import { Route, RouteConfig } from "./route"; +import { TransactionsController } from "../controllers/transactions"; +import { constants } from "../../constants"; + +export class TransactionsRoute extends Route { + public getRoutesConfigByPath(): { [path: string]: RouteConfig } { + const controller = this.getController(); + return { + "/p2p/transactions/postTransactions": { + id: "p2p.transactions.postTransactions", + handler: controller.postTransactions, + validation: transactionsSchemas.postTransactions, + maxBytes: constants.DEFAULT_MAX_PAYLOAD, + }, + }; + } + + protected getController(): TransactionsController { + return this.app.resolve(TransactionsController); + } +} diff --git a/packages/core-p2p/src/socket-server/schemas/blocks.ts b/packages/core-p2p/src/socket-server/schemas/blocks.ts new file mode 100644 index 0000000000..33e8a38592 --- /dev/null +++ b/packages/core-p2p/src/socket-server/schemas/blocks.ts @@ -0,0 +1,17 @@ +import Joi from "@hapi/joi"; + +export const blocksSchemas = { + getBlocks: Joi.object({ + lastBlockHeight: Joi.number().integer().min(1), + blockLimit: Joi.number().integer().min(1).max(400), + headersOnly: Joi.boolean(), + serialized: Joi.boolean(), + }), + + postBlock: Joi.object({ + block: Joi.object({ + type: "Buffer", + data: Joi.array(), // TODO better way to validate buffer ? + }), + }), +}; diff --git a/packages/core-p2p/src/socket-server/schemas/peer.ts b/packages/core-p2p/src/socket-server/schemas/peer.ts index 35157218f3..09c75e56f1 100644 --- a/packages/core-p2p/src/socket-server/schemas/peer.ts +++ b/packages/core-p2p/src/socket-server/schemas/peer.ts @@ -3,27 +3,9 @@ import Joi from "@hapi/joi"; export const peerSchemas = { getPeers: Joi.object().max(0), // empty object expected - getBlocks: Joi.object({ - lastBlockHeight: Joi.number().integer().min(1), - blockLimit: Joi.number().integer().min(1).max(400), - headersOnly: Joi.boolean(), - serialized: Joi.boolean(), - }), - getCommonBlocks: Joi.object({ ids: Joi.array().min(1).max(10).items(Joi.string()), // TODO strings are block ids }), getStatus: Joi.object().max(0), // empty object expected - - postBlock: Joi.object({ - block: Joi.object({ - type: "Buffer", - data: Joi.array(), // TODO better way to validate buffer ? - }), - }), - - postTransactions: Joi.object({ - transactions: Joi.array(), // TODO array of transactions, needs Joi transaction schema - }), }; diff --git a/packages/core-p2p/src/socket-server/schemas/transactions.ts b/packages/core-p2p/src/socket-server/schemas/transactions.ts new file mode 100644 index 0000000000..db214cee61 --- /dev/null +++ b/packages/core-p2p/src/socket-server/schemas/transactions.ts @@ -0,0 +1,7 @@ +import Joi from "@hapi/joi"; + +export const transactionsSchemas = { + postTransactions: Joi.object({ + transactions: Joi.array(), // TODO array of transactions, needs Joi transaction schema + }), +}; diff --git a/packages/core-p2p/src/socket-server/server.ts b/packages/core-p2p/src/socket-server/server.ts index e7bf7ac864..2d5ca82cae 100644 --- a/packages/core-p2p/src/socket-server/server.ts +++ b/packages/core-p2p/src/socket-server/server.ts @@ -7,6 +7,9 @@ import { ValidatePlugin } from "./plugins/validate"; import { WhitelistForgerPlugin } from "./plugins/whitelist-forger"; import { InternalRoute } from "./routes/internal"; import { PeerRoute } from "./routes/peer"; +import { PortsOffset } from "../enums"; +import { BlocksRoute } from "./routes/blocks"; +import { TransactionsRoute } from "./routes/transactions"; // todo: review the implementation @Container.injectable() @@ -32,7 +35,21 @@ export class Server { * @type {HapiServer} * @memberof Server */ - private server!: HapiServer; + private peerServer!: HapiServer; + + /** + * @private + * @type {HapiServer} + * @memberof Server + */ + private blocksServer!: HapiServer; + + /** + * @private + * @type {HapiServer} + * @memberof Server + */ + private transactionsServer!: HapiServer; /** * @private @@ -49,17 +66,29 @@ export class Server { */ public async initialize(name: string, optionsServer: Types.JsonObject): Promise { this.name = name; - this.server = new HapiServer({ address: optionsServer.hostname, port: optionsServer.port }); - (this.server.app as any).app = this.app; - await this.server.register({ plugin }); + const address = optionsServer.hostname; + const basePort = Number(optionsServer.port); + + this.peerServer = new HapiServer({ address, port: basePort + PortsOffset.Peer }); + this.blocksServer = new HapiServer({ address, port: basePort + PortsOffset.Blocks }); + this.transactionsServer = new HapiServer({ address, port: basePort + PortsOffset.Transactions }); + + for (const server of [this.peerServer, this.blocksServer, this.transactionsServer]) { + (server.app as any).app = this.app; + await server.register({ plugin }); + + this.app.resolve(ValidatePlugin).register(server); + }; + + this.app.resolve(InternalRoute).register(this.peerServer); + this.app.resolve(PeerRoute).register(this.peerServer); + this.app.resolve(WhitelistForgerPlugin).register(this.peerServer); + this.app.resolve(AcceptPeerPlugin).register(this.peerServer); - this.app.resolve(InternalRoute).register(this.server); - this.app.resolve(PeerRoute).register(this.server); + this.app.resolve(BlocksRoute).register(this.blocksServer); - this.app.resolve(ValidatePlugin).register(this.server); - this.app.resolve(AcceptPeerPlugin).register(this.server); - this.app.resolve(WhitelistForgerPlugin).register(this.server); + this.app.resolve(TransactionsRoute).register(this.transactionsServer); } /** @@ -68,9 +97,14 @@ export class Server { */ public async boot(): Promise { try { - await this.server.start(); + await this.peerServer.start(); + this.logger.info(`${this.name} P2P peer server started at ${this.peerServer.info.uri}`); - this.logger.info(`${this.name} P2P server started at ${this.server.info.uri}`); + await this.blocksServer.start(); + this.logger.info(`${this.name} P2P blocks server started at ${this.blocksServer.info.uri}`); + + await this.transactionsServer.start(); + this.logger.info(`${this.name} P2P transactions server started at ${this.transactionsServer.info.uri}`); } catch { await this.app.terminate(`Failed to start ${this.name} Server!`); } @@ -82,9 +116,14 @@ export class Server { */ public async dispose(): Promise { try { - await this.server.stop(); + await this.peerServer.stop(); + this.logger.info(`${this.name} P2P peer server stopped at ${this.peerServer.info.uri}`); + + await this.blocksServer.stop(); + this.logger.info(`${this.name} P2P blocks server stopped at ${this.blocksServer.info.uri}`); - this.logger.info(`${this.name} Server stopped at ${this.server.info.uri}`); + await this.transactionsServer.stop(); + this.logger.info(`${this.name} P2P transactions server stopped at ${this.transactionsServer.info.uri}`); } catch { await this.app.terminate(`Failed to stop ${this.name} Server!`); } @@ -97,7 +136,9 @@ export class Server { */ // @todo: add proper types public async register(plugins: any | any[]): Promise { - return this.server.register(plugins); + for (const server of [this.peerServer, this.blocksServer, this.transactionsServer]) { + await server.register(plugins); + } } /** @@ -106,7 +147,9 @@ export class Server { * @memberof Server */ public async route(routes: ServerRoute | ServerRoute[]): Promise { - return this.server.route(routes); + for (const server of [this.peerServer, this.blocksServer, this.transactionsServer]) { + await server.route(routes); + } } /** @@ -115,6 +158,8 @@ export class Server { * @memberof Server */ public async inject(options: string | ServerInjectOptions): Promise { - return this.server.inject(options); + for (const server of [this.peerServer, this.blocksServer, this.transactionsServer]) { + await server.inject(options); + } } } diff --git a/packages/core-p2p/src/socket-server/utils/get-peer-port.ts b/packages/core-p2p/src/socket-server/utils/get-peer-port.ts new file mode 100644 index 0000000000..44404793ad --- /dev/null +++ b/packages/core-p2p/src/socket-server/utils/get-peer-port.ts @@ -0,0 +1,22 @@ +import { Contracts } from "@arkecosystem/core-kernel"; +import { PortsOffset } from "../../enums"; + +const mapEventPrefixToPortOffset = { + "p2p.peer": PortsOffset.Peer, + "p2p.internal": PortsOffset.Peer, + "p2p.blocks": PortsOffset.Blocks, + "p2p.transactions": PortsOffset.Transactions, +}; + +export const getPeerPortForEvent = (peer: Contracts.P2P.Peer, event: string) => { + const eventPrefix = event.split(".").slice(0, 2).join("."); + return Number(peer.port) + mapEventPrefixToPortOffset[eventPrefix]; +}; + +export const getAllPeerPorts = (peer: Contracts.P2P.Peer) => { + return [ + PortsOffset.Peer, + PortsOffset.Blocks, + PortsOffset.Transactions + ].map((portOffset) => Number(peer.port) + portOffset); +};