Skip to content

Commit

Permalink
Merge remote-tracking branch 'ArkEcosystem/core/develop' into is-bloc…
Browse files Browse the repository at this point in the history
…k-chained-verbose

* ArkEcosystem/core/develop:
  test: mock missing wsServer
  release: 2.5.24 (#2908)
  fix(core-p2p): terminate blocked client connections
  fix(core-p2p): drop connections with malformed messages
  fix(core-api): return block timestamp for v2 transactions (#2906)
  fix(core-blockchain): only shift milestoneHeights[] if at that height (#2904)
  fix(crypto): use `anyOf` for transactions schema (#2894)
  fix(core-webhooks): cast params in condition checks (#2887)
  feat(core-p2p): use compression on the p2p level (#2886)
  • Loading branch information
vasild committed Sep 5, 2019
2 parents 425fe65 + a9e2274 commit 7db1ba7
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 7 deletions.
14 changes: 11 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

## [Unreleased]

## [2.5.19] - 2019-08-14
## [2.5.24] - 2019-09-04

### Fixed

- Disconnect on invalid P2P requests ([#2875])
- Stricter P2P reply schemas ([#2875])
- Cast params in webhook condition checks ([#2887])
- Drop connections with malformed messages ([#2907])
- Terminate blocked client connections ([#2907])
- Use `anyOf` for transactions schema ([#2894])
- Use compression on the p2p level ([#2886])

## [2.5.17] - 2019-08-06

Expand Down Expand Up @@ -621,6 +624,7 @@ Closed security vulnerabilities:
- Initial Release

[unreleased]: https://github.com/ARKEcosystem/core/compare/master...develop
[2.5.24]: https://github.com/ARKEcosystem/core/compare/2.5.19...2.5.24
[2.5.19]: https://github.com/ARKEcosystem/core/compare/2.5.17...2.5.19
[2.5.17]: https://github.com/ARKEcosystem/core/compare/2.5.14...2.5.17
[2.5.14]: https://github.com/ARKEcosystem/core/compare/2.5.7...2.5.14
Expand Down Expand Up @@ -958,3 +962,7 @@ Closed security vulnerabilities:
[#2863]: https://github.com/ARKEcosystem/core/pull/2863
[#2864]: https://github.com/ARKEcosystem/core/pull/2864
[#2875]: https://github.com/ARKEcosystem/core/pull/2875
[#2886]: https://github.com/ARKEcosystem/core/pull/2886
[#2887]: https://github.com/ARKEcosystem/core/pull/2887
[#2894]: https://github.com/ARKEcosystem/core/pull/2894
[#2907]: https://github.com/ARKEcosystem/core/pull/2907
68 changes: 67 additions & 1 deletion __tests__/integration/core-p2p/socket-server/peer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ Managers.configManager.setFromPreset("unitnet");

let server: SocketCluster;
let socket;
let connect;
let emit;
let send;

const headers = {
version: "2.1.0",
Expand All @@ -32,7 +34,7 @@ beforeAll(async () => {

const { service, processor } = createPeerService();

server = await startSocketServer(service, { server: { port: 4007 } });
server = await startSocketServer(service, { server: { port: 4007, workers: 1 } });
await delay(1000);

socket = socketCluster.create({
Expand All @@ -43,11 +45,15 @@ beforeAll(async () => {
//
});

connect = () => socket.connect();

emit = (event, data) =>
new Promise((resolve, reject) => {
socket.emit(event, data, (err, val) => (err ? reject(err) : resolve(val)));
});

send = data => socket.send(data);

jest.spyOn(processor, "validateAndAcceptPeer").mockImplementation(jest.fn());
});

Expand Down Expand Up @@ -127,6 +133,41 @@ describe("Peer socket endpoint", () => {
}),
).toResolve();
});

it("should disconnect the client if it sends an invalid message payload", async () => {
await delay(1000);

expect(socket.state).toBe("open");

send('{"event": "#handshake", "data": {}, "cid": 1}');
await delay(500);

send("Invalid payload");
await delay(1000);

expect(socket.state).toBe("closed");
});

it("should disconnect the client if it sends too many pongs too quickly", async () => {
connect();
await delay(1000);

expect(socket.state).toBe("open");

send('{"event": "#handshake", "data": {}, "cid": 1}');
await delay(500);

send("#2");
await delay(1000);

expect(socket.state).toBe("open");

send("#2");
send("#2");
await delay(1000);

expect(socket.state).toBe("closed");
});
});
});

Expand Down Expand Up @@ -229,5 +270,30 @@ describe("Peer socket endpoint", () => {
}),
).rejects.toHaveProperty("name", "BadConnectionError");
});

it("should close the connection and prevent reconnection if blocked", async () => {
await delay(1000);

await emit("p2p.peer.getPeers", {
headers,
});

expect(socket.state).toBe("open");

for (let i = 0; i < 100; i++) {
await expect(
emit("p2p.peer.getPeers", {
headers,
}),
).rejects.toContainAnyEntries([["name", "CoreRateLimitExceededError"], ["name", "BadConnectionError"]]);
}

expect(socket.state).not.toBe("open");

socket.connect();
await delay(1000);

expect(socket.state).not.toBe("open");
});
});
});
3 changes: 3 additions & 0 deletions __tests__/unit/core-p2p/socket-server/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import { Worker } from "../../../../packages/core-p2p/src/socket-server/worker";

const worker = new Worker();

// @ts-ignore
worker.scServer.wsServer = { on: () => undefined };

describe("Worker", () => {
describe("run", () => {
it("should init the worker", async () => {
Expand Down
3 changes: 2 additions & 1 deletion packages/core-api/src/handlers/transactions/transformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export const transformTransaction = (model, transform) => {
const sender: string = databaseService.walletManager.findByPublicKey(data.senderPublicKey).address;

const lastBlock: Interfaces.IBlock = blockchain.getLastBlock();
const timestamp: number = data.version === 1 ? data.timestamp : model.timestamp;

return {
id: data.id,
Expand All @@ -38,7 +39,7 @@ export const transformTransaction = (model, transform) => {
vendorField: data.vendorField,
asset: data.asset,
confirmations: model.block ? lastBlock.data.height - model.block.height + 1 : 0,
timestamp: data.version === 1 ? formatTimestamp(data.timestamp) : undefined,
timestamp: timestamp !== undefined ? formatTimestamp(timestamp) : undefined,
nonce: data.version > 1 ? data.nonce.toFixed() : undefined,
};
};
4 changes: 3 additions & 1 deletion packages/core-blockchain/src/blockchain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ export class Blockchain implements blockchain.IBlockchain {
this.queue.push({ blocks: currentBlocksChunk });
currentBlocksChunk = [];
currentTransactionsCount = 0;
milestoneHeights.shift();
if (nextMilestone) {
milestoneHeights.shift();
}
}
}
this.queue.push({ blocks: currentBlocksChunk });
Expand Down
1 change: 1 addition & 0 deletions packages/core-p2p/src/enums.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ export enum SocketErrors {
Validation = "CoreValidationError",
RateLimitExceeded = "CoreRateLimitExceededError",
Forbidden = "CoreForbiddenError",
InvalidMessagePayload = "CoreInvalidMessagePayloadError",
}
34 changes: 33 additions & 1 deletion packages/core-p2p/src/socket-server/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ export class Worker extends SCWorker {

await this.loadConfiguration();

// @ts-ignore
this.scServer.wsServer.on("connection", (ws, req) => this.handlePayload(ws, req));
this.scServer.on("connection", socket => this.handleConnection(socket));
this.scServer.addMiddleware(this.scServer.MIDDLEWARE_HANDSHAKE_WS, (req, next) =>
this.handleHandshake(req, next),
Expand Down Expand Up @@ -55,6 +57,36 @@ export class Worker extends SCWorker {
});
}

private handlePayload(ws, req) {
ws.on("message", message => {
try {
const InvalidMessagePayloadError: Error = this.createError(
SocketErrors.InvalidMessagePayload,
"The message contained an invalid payload",
);
if (message === "#2") {
const timeNow: number = new Date().getTime() / 1000;
if (ws._lastPingTime && timeNow - ws._lastPingTime < 1) {
throw InvalidMessagePayloadError;
}
ws._lastPingTime = timeNow;
} else {
const parsed = JSON.parse(message);
if (
typeof parsed.event !== "string" ||
typeof parsed.data !== "object" ||
(typeof parsed.cid !== "number" &&
(parsed.event === "#disconnect" && typeof parsed.cid !== "undefined"))
) {
throw InvalidMessagePayloadError;
}
}
} catch (error) {
ws.terminate();
}
});
}

private async handleConnection(socket): Promise<void> {
const { data } = await this.sendToMasterAsync("p2p.utils.getHandlers");

Expand Down Expand Up @@ -86,7 +118,7 @@ export class Worker extends SCWorker {
private async handleEmit(req, next): Promise<void> {
if (await this.rateLimiter.hasExceededRateLimit(req.socket.remoteAddress, req.event)) {
if (await this.rateLimiter.isBlocked(req.socket.remoteAddress)) {
req.socket.disconnect(4403, "Forbidden");
req.socket.terminate();
return;
}

Expand Down

0 comments on commit 7db1ba7

Please sign in to comment.