Skip to content

Commit

Permalink
feat(core-p2p): split into 3 ports for blocks / transactions / others (
Browse files Browse the repository at this point in the history
  • Loading branch information
air1one authored Jul 17, 2020
1 parent 47d3cf2 commit 9796d13
Show file tree
Hide file tree
Showing 33 changed files with 881 additions and 592 deletions.
42 changes: 28 additions & 14 deletions __tests__/unit/core-forger/client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ afterEach(() => {
describe("Client", () => {
let client: Client;

const host = { hostname: "127.0.0.1", port: 4000, socket: undefined };
const host = { hostname: "127.0.0.1", port: 4000, blocksSocket: undefined, internalSocket: undefined };
const hosts = [host];

beforeEach(() => {
Expand All @@ -42,14 +42,19 @@ describe("Client", () => {
it("should register hosts", async () => {
client.register(hosts);
expect(Nes.Client).toHaveBeenCalledWith(`ws://${host.hostname}:${host.port}`);
expect(client.hosts).toEqual([{ ...host, socket: expect.anything() }]);
expect(client.hosts).toEqual([{ ...host, blocksSocket: expect.anything(), internalSocket: expect.anything() }]);
});

it("on error the socket should call logger", () => {
client.register(hosts);

const fakeError = { message: "Fake Error" };
client.hosts[0].socket.onError(fakeError);
client.hosts[0].blocksSocket.onError(fakeError);

expect(logger.error).toHaveBeenCalledWith("Fake Error");

logger.error.mockReset();
client.hosts[0].internalSocket.onError(fakeError);

expect(logger.error).toHaveBeenCalledWith("Fake Error");
});
Expand All @@ -59,8 +64,10 @@ describe("Client", () => {
it("should call disconnect on all sockets", () => {
client.register([host, { hostname: "127.0.0.5", port: 4000 }]);
client.dispose();
expect(client.hosts[0].socket.disconnect).toHaveBeenCalled();
expect(client.hosts[1].socket.disconnect).toHaveBeenCalled();
expect(client.hosts[0].blocksSocket.disconnect).toHaveBeenCalled();
expect(client.hosts[0].internalSocket.disconnect).toHaveBeenCalled();
expect(client.hosts[1].blocksSocket.disconnect).toHaveBeenCalled();
expect(client.hosts[1].internalSocket.disconnect).toHaveBeenCalled();
expect(nesClient.disconnect).toBeCalled();
});
});
Expand All @@ -80,11 +87,11 @@ describe("Client", () => {
it("should not broadcast block when there is an issue with socket", async () => {
client.register(hosts);

host.socket = {};
host.blocksSocket = {};
await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve();

expect(logger.error).toHaveBeenCalledWith(
`Broadcast block failed: Request to ${host.hostname}:${host.port}<p2p.peer.postBlock> failed, because of 'this.host.socket.request is not a function'.`,
`Broadcast block failed: Request to ${host.hostname}:${host.port}<p2p.blocks.postBlock> failed, because of 'socket.request is not a function'.`,
);
});

Expand All @@ -93,7 +100,7 @@ describe("Client", () => {

await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve();
expect(nesClient.request).toHaveBeenCalledWith({
path: "p2p.peer.postBlock",
path: "p2p.blocks.postBlock",
headers: {},
method: "POST",
payload: { block: expect.anything() },
Expand All @@ -108,7 +115,7 @@ describe("Client", () => {

await expect(client.broadcastBlock(forgedBlockWithTransactions)).toResolve();
expect(logger.error).toHaveBeenCalledWith(
`Broadcast block failed: Request to ${host.hostname}:${host.port}<p2p.peer.postBlock> failed, because of 'oops'.`,
`Broadcast block failed: Request to ${host.hostname}:${host.port}<p2p.blocks.postBlock> failed, because of 'oops'.`,
);
});
});
Expand All @@ -121,15 +128,19 @@ describe("Client", () => {
});

it("should select the first open socket", async () => {
hosts[4].socket._isReady = () => true;
hosts[4].blocksSocket._isReady = () => true;
hosts[4].internalSocket._isReady = () => true;

client.register(hosts);
client.selectHost();
expect((client as any).host).toEqual(hosts[4]);
});

it("should log debug message when no sockets are open", async () => {
hosts.forEach((host) => (host.socket._isReady = () => false));
hosts.forEach((host) => {
host.internalSocket._isReady = () => false;
host.blocksSocket._isReady = () => false;
});

client.register(hosts);
await expect(client.selectHost()).rejects.toThrow(
Expand Down Expand Up @@ -159,7 +170,8 @@ describe("Client", () => {
describe("getRound", () => {
it("should broadcast internal getRound transaction", async () => {
client.register([host]);
host.socket._isReady = () => true;
host.blocksSocket._isReady = () => true;
host.internalSocket._isReady = () => true;

await client.getRound();

Expand All @@ -175,7 +187,8 @@ describe("Client", () => {
describe("syncWithNetwork", () => {
it("should broadcast internal getRound transaction", async () => {
client.register([host]);
host.socket._isReady = () => true;
host.blocksSocket._isReady = () => true;
host.internalSocket._isReady = () => true;
await client.syncWithNetwork();

expect(nesClient.request).toHaveBeenCalledWith({
Expand All @@ -190,7 +203,8 @@ describe("Client", () => {
it("should log error message if syncing fails", async () => {
const errorMessage = "Fake Error";
nesClient.request.mockRejectedValueOnce(new Error(errorMessage));
host.socket._isReady = () => true;
host.blocksSocket._isReady = () => true;
host.internalSocket._isReady = () => true;
client.register([host]);
await expect(client.syncWithNetwork()).toResolve();
expect(logger.error).toHaveBeenCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ describe("EntityController", () => {

const request: Hapi.Request = {
params: {
id: senderWallet.publicKey,
id: registrationTxId,
},
};

Expand All @@ -112,7 +112,7 @@ describe("EntityController", () => {
it("should return error if entity is not found", async () => {
const request: Hapi.Request = {
params: {
id: senderWallet.publicKey,
id: registrationTxId,
},
};

Expand All @@ -134,7 +134,7 @@ describe("EntityController", () => {

const request: Hapi.Request = {
params: {
id: senderWallet.address,
id: registrationTxId,
},
};

Expand Down
6 changes: 3 additions & 3 deletions __tests__/unit/core-p2p/listeners.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe("DisconnectInvalidPeers", () => {
it("should emit 'internal.p2p.disconnectPeer' for invalid version peers", async () => {
await disconnectInvalidPeers.handle();

expect(emitter.dispatch).toBeCalledTimes(2); // 2 invalid peers version
expect(emitter.dispatch).toBeCalledTimes(2 * 3); // 2 invalid peers version * 3 sockets (ports) per peer
});
});
});
Expand Down Expand Up @@ -76,12 +76,12 @@ describe("DisconnectPeer", () => {
describe("handle", () => {
it("should disconnect the peer provided", async () => {
const peer = new Peer("187.176.1.1", 4000);
await disconnectPeer.handle({ data: { peer: peer } });
await disconnectPeer.handle({ data: { peer: peer, port: 4000 } });

expect(storage.forgetPeer).toBeCalledTimes(1);
expect(storage.forgetPeer).toBeCalledWith(peer);
expect(connector.disconnect).toBeCalledTimes(1);
expect(connector.disconnect).toBeCalledWith(peer);
expect(connector.disconnect).toBeCalledWith(peer, 4000);
});
});
});
7 changes: 5 additions & 2 deletions __tests__/unit/core-p2p/network-monitor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { Blocks } from "@arkecosystem/crypto";
import delay from "delay";
import { cloneDeep } from "lodash";
import path from "path";
import { PortsOffset } from "@arkecosystem/core-p2p/src/enums";

describe("NetworkMonitor", () => {
let networkMonitor: NetworkMonitor;
Expand Down Expand Up @@ -394,8 +395,10 @@ describe("NetworkMonitor", () => {
await networkMonitor.cleansePeers({ peerCount: 5 });

expect(communicator.ping).toBeCalledTimes(peers.length);
expect(emitter.dispatch).toBeCalledTimes(2);
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers) });
expect(emitter.dispatch).toBeCalledTimes(4); // 3 for disconnecting each peer port + 1 for peer removed event
for (const port of [4000 + PortsOffset.Peer, 4000 + PortsOffset.Blocks, 4000 + PortsOffset.Transactions]) {
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer: expect.toBeOneOf(peers), port });
}
expect(emitter.dispatch).toBeCalledWith(Enums.PeerEvent.Removed, expect.toBeOneOf(peers));
});

Expand Down
Loading

0 comments on commit 9796d13

Please sign in to comment.