diff --git a/__tests__/integration/core-api/handlers/transactions.test.ts b/__tests__/integration/core-api/handlers/transactions.test.ts index 6baadeaed7..3ce5857815 100644 --- a/__tests__/integration/core-api/handlers/transactions.test.ts +++ b/__tests__/integration/core-api/handlers/transactions.test.ts @@ -81,7 +81,8 @@ describe("API 2.0 - Transactions", () => { pageCount: 2, previous: null, self: "/transactions?transform=true&page=1&limit=100", - totalCount: 110, + totalCount: expect.any(Number), // for some reason it can give a different number, + // if it's executed with the whole test suite :think: TODO fix it totalCountIsEstimate: true, }; expect(response.data.meta).toEqual(expectedMeta); diff --git a/__tests__/integration/core-p2p/mocks/core-container.ts b/__tests__/integration/core-p2p/mocks/core-container.ts index f32ba4f2ae..9e4841bf10 100644 --- a/__tests__/integration/core-p2p/mocks/core-container.ts +++ b/__tests__/integration/core-p2p/mocks/core-container.ts @@ -32,6 +32,9 @@ jest.mock("@arkecosystem/core-container", () => { }, getMilestone: () => ({ activeDelegates: 51, + block: { + maxTransactions: 500, + }, }), }; }, diff --git a/__tests__/integration/core-p2p/socket-server/peer-with-remote-access.test.ts b/__tests__/integration/core-p2p/socket-server/peer-with-remote-access.test.ts index 9978d458d8..c2b859f945 100644 --- a/__tests__/integration/core-p2p/socket-server/peer-with-remote-access.test.ts +++ b/__tests__/integration/core-p2p/socket-server/peer-with-remote-access.test.ts @@ -18,7 +18,7 @@ let emit; const headers = { version: "2.1.0", - port: "4009", + port: 4009, height: 1, "Content-Type": "application/json", }; diff --git a/__tests__/integration/core-p2p/socket-server/peer.test.ts b/__tests__/integration/core-p2p/socket-server/peer.test.ts index 7e0354ec35..9af9b22488 100644 --- a/__tests__/integration/core-p2p/socket-server/peer.test.ts +++ b/__tests__/integration/core-p2p/socket-server/peer.test.ts @@ -20,13 +20,14 @@ let server: SocketCluster; let socket; let connect; let emit; +let invalidOpcode; let ping; let pong; let send; const headers = { version: "2.1.0", - port: "4009", + port: 4009, height: 1, "Content-Type": "application/json", }; @@ -61,6 +62,7 @@ beforeAll(async () => { ping = () => socket.transport.socket.ping(); pong = () => socket.transport.socket.pong(); + invalidOpcode = () => socket.transport.socket._socket.write(Buffer.from("8780d0b6fbd2", "hex")); jest.spyOn(processor, "validateAndAcceptPeer").mockImplementation(jest.fn()); }); @@ -115,14 +117,17 @@ describe("Peer socket endpoint", () => { expect(data).toEqual({}); }); - it("should throw validation error when sending wrong data", async () => { + it("should throw error when sending wrong data", async () => { await delay(1000); await expect( emit("p2p.peer.postBlock", { data: {}, headers, }), - ).rejects.toHaveProperty("name", "Error"); + ).rejects.toHaveProperty("name", "BadConnectionError"); + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should throw error when sending wrong buffer", async () => { @@ -134,7 +139,37 @@ describe("Peer socket endpoint", () => { }, headers, }), - ).rejects.toHaveProperty("name", "BadConnectionError"); + ).rejects.toHaveProperty("name", "Error"); + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + }); + + it("should throw error if too many transactions are in the block", async () => { + await delay(2000); + const dummyBlock = BlockFactory.createDummy(); + const transaction = TransactionFactory.transfer(wallets[0].address, 111) + .withNetwork("unitnet") + .withPassphrase("one two three") + .build(); + + for (let i = 0; i < 1000; i++) { + dummyBlock.transactions.push(transaction[0]); + } + + dummyBlock.data.numberOfTransactions = 1000; + + await expect( + emit("p2p.peer.postBlock", { + data: { + block: Blocks.Serializer.serializeWithTransactions({ + ...dummyBlock.data, + transactions: dummyBlock.transactions.map(tx => tx.data), + }), + }, + headers, + }), + ).rejects.toHaveProperty("name", "Error"); }); }); @@ -155,29 +190,30 @@ describe("Peer socket endpoint", () => { // because our mocking makes all transactions to be invalid (already in cache) }); - it("should throw validation error when sending too much transactions", async () => { + it("should reject when sending too much transactions", async () => { const transactions = TransactionFactory.transfer(wallets[0].address, 111) .withNetwork("unitnet") .withPassphrase("one two three") .create(50); - // TODO: test makes no sense anymore await expect( emit("p2p.peer.postTransactions", { data: { transactions }, headers, }), - ).toResolve(); + ).toReject(); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should disconnect the client if it sends an invalid message payload", async () => { + connect(); await delay(1000); expect(socket.state).toBe("open"); - send('{"event": "#handshake", "data": {}, "cid": 1}'); - await delay(500); - send("Invalid payload"); await delay(1000); @@ -194,9 +230,6 @@ describe("Peer socket endpoint", () => { expect(socket.state).toBe("open"); - send('{"event": "#handshake", "data": {}, "cid": 1}'); - await delay(500); - send("#2"); await delay(1000); @@ -242,11 +275,58 @@ describe("Peer socket endpoint", () => { server.killWorkers({ immediate: true }); await delay(2000); // give time to workers to respawn }); + + it("should block the client if it sends an invalid opcode", async () => { + connect(); + await delay(1000); + + expect(socket.state).toBe("open"); + + invalidOpcode(); + await delay(500); + expect(socket.state).toBe("closed"); + await delay(500); + connect(); + await delay(500); + expect(socket.state).toBe("closed"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + }); }); }); describe("Socket errors", () => { + it("should disconnect all sockets from same ip if another connection is made from the same IP address", async () => { + connect(); + await delay(1000); + + expect(socket.state).toBe("open"); + + const secondSocket = socketCluster.create({ + port: 4007, + hostname: "127.0.0.1", + multiplex: false, + }); + secondSocket.on("error", () => { + // + }); + + secondSocket.connect(); + + await delay(1000); + + expect(socket.state).toBe("closed"); + expect(secondSocket.state).toBe("open"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + }); + it("should accept the request when below rate limit", async () => { + connect(); await delay(1000); for (let i = 0; i < 2; i++) { const { data } = await emit("p2p.peer.getStatus", { @@ -288,15 +368,16 @@ describe("Peer socket endpoint", () => { const block = BlockFactory.createDummy(); - const postBlock = () => emit("p2p.peer.postBlock", { - headers, - data: { - block: Blocks.Serializer.serializeWithTransactions({ - ...block.data, - transactions: block.transactions.map(tx => tx.data), - }), - }, - }); + const postBlock = () => + emit("p2p.peer.postBlock", { + headers, + data: { + block: Blocks.Serializer.serializeWithTransactions({ + ...block.data, + transactions: block.transactions.map(tx => tx.data), + }), + }, + }); await expect(postBlock()).toResolve(); await expect(postBlock()).toResolve(); @@ -326,9 +407,14 @@ describe("Peer socket endpoint", () => { }, ), ).rejects.toHaveProperty("name", "BadConnectionError"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should close the connection when the event does not start with p2p", async () => { + connect(); await delay(1000); await expect( @@ -337,9 +423,14 @@ describe("Peer socket endpoint", () => { data: {}, }), ).rejects.toHaveProperty("name", "BadConnectionError"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should close the connection when the version is invalid", async () => { + connect(); await delay(1000); await expect( @@ -348,9 +439,13 @@ describe("Peer socket endpoint", () => { data: {}, }), ).rejects.toHaveProperty("name", "BadConnectionError"); + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should close the connection and prevent reconnection if blocked", async () => { + connect(); await delay(1000); await emit("p2p.peer.getPeers", { @@ -375,6 +470,25 @@ describe("Peer socket endpoint", () => { await delay(1000); expect(socket.state).not.toBe("open"); + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn + }); + + it("should close the connection when using unsupported event messages", async () => { + connect(); + await delay(1000); + + await expect( + emit("#subscribe", { + headers, + data: {}, + }), + ).rejects.toHaveProperty("name", "BadConnectionError"); + + // kill workers to reset ipLastError (or we won't pass handshake for 1 minute) + server.killWorkers({ immediate: true }); + await delay(2000); // give time to workers to respawn }); it("should close the connection if it sends data after a disconnect packet", async () => { @@ -394,5 +508,31 @@ describe("Peer socket endpoint", () => { server.killWorkers({ immediate: true }); await delay(2000); // give time to workers to respawn }); + + it("should close the connection when the JSON includes additional properties", async () => { + connect(); + await delay(1000); + const payload: any = {}; + payload.event = "p2p.peer.getCommonBlocks"; + payload.data = { data: { ids: ["1"] }, headers: {} }; + payload.cid = 1; + + const symbol = String.fromCharCode(42); + for (let i = 0; i < 30000; i++) { + const char = String.fromCharCode(i); + if (JSON.stringify(String.fromCharCode(i)).length === 3) { + payload.data[char] = 1; + payload.data[symbol + char] = 1; + payload.data[symbol + char + symbol] = 1; + payload.data[char] = 1; + } + } + + const stringifiedPayload = JSON.stringify(payload).replace(/ /g, ""); + expect(socket.state).toBe("open"); + send(stringifiedPayload); + await delay(500); + expect(socket.state).not.toBe("open"); + }); }); }); diff --git a/__tests__/unit/core-blockchain/blockchain.test.ts b/__tests__/unit/core-blockchain/blockchain.test.ts index bec703f0e9..bbb8621e3e 100644 --- a/__tests__/unit/core-blockchain/blockchain.test.ts +++ b/__tests__/unit/core-blockchain/blockchain.test.ts @@ -243,7 +243,7 @@ describe("Blockchain", () => { .mockReturnValueOnce(1) .mockReturnValueOnce(1); - await blockchain.handleIncomingBlock(blocks101to155[54]); + blockchain.handleIncomingBlock(blocks101to155[54]); expect(loggerInfo).toHaveBeenCalledWith("Block disregarded because blockchain is not ready"); blockchain.state.started = true; @@ -343,4 +343,49 @@ describe("Blockchain", () => { expect(loggerInfo).toHaveBeenCalledWith("Starting ARK Core for a new world, welcome aboard"); }); }); + + describe("checkMissingBlocks", () => { + afterEach(() => { + jest.restoreAllMocks(); + }); + + it("should fork if random result is < 0.80", async () => { + jest.spyOn(blockchain, "checkMissingBlocks"); + jest.spyOn(getMonitor, "checkNetworkHealth").mockResolvedValueOnce({ forked: true }); + jest.spyOn(Math, "random").mockReturnValue(0.2); + + // @ts-ignore + blockchain.missedBlocks = Managers.configManager.getMilestone().activeDelegates; + await blockchain.checkMissingBlocks(); + + // @ts-ignore + expect(blockchain.missedBlocks).toBe(0); + }); + + it("should fork if random result is 0.80", async () => { + jest.spyOn(blockchain, "checkMissingBlocks"); + jest.spyOn(getMonitor, "checkNetworkHealth").mockResolvedValueOnce({ forked: true }); + jest.spyOn(Math, "random").mockReturnValue(0.8); + + // @ts-ignore + blockchain.missedBlocks = Managers.configManager.getMilestone().activeDelegates; + await blockchain.checkMissingBlocks(); + + // @ts-ignore + expect(blockchain.missedBlocks).toBe(0); + }); + + it("should not take action if random result is > 0.80", async () => { + jest.spyOn(blockchain, "checkMissingBlocks"); + jest.spyOn(getMonitor, "checkNetworkHealth").mockResolvedValueOnce({ forked: true }); + jest.spyOn(Math, "random").mockReturnValue(0.9); + + // @ts-ignore + blockchain.missedBlocks = Managers.configManager.getMilestone().activeDelegates; + await blockchain.checkMissingBlocks(); + + // @ts-ignore + expect(blockchain.missedBlocks).toBe(Managers.configManager.getMilestone().activeDelegates + 1); + }); + }); }); diff --git a/__tests__/unit/core-blockchain/mocks/p2p/network-monitor.ts b/__tests__/unit/core-blockchain/mocks/p2p/network-monitor.ts index b0478c4e5a..c111028e64 100644 --- a/__tests__/unit/core-blockchain/mocks/p2p/network-monitor.ts +++ b/__tests__/unit/core-blockchain/mocks/p2p/network-monitor.ts @@ -1,8 +1,7 @@ export const getMonitor = { getNetworkHeight: () => 1, updateNetworkStatus: () => undefined, - // tslint:disable-next-line: no-empty - checkNetworkHealth: () => {}, + checkNetworkHealth: () => undefined, downloadBlocksFromHeight: () => [], start: () => undefined, refreshPeersAfterFork: () => undefined, diff --git a/__tests__/unit/core-p2p/mocks/request.ts b/__tests__/unit/core-p2p/mocks/request.ts new file mode 100644 index 0000000000..3f490e1eb3 --- /dev/null +++ b/__tests__/unit/core-p2p/mocks/request.ts @@ -0,0 +1,6 @@ +export const request = { + event: undefined, + socket: { + terminate: jest.fn(), + }, +}; diff --git a/__tests__/unit/core-p2p/socket-server/versions/internal/handlers/rate-limiter.test.ts b/__tests__/unit/core-p2p/socket-server/versions/internal/handlers/rate-limiter.test.ts new file mode 100644 index 0000000000..db9f27f30e --- /dev/null +++ b/__tests__/unit/core-p2p/socket-server/versions/internal/handlers/rate-limiter.test.ts @@ -0,0 +1,24 @@ +import "../../../../mocks/core-container"; + +import { getRateLimitedEndpoints } from "../../../../../../../packages/core-p2p/src/socket-server/versions/internal"; +import { createPeerService } from "../../../../../../helpers/peers"; + +describe("Internal handlers - rate limiter", () => { + describe("getRateLimitedEndpoints", () => { + it("should return rate limited endpoints", async () => { + const { service } = createPeerService(); + const endpoints = getRateLimitedEndpoints({ service }); + + expect(endpoints).toEqual( + expect.arrayContaining([ + "p2p.peer.postBlock", + "p2p.peer.getBlocks", + "p2p.peer.getPeers", + "p2p.peer.getStatus", + "p2p.peer.getCommonBlocks", + ]), + ); + expect(endpoints).not.toContain("p2p.peer.postTransactions"); + }); + }); +}); diff --git a/__tests__/unit/core-p2p/socket-server/worker.test.ts b/__tests__/unit/core-p2p/socket-server/worker.test.ts index 9eaf9a09f0..20f1d9db10 100644 --- a/__tests__/unit/core-p2p/socket-server/worker.test.ts +++ b/__tests__/unit/core-p2p/socket-server/worker.test.ts @@ -1,3 +1,4 @@ +import { request } from "../mocks/request"; import "../mocks/scworker"; import delay from "delay"; @@ -21,7 +22,7 @@ describe("Worker", () => { await delay(500); // @ts-ignore - expect(worker.sendToMasterAsync).toHaveBeenLastCalledWith("p2p.utils.getConfig"); + expect(worker.sendToMasterAsync).toHaveBeenLastCalledWith("p2p.utils.getHandlers"); // registering endpoint on connection expect(worker.scServer.on).toHaveBeenCalledWith("connection", expect.any(Function)); @@ -37,5 +38,38 @@ describe("Worker", () => { expect.any(Function), ); }); + + it("should use the local rate limiter", async () => { + jest.restoreAllMocks(); + // @ts-ignore + jest.spyOn(worker, "sendToMasterAsync").mockResolvedValue({ data: {} }); + // @ts-ignore + jest.spyOn(worker, "getRateLimitedEndpoints").mockReturnValue({ + "p2p.peer.postBlock": true, + }); + request.event = "p2p.peer.postBlock"; + // @ts-ignore + await worker.handleEmit(request, undefined); + // @ts-ignore + expect(worker.sendToMasterAsync).toHaveBeenCalledWith( + "p2p.internal.getRateLimitStatus", + expect.any(Object), + ); + }); + + it("should use the shared rate limiter", async () => { + jest.restoreAllMocks(); + // @ts-ignore + jest.spyOn(worker, "sendToMasterAsync").mockResolvedValue({ data: {} }); + // @ts-ignore + jest.spyOn(worker, "getRateLimitedEndpoints").mockReturnValue({ + "p2p.peer.postBlock": true, + }); + request.event = "p2p.peer.postTransactions"; + // @ts-ignore + await worker.handleEmit(request, undefined); + // @ts-ignore + expect(worker.sendToMasterAsync).not.toHaveBeenCalled(); + }); }); }); diff --git a/packages/core-blockchain/src/blockchain.ts b/packages/core-blockchain/src/blockchain.ts index 99d51a6fc6..04bb40d4d4 100644 --- a/packages/core-blockchain/src/blockchain.ts +++ b/packages/core-blockchain/src/blockchain.ts @@ -159,17 +159,8 @@ export class Blockchain implements blockchain.IBlockchain { this.p2p.getMonitor().cleansePeers({ forcePing: true, peerCount: 10 }); - emitter.on(ApplicationEvents.ForgerMissing, async () => { - this.missedBlocks++; - if (this.missedBlocks >= Managers.configManager.getMilestone().activeDelegates / 3 - 1) { - const networkStatus = await this.p2p.getMonitor().checkNetworkHealth(); - if (networkStatus.forked) { - this.state.numberOfBlocksToRollback = networkStatus.blocksToRollback; - this.dispatch("FORK"); - } - - this.missedBlocks = 0; - } + emitter.on(ApplicationEvents.ForgerMissing, () => { + this.checkMissingBlocks(); }); emitter.on(ApplicationEvents.RoundApplied, () => { @@ -571,4 +562,23 @@ export class Blockchain implements blockchain.IBlockchain { public pushPingBlock(block: Interfaces.IBlockData, fromForger: boolean = false): void { this.state.pushPingBlock(block, fromForger); } + + /** + * Check if the blockchain should roll back due to missing blocks. + */ + public async checkMissingBlocks(): Promise { + this.missedBlocks++; + if ( + this.missedBlocks >= Managers.configManager.getMilestone().activeDelegates / 3 - 1 && + Math.random() <= 0.8 + ) { + const networkStatus = await this.p2p.getMonitor().checkNetworkHealth(); + if (networkStatus.forked) { + this.state.numberOfBlocksToRollback = networkStatus.blocksToRollback; + this.dispatch("FORK"); + } + + this.missedBlocks = 0; + } + } } diff --git a/packages/core-interfaces/src/core-blockchain/blockchain.ts b/packages/core-interfaces/src/core-blockchain/blockchain.ts index 26ba2ad298..d909801882 100644 --- a/packages/core-interfaces/src/core-blockchain/blockchain.ts +++ b/packages/core-interfaces/src/core-blockchain/blockchain.ts @@ -139,4 +139,6 @@ export interface IBlockchain { pushPingBlock(block: Interfaces.IBlockData, fromForger?: boolean): void; replay(targetHeight?: number): Promise; + + checkMissingBlocks(): Promise; } diff --git a/packages/core-interfaces/src/core-p2p/network-monitor.ts b/packages/core-interfaces/src/core-p2p/network-monitor.ts index b068ffac6c..58b277d234 100644 --- a/packages/core-interfaces/src/core-p2p/network-monitor.ts +++ b/packages/core-interfaces/src/core-p2p/network-monitor.ts @@ -27,6 +27,7 @@ export interface INetworkMonitor { discoverPeers(initialRun?: boolean): Promise; getNetworkHeight(): number; getNetworkState(): Promise; + getRateLimitedEndpoints(): string[]; getRateLimitStatus(ip: string, endpoint?: string): Promise; isBlockedByRateLimit(ip: string): Promise; refreshPeersAfterFork(): Promise; diff --git a/packages/core-p2p/src/network-monitor.ts b/packages/core-p2p/src/network-monitor.ts index cbac58bffc..b0140a8142 100644 --- a/packages/core-p2p/src/network-monitor.ts +++ b/packages/core-p2p/src/network-monitor.ts @@ -230,6 +230,10 @@ export class NetworkMonitor implements P2P.INetworkMonitor { }; } + public getRateLimitedEndpoints(): string[] { + return this.rateLimiter.getRateLimitedEndpoints(); + } + public async isBlockedByRateLimit(ip: string): Promise { return this.rateLimiter.isBlocked(ip); } @@ -328,7 +332,7 @@ export class NetworkMonitor implements P2P.INetworkMonitor { if (peersNotForked.length === 0) { this.logger.error( `Could not download blocks: We have ${pluralize("peer", peersAll.length, true)} but all ` + - `of them are on a different chain than us`, + `of them are on a different chain than us`, ); return []; } diff --git a/packages/core-p2p/src/rate-limiter.ts b/packages/core-p2p/src/rate-limiter.ts index b804e2a5f5..f860f6cbf7 100644 --- a/packages/core-p2p/src/rate-limiter.ts +++ b/packages/core-p2p/src/rate-limiter.ts @@ -50,6 +50,10 @@ export class RateLimiter { return false; } + public getRateLimitedEndpoints(): string[] { + return Array.from(this.endpoints.keys()); + } + public async isBlocked(ip: string): Promise { const res = await this.global.get(ip); return res !== null && res.remainingPoints <= 0; diff --git a/packages/core-p2p/src/socket-server/errors.ts b/packages/core-p2p/src/socket-server/errors.ts index 229d68a38d..1be99c6964 100644 --- a/packages/core-p2p/src/socket-server/errors.ts +++ b/packages/core-p2p/src/socket-server/errors.ts @@ -32,6 +32,14 @@ export class InvalidTransactionsError extends ServerError { } } +export class TooManyTransactionsError extends ServerError { + constructor(block: Interfaces.IBlockData) { + super( + `Received block ${block.id} at height ${block.height} contained too many transactions (${block.numberOfTransactions}).`, + ); + } +} + export class UnchainedBlockError extends ServerError { constructor(lastHeight: number, nextHeight: number) { super(`Last received block ${nextHeight} cannot be chained to ${lastHeight}.`); diff --git a/packages/core-p2p/src/socket-server/utils/validate.ts b/packages/core-p2p/src/socket-server/utils/validate.ts index c5b118c26a..492c602a52 100644 --- a/packages/core-p2p/src/socket-server/utils/validate.ts +++ b/packages/core-p2p/src/socket-server/utils/validate.ts @@ -11,3 +11,66 @@ export const validate = (schema, data) => { throw error; } }; + +// Specific light validation for transaction, to be used in socket workers +// to perform quick validation on transaction objects received in postTransactions +// TODO rework with v3 when refactoring p2p layer +export const validateTransactionLight = (transaction: any): boolean => { + if (!transaction || typeof transaction !== "object") { + return false; + } + + // except for multipayment transactions that are capped to 128 payments currently, + // a transaction should not have more than 100 properties total + const maxMainProperties = 50; + const maxAssetProperties = 100; // arbitrary, see below + const maxMultiPayments = 128; // hardcoded as will be refactored before increasing max multipayments + if (Object.keys(transaction).length > maxMainProperties) { + return false; + } + + if (transaction.asset && typeof transaction.asset === "object") { + if (transaction.asset.payments && Array.isArray(transaction.asset.payments)) { + if (transaction.asset.payments.length > maxMultiPayments) { + return false; + } + for (const p of transaction.asset.payments) { + if (!p || typeof p !== "object" || Object.keys(p).length !== 2 || !p.recipientId || !p.amount) { + return false; + } + } + } else { + // no "payments" asset, default to counting properties and checking vs maxProperties. + // totally arbitrary as we could have transactions with more properties in asset, + // but this is temporary and will be removed in v3 when p2p layer is refactored + if (objectHasMorePropertiesThan(transaction.asset, maxAssetProperties)) { + return false; + } + } + } + + const shallowClone = { ...transaction }; + delete shallowClone.asset; // to count main properties now + if (objectHasMorePropertiesThan(shallowClone, maxMainProperties)) { + return false; + } + + return true; +}; + +const objectHasMorePropertiesThan = (obj: object, maxProperties: number) => { + let propertiesCount = 0; + try { + JSON.stringify(obj, (key, value) => { + propertiesCount++; + if (propertiesCount > maxProperties) { + throw new Error("exceeded maxProperties"); + } + return value; + }); + } catch (e) { + return true; + } + + return false; +}; diff --git a/packages/core-p2p/src/socket-server/versions/internal.ts b/packages/core-p2p/src/socket-server/versions/internal.ts index 7461659d54..302b53803b 100644 --- a/packages/core-p2p/src/socket-server/versions/internal.ts +++ b/packages/core-p2p/src/socket-server/versions/internal.ts @@ -97,3 +97,7 @@ export const syncBlockchain = (): void => { app.resolvePlugin("blockchain").forceWakeup(); }; + +export const getRateLimitedEndpoints = ({ service }: { service: P2P.IPeerService }): string[] => { + return service.getMonitor().getRateLimitedEndpoints(); +}; diff --git a/packages/core-p2p/src/socket-server/versions/peer.ts b/packages/core-p2p/src/socket-server/versions/peer.ts index 3f0f75cb40..0d209cd7e2 100644 --- a/packages/core-p2p/src/socket-server/versions/peer.ts +++ b/packages/core-p2p/src/socket-server/versions/peer.ts @@ -6,7 +6,7 @@ import pluralize from "pluralize"; import { MissingCommonBlockError } from "../../errors"; import { IPeerPingResponse } from "../../interfaces"; import { isWhitelisted } from "../../utils"; -import { InvalidTransactionsError, UnchainedBlockError } from "../errors"; +import { InvalidTransactionsError, TooManyTransactionsError, UnchainedBlockError } from "../errors"; import { getPeerConfig } from "../utils/get-peer-config"; import { mapAddr } from "../utils/map-addr"; @@ -54,10 +54,19 @@ export const getStatus = async (): Promise => { export const postBlock = async ({ req }): Promise => { const blockchain: Blockchain.IBlockchain = app.resolvePlugin("blockchain"); + const blockHex: string = (req.data.block as Buffer).toString("hex"); + + const deserializedHeader = Blocks.Deserializer.deserialize(blockHex, true); + + if (deserializedHeader.data.numberOfTransactions > app.getConfig().getMilestone().block.maxTransactions) { + throw new TooManyTransactionsError(deserializedHeader.data); + } + const deserialized: { data: Interfaces.IBlockData; transactions: Interfaces.ITransaction[]; - } = Blocks.Deserializer.deserialize((req.data.block as Buffer).toString("hex")); + } = Blocks.Deserializer.deserialize(blockHex); + const block: Interfaces.IBlockData = { ...deserialized.data, transactions: deserialized.transactions.map(tx => tx.data), diff --git a/packages/core-p2p/src/socket-server/versions/utils.ts b/packages/core-p2p/src/socket-server/versions/utils.ts index 4188220169..4b493049d6 100644 --- a/packages/core-p2p/src/socket-server/versions/utils.ts +++ b/packages/core-p2p/src/socket-server/versions/utils.ts @@ -26,5 +26,12 @@ export const isForgerAuthorized = ({ req }): { authorized: boolean } => { }; export const getConfig = (): Record => { - return app.resolveOptions("p2p"); + const config = app.resolveOptions("p2p"); + + // add maxTransactionsPerRequest config from transaction pool + config.maxTransactionsPerRequest = app.has("transaction-pool") + ? app.resolveOptions("transaction-pool").maxTransactionsPerRequest || 40 + : 40; + + return config; }; diff --git a/packages/core-p2p/src/socket-server/worker.ts b/packages/core-p2p/src/socket-server/worker.ts index c191f3a3a3..aebe4b5350 100644 --- a/packages/core-p2p/src/socket-server/worker.ts +++ b/packages/core-p2p/src/socket-server/worker.ts @@ -1,31 +1,55 @@ import { P2P } from "@arkecosystem/core-interfaces"; import Ajv from "ajv"; import { cidr } from "ip"; +import { RateLimiter } from "../rate-limiter"; +import { buildRateLimiter } from "../utils"; + import SCWorker from "socketcluster/scworker"; import { requestSchemas } from "../schemas"; import { codec } from "../utils/sc-codec"; +import { validateTransactionLight } from "./utils/validate"; const MINUTE_IN_MILLISECONDS = 1000 * 60; const HOUR_IN_MILLISECONDS = MINUTE_IN_MILLISECONDS * 60; -const ajv = new Ajv(); +const ajv = new Ajv({ extendRefs: true }); export class Worker extends SCWorker { private config: Record; + private handlers: string[] = []; private ipLastError: Record = {}; + private rateLimiter: RateLimiter; + private rateLimitedEndpoints: any; public async run() { this.log(`Socket worker started, PID: ${process.pid}`); this.scServer.setCodecEngine(codec); + await this.loadRateLimitedEndpoints(); await this.loadConfiguration(); + this.rateLimiter = buildRateLimiter({ + rateLimit: this.config.rateLimit, + remoteAccess: this.config.remoteAccess, + whitelist: this.config.whitelist, + }); + // purge ipLastError every hour to free up memory setInterval(() => (this.ipLastError = {}), HOUR_IN_MILLISECONDS); + await this.loadHandlers(); + // @ts-ignore this.scServer.wsServer.on("connection", (ws, req) => { + const clients = [...Object.values(this.scServer.clients), ...Object.values(this.scServer.pendingClients)]; + const existingSockets = clients.filter( + client => + client.remoteAddress === req.socket.remoteAddress && client.remotePort !== req.socket.remotePort, + ); + for (const socket of existingSockets) { + socket.terminate(); + } this.handlePayload(ws, req); }); this.scServer.on("connection", socket => this.handleConnection(socket)); @@ -35,70 +59,171 @@ export class Worker extends SCWorker { this.scServer.addMiddleware(this.scServer.MIDDLEWARE_EMIT, (req, next) => this.handleEmit(req, next)); } + private async loadHandlers(): Promise { + const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers"); + for (const [version, handlers] of Object.entries(data)) { + for (const handler of Object.values(handlers)) { + this.handlers.push(`p2p.${version}.${handler}`); + } + } + } + private async loadConfiguration(): Promise { const { data } = await this.sendToMasterAsync("p2p.utils.getConfig"); this.config = data; } + private async loadRateLimitedEndpoints(): Promise { + const { data } = await this.sendToMasterAsync("p2p.internal.getRateLimitedEndpoints", { data: {} }); + this.rateLimitedEndpoints = (Array.isArray(data) ? data : []).reduce((object, value) => { + object[value] = true; + return object; + }, {}); + } + + private getRateLimitedEndpoints() { + return this.rateLimitedEndpoints; + } + private handlePayload(ws, req) { + ws.removeAllListeners("ping"); + ws.removeAllListeners("pong"); ws.prependListener("ping", () => { this.setErrorForIpAndTerminate(ws, req); }); ws.prependListener("pong", () => { this.setErrorForIpAndTerminate(ws, req); }); + + ws.prependListener("error", error => { + if (error instanceof RangeError) { + this.setErrorForIpAndTerminate(ws, req); + } + }); + + const messageListeners = ws.listeners("message"); + ws.removeAllListeners("message"); ws.prependListener("message", message => { if (ws._disconnected) { - this.setErrorForIpAndTerminate(ws, req); + return this.setErrorForIpAndTerminate(ws, req); } else if (message === "#2") { const timeNow: number = new Date().getTime() / 1000; if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) { - this.setErrorForIpAndTerminate(ws, req); + return this.setErrorForIpAndTerminate(ws, req); } ws._lastPingTime = timeNow; } else if (message.length < 10) { // except for #2 message, we should have JSON with some required properties // (see below) which implies that message length should be longer than 10 chars - this.setErrorForIpAndTerminate(ws, req); + return this.setErrorForIpAndTerminate(ws, req); } else { try { const parsed = JSON.parse(message); if (parsed.event === "#disconnect") { ws._disconnected = true; + } else if (parsed.event === "#handshake") { + if (ws._handshake) { + this.setErrorForIpAndTerminate(ws, req); + } + ws._handshake = true; } else if ( typeof parsed.event !== "string" || typeof parsed.data !== "object" || + this.hasAdditionalProperties(parsed) || (typeof parsed.cid !== "number" && - (parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) + (parsed.event === "#disconnect" && typeof parsed.cid !== "undefined")) || + !this.handlers.includes(parsed.event) ) { - this.setErrorForIpAndTerminate(ws, req); + return this.setErrorForIpAndTerminate(ws, req); } } catch (error) { - this.setErrorForIpAndTerminate(ws, req); + return this.setErrorForIpAndTerminate(ws, req); } } + + // we call the other listeners ourselves + for (const listener of messageListeners) { + listener(message); + } }); } + private hasAdditionalProperties(object): boolean { + if (Object.keys(object).filter(key => key !== "event" && key !== "data" && key !== "cid").length) { + return true; + } + const event = object.event.split("."); + if (object.event !== "#handshake" && object.event !== "#disconnect") { + if (event.length !== 3) { + return true; + } + if (Object.keys(object.data).filter(key => key !== "data" && key !== "headers").length) { + return true; + } + } + if (object.data.data) { + // @ts-ignore + const [_, version, handler] = event; + const schema = requestSchemas[version][handler]; + try { + if (object.event === "p2p.peer.postTransactions") { + if ( + typeof object.data.data === "object" && + object.data.data.transactions && + Array.isArray(object.data.data.transactions) && + object.data.data.transactions.length <= this.config.maxTransactionsPerRequest + ) { + for (const transaction of object.data.data.transactions) { + if (!validateTransactionLight(transaction)) { + return true; + } + } + } else { + return true; + } + } else if (schema && !ajv.validate(schema, object.data.data)) { + return true; + } + } catch { + // + } + } + if (object.data.headers) { + if ( + Object.keys(object.data.headers).filter( + key => key !== "version" && key !== "port" && key !== "height" && key !== "Content-Type", + ).length + ) { + return true; + } + if ( + (object.data.headers.version && typeof object.data.headers.version !== "string") || + (object.data.headers.port && typeof object.data.headers.port !== "number") || + (object.data.headers["Content-Type"] && typeof object.data.headers["Content-Type"] !== "string") || + (object.data.headers.height && typeof object.data.headers.height !== "number") + ) { + // this prevents the nesting of other objects inside these properties + return true; + } + } + return false; + } + private setErrorForIpAndTerminate(ws, req): void { this.ipLastError[req.socket.remoteAddress] = Date.now(); ws.terminate(); } private async handleConnection(socket): Promise { - const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers"); - - for (const [version, handlers] of Object.entries(data)) { - for (const handler of Object.values(handlers)) { - // @ts-ignore - socket.on(`p2p.${version}.${handler}`, async (data, res) => { - try { - return res(undefined, await this.sendToMasterAsync(`p2p.${version}.${handler}`, data)); - } catch (e) { - return res(e); - } - }); - } + for (const handler of this.handlers) { + // @ts-ignore + socket.on(handler, async (data, res) => { + try { + return res(undefined, await this.sendToMasterAsync(handler, data)); + } catch (e) { + return res(e); + } + }); } } @@ -139,20 +264,31 @@ export class Worker extends SCWorker { req.socket.terminate(); return; } - - const { data }: { data: P2P.IRateLimitStatus } = await this.sendToMasterAsync( - "p2p.internal.getRateLimitStatus", - { - data: { - ip: req.socket.remoteAddress, - endpoint: req.event, + const rateLimitedEndpoints = this.getRateLimitedEndpoints(); + const useLocalRateLimiter: boolean = !rateLimitedEndpoints[req.event]; + if (useLocalRateLimiter) { + if (await this.rateLimiter.hasExceededRateLimit(req.socket.remoteAddress, req.event)) { + if (await this.rateLimiter.isBlocked(req.socket.remoteAddress)) { + req.socket.terminate(); + return; + } + req.socket.terminate(); + return; + } + } else { + const { data }: { data: P2P.IRateLimitStatus } = await this.sendToMasterAsync( + "p2p.internal.getRateLimitStatus", + { + data: { + ip: req.socket.remoteAddress, + endpoint: req.event, + }, }, - }, - ); - - if (data.exceededLimitOnEndpoint) { - req.socket.terminate(); - return; + ); + if (data.exceededLimitOnEndpoint) { + req.socket.terminate(); + return; + } } // ensure basic format of incoming data, req.data must be as { data, headers } @@ -187,18 +323,7 @@ export class Worker extends SCWorker { } } else if (version === "peer") { const requestSchema = requestSchemas.peer[handler]; - if (["postTransactions", "postBlock"].includes(handler)) { - // has to be in the peer list to use the endpoint - const { - data: { isPeerOrForger }, - } = await this.sendToMasterAsync("p2p.internal.isPeerOrForger", { - data: { ip: req.socket.remoteAddress }, - }); - if (!isPeerOrForger) { - req.socket.terminate(); - return; - } - } else if (requestSchema && !ajv.validate(requestSchema, req.data.data)) { + if (handler !== "postTransactions" && requestSchema && !ajv.validate(requestSchema, req.data.data)) { req.socket.terminate(); return; } diff --git a/packages/crypto/src/networks/unitnet/milestones.json b/packages/crypto/src/networks/unitnet/milestones.json index be60e6a1b1..f27ae7e772 100644 --- a/packages/crypto/src/networks/unitnet/milestones.json +++ b/packages/crypto/src/networks/unitnet/milestones.json @@ -7,7 +7,7 @@ "block": { "version": 0, "maxTransactions": 150, - "maxPayload": 2097152 + "maxPayload": 6300000 }, "epoch": "2017-03-21T13:00:00.000Z", "fees": { @@ -44,4 +44,4 @@ "idFullSha256": true } } -] \ No newline at end of file +]