Skip to content

Commit

Permalink
feat(core-p2p): forget unresponsive peer (#3887)
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastijankuzner authored Jul 13, 2020
1 parent 381e9c7 commit 5222b5c
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 57 deletions.
3 changes: 1 addition & 2 deletions __tests__/unit/core-p2p/listeners.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Container } from "@arkecosystem/core-kernel";

import { DisconnectInvalidPeers, DisconnectPeer } from "@arkecosystem/core-p2p/src/listeners";
import { Peer } from "@arkecosystem/core-p2p/src/peer";

Expand Down Expand Up @@ -77,7 +76,7 @@ describe("DisconnectPeer", () => {
describe("handle", () => {
it("should disconnect the peer provided", async () => {
const peer = new Peer("187.176.1.1", 4000);
await disconnectPeer.handle({ data: peer });
await disconnectPeer.handle({ data: { peer: peer } });

expect(storage.forgetPeer).toBeCalledTimes(1);
expect(storage.forgetPeer).toBeCalledWith(peer);
Expand Down
127 changes: 77 additions & 50 deletions __tests__/unit/core-p2p/peer-communicator.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { Container, Utils as KernelUtils } from "@arkecosystem/core-kernel";
import "jest-extended";

import { PeerCommunicator } from "@arkecosystem/core-p2p/src/peer-communicator";
import { Peer } from "@arkecosystem/core-p2p/src/peer";
import { Blocks, Managers, Utils, Transactions, Identities } from "@arkecosystem/crypto";
import { Container, Utils as KernelUtils } from "@arkecosystem/core-kernel";
import { constants } from "@arkecosystem/core-p2p/src/constants";
import {
PeerPingTimeoutError,
PeerStatusResponseError,
PeerVerificationFailedError,
PeerPingTimeoutError,
} from "@arkecosystem/core-p2p/src/errors";
import delay from "delay";
import { Peer } from "@arkecosystem/core-p2p/src/peer";
import { PeerCommunicator } from "@arkecosystem/core-p2p/src/peer-communicator";
import { PeerVerificationResult } from "@arkecosystem/core-p2p/src/peer-verifier";
import { replySchemas } from "@arkecosystem/core-p2p/src/schemas";
import { constants } from "@arkecosystem/core-p2p/src/constants";
import { Blocks, Identities, Managers, Transactions, Utils } from "@arkecosystem/crypto";
import delay from "delay";

Managers.configManager.getMilestone().aip11 = true;

Expand Down Expand Up @@ -77,7 +78,7 @@ describe("PeerCommunicator", () => {
.nonce("1")
.fee("100")
.sign("sender's secret")
.build()
.build(),
],
} as Blocks.Block;
const payload = { block };
Expand Down Expand Up @@ -123,7 +124,7 @@ describe("PeerCommunicator", () => {
currentSlot: 1,
header: {},
},
headers: { height: 1 }
headers: { height: 1 },
};

it("should not call connector emit when peer.recentlyPinged() && !force", async () => {
Expand Down Expand Up @@ -165,26 +166,28 @@ describe("PeerCommunicator", () => {
});

describe("when !process.env.CORE_SKIP_PEER_STATE_VERIFICATION", () => {
it.each([[true], [false]])
("should throw PeerVerificationFailedError when peer config is not validated", async (withWrongNethash) => {
const event = "p2p.peer.getStatus";
const peer = new Peer("187.168.65.65", 4000);
const pingResponse = cloneObject(baseGetStatusResponse);

if (withWrongNethash) {
// tweaking the base nethash to make it invalid
pingResponse.config.network.nethash = pingResponse.config.network.nethash.replace("a", "b");
} else {
// tweaking the base version to make it invalid
pingResponse.config.version = "3.0.0.0";
}
connector.emit = jest.fn().mockReturnValueOnce({ payload: pingResponse });

await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerVerificationFailedError);

expect(connector.emit).toBeCalledTimes(1);
expect(connector.emit).toBeCalledWith(peer, event, {});
});
it.each([[true], [false]])(
"should throw PeerVerificationFailedError when peer config is not validated",
async (withWrongNethash) => {
const event = "p2p.peer.getStatus";
const peer = new Peer("187.168.65.65", 4000);
const pingResponse = cloneObject(baseGetStatusResponse);

if (withWrongNethash) {
// tweaking the base nethash to make it invalid
pingResponse.config.network.nethash = pingResponse.config.network.nethash.replace("a", "b");
} else {
// tweaking the base version to make it invalid
pingResponse.config.version = "3.0.0.0";
}
connector.emit = jest.fn().mockReturnValueOnce({ payload: pingResponse });

await expect(peerCommunicator.ping(peer, 1000)).rejects.toThrow(PeerVerificationFailedError);

expect(connector.emit).toBeCalledTimes(1);
expect(connector.emit).toBeCalledWith(peer, event, {});
},
);

it("should throw PeerPingTimeoutError when deadline is passed", async () => {
const event = "p2p.peer.getStatus";
Expand Down Expand Up @@ -315,12 +318,14 @@ describe("PeerCommunicator", () => {

await peerCommunicator.pingPorts(peer);

expect(peer.ports["core-api"]).toBeUndefined()
expect(peer.ports["custom-plugin"]).toBeUndefined;
expect(peer.ports["core-api"]).toBeUndefined();
expect(peer.ports["custom-plugin"]).toBeDefined();
expect(logger.warning).toBeCalledTimes(1);
expect(logger.warning).toBeCalledWith(`Disconnecting from ${ip}:${apiPort}: nethash mismatch: our=${
Managers.configManager.get("network.nethash")
}, his=${wrongNethash}.`);
expect(logger.warning).toBeCalledWith(
`Disconnecting from ${ip}:${apiPort}: nethash mismatch: our=${Managers.configManager.get(
"network.nethash",
)}, his=${wrongNethash}.`,
);
expect(emitter.dispatch).toBeCalledTimes(1);
expect(emitter.dispatch).toBeCalledWith("internal.p2p.disconnectPeer", { peer });
});
Expand Down Expand Up @@ -374,13 +379,14 @@ describe("PeerCommunicator", () => {
expect(connector.emit).toBeCalledWith(peer, event, payload);
expect(getPeersResult1).toEqual(mockConnectorResponse.payload);
expect(getPeersResult2).toEqual(mockConnectorResponse.payload);
expect(logger.debug).toBeCalledWith(`Throttling outgoing requests to ${peer.ip}/${event} to avoid triggering their rate limit`)
expect(logger.debug).toBeCalledWith(
`Throttling outgoing requests to ${peer.ip}/${event} to avoid triggering their rate limit`,
);

connector.emit = jest.fn();
});

it.each([[true], [false]])
("should return undefined when emit fails", async (throwErrorInstance) => {
it.each([[true], [false]])("should return undefined when emit fails", async (throwErrorInstance) => {
const event = "p2p.peer.getPeers";
const payload = {};
const peer = new Peer("187.168.65.65", 4000);
Expand Down Expand Up @@ -455,22 +461,22 @@ describe("PeerCommunicator", () => {
payloadHash: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855",
generatorPublicKey: "026c598170201caf0357f202ff14f365a3b09322071e347873869f58d776bfc565",
blockSignature:
"3045022100e7385c6ea42bd950f7f6ab8c8619cf2f66a41d8f8f185b0bc99af032cb25f30d02200b6210176a6cedfdcbe483167fd91c21d740e0e4011d24d679c601fdd46b0de9",
"3045022100e7385c6ea42bd950f7f6ab8c8619cf2f66a41d8f8f185b0bc99af032cb25f30d02200b6210176a6cedfdcbe483167fd91c21d740e0e4011d24d679c601fdd46b0de9",
transactions: [
Transactions.Serializer.serialize(Transactions.BuilderFactory.transfer()
.version(2)
.amount("100")
.recipientId(Identities.Address.fromPassphrase("recipient's secret"))
.nonce("1")
.fee("100")
.sign("sender's secret")
.build()
).toString("hex")
Transactions.Serializer.serialize(
Transactions.BuilderFactory.transfer()
.version(2)
.amount("100")
.recipientId(Identities.Address.fromPassphrase("recipient's secret"))
.nonce("1")
.fee("100")
.sign("sender's secret")
.build(),
).toString("hex"),
],
};

it.each([[true], [false]])
("should use connector to emit p2p.peer.getBlocks", async (withTransactions) => {
it.each([[true], [false]])("should use connector to emit p2p.peer.getBlocks", async (withTransactions) => {
const event = "p2p.peer.getBlocks";
const options = {
fromBlockHeight: 1,
Expand Down Expand Up @@ -519,7 +525,28 @@ describe("PeerCommunicator", () => {
expect(connector.emit).toBeCalledTimes(1);
expect(connector.emit).toBeCalledWith(peer, event, expectedEmitPayload);
expect(getPeerBlocksResult).toEqual([]);
expect(logger.debug).toBeCalledWith(`Peer ${peer.ip} did not return any blocks via height ${options.fromBlockHeight}.`);
expect(logger.debug).toBeCalledWith(
`Peer ${peer.ip} did not return any blocks via height ${options.fromBlockHeight}.`,
);
});
});

describe("handleSocketError", () => {
it("should dispatch 'Disconnect' event after 3 sequential error", async () => {
configuration.getRequired = jest.fn().mockReturnValue(3);

const peer = new Peer("187.168.65.65", 4000);

// @ts-ignore
peerCommunicator.handleSocketError(peer, "dummy_event", new Error());
// @ts-ignore
peerCommunicator.handleSocketError(peer, "dummy_event", new Error());

expect(emitter.dispatch).toHaveBeenCalledTimes(0);

// @ts-ignore
peerCommunicator.handleSocketError(peer, "dummy_event", new Error());
expect(emitter.dispatch).toHaveBeenCalledTimes(1);
});
});
});
5 changes: 4 additions & 1 deletion __tests__/unit/core-p2p/service-provider.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import "jest-extended";

import { Application, Container, Providers, Services } from "@arkecosystem/core-kernel";
import { Peer } from "@arkecosystem/core-p2p/src/peer";
import { ServiceProvider } from "@arkecosystem/core-p2p/src/service-provider";
Expand All @@ -15,6 +17,7 @@ describe("ServiceProvider", () => {
[Container.Identifiers.PeerNetworkMonitor]: { initialize: jest.fn() },
[Container.Identifiers.PeerProcessor]: { initialize: jest.fn() },
[Container.Identifiers.PeerCommunicator]: { initialize: jest.fn() },
[Container.Identifiers.PeerEventListener]: { initialize: jest.fn() },
[serverSymbol]: mockServer,
[Container.Identifiers.TriggerService]: triggerService,
};
Expand Down Expand Up @@ -74,7 +77,7 @@ describe("ServiceProvider", () => {
Identifiers.PeerProcessor,
Identifiers.PeerNetworkMonitor,
Identifiers.PeerTransactionBroadcaster,
"p2p.event-listener",
Identifiers.PeerEventListener,
]) {
expect(spyBind).toBeCalledWith(identifier);
}
Expand Down
1 change: 1 addition & 0 deletions packages/core-kernel/src/contracts/p2p/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ export interface Peer {
state: PeerState;
plugins: PeerPlugins;
lastPinged: Dayjs | undefined;
sequentialErrorCounter: number;
verificationResult: PeerVerificationResult | undefined;

isVerified(): boolean;
Expand Down
1 change: 1 addition & 0 deletions packages/core-kernel/src/ioc/identifiers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ export const Identifiers = {
PeerProcessor: Symbol.for("Peer<Processor>"),
PeerStorage: Symbol.for("Peer<Storage>"),
PeerTransactionBroadcaster: Symbol.for("Peer<TransactionBroadcaster>"),
PeerEventListener: Symbol.for("Peer<EventListener>"),
// Transaction Pool
TransactionPoolService: Symbol.for("TransactionPool<Service>"),
TransactionPoolCleaner: Symbol.for("TransactionPool<Cleaner>"),
Expand Down
4 changes: 4 additions & 0 deletions packages/core-p2p/src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ export const defaults = {
* The maximum authorized number of peers sharing same ip /24 subnet
*/
maxSameSubnetPeers: process.env.CORE_P2P_MAX_PEERS_SAME_SUBNET || 5,
/**
* The maximum peer consecutive errors before peer is forget from peer store.
*/
maxPeerSequentialErrors: process.env.CORE_P2P_MAX_PEER_SEQUENTIAL_ERRORS || 3,
/**
* The list of IPs we allow to be added to the peer list.
*/
Expand Down
6 changes: 3 additions & 3 deletions packages/core-p2p/src/listeners.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ export class DisconnectPeer implements Contracts.Kernel.EventListener {
private readonly storage!: Contracts.P2P.PeerStorage;

/**
* @param {*} {data}
* @param {*} {peer}
* @returns {Promise<void>}
* @memberof DisconnectPeer
*/
public async handle({ data }): Promise<void> {
this.connector.disconnect(data);
this.connector.disconnect(data.peer);

this.storage.forgetPeer(data);
this.storage.forgetPeer(data.peer);
}
}
6 changes: 6 additions & 0 deletions packages/core-p2p/src/peer-communicator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator {

response = await this.connector.emit(peer, event, payload);

peer.sequentialErrorCounter = 0;
peer.latency = new Date().getTime() - timeBeforeSocketCall;
this.parseHeaders(peer, response.payload);

Expand Down Expand Up @@ -301,6 +302,7 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator {
}

this.connector.setError(peer, error.name);
peer.sequentialErrorCounter++;

switch (error.name) {
case SocketErrors.Validation:
Expand All @@ -311,6 +313,10 @@ export class PeerCommunicator implements Contracts.P2P.PeerCommunicator {
if (process.env.CORE_P2P_PEER_VERIFIER_DEBUG_EXTRA) {
this.logger.debug(`Response error (peer ${peer.ip}/${event}) : ${error.message}`);
}

if (peer.sequentialErrorCounter >= this.configuration.getRequired<number>("maxPeerSequentialErrors")) {
this.events.dispatch(Enums.PeerEvent.Disconnect, { peer });
}
break;
default:
/* istanbul ignore else */
Expand Down
6 changes: 6 additions & 0 deletions packages/core-p2p/src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ export class Peer implements Contracts.P2P.Peer {
*/
public lastPinged: Dayjs | undefined;

/**
* @type {(number)}
* @memberof Peer
*/
public sequentialErrorCounter: number = 0;

/**
* @type {(PeerVerificationResult | undefined)}
* @memberof Peer
Expand Down
4 changes: 3 additions & 1 deletion packages/core-p2p/src/service-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export class ServiceProvider extends Providers.ServiceProvider {
* @memberof ServiceProvider
*/
public async boot(): Promise<void> {
this.app.get<EventListener>(Container.Identifiers.PeerEventListener).initialize();

return this.app.get<Server>(this.serverSymbol).boot();
}

Expand Down Expand Up @@ -79,7 +81,7 @@ export class ServiceProvider extends Providers.ServiceProvider {

this.app.get<PeerProcessor>(Container.Identifiers.PeerProcessor).initialize();

this.app.bind("p2p.event-listener").to(EventListener).inSingletonScope();
this.app.bind(Container.Identifiers.PeerEventListener).to(EventListener).inSingletonScope();

this.app.bind(Container.Identifiers.PeerTransactionBroadcaster).to(TransactionBroadcaster);
}
Expand Down

0 comments on commit 5222b5c

Please sign in to comment.