Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): reconnect to peer after disconnection #695

Merged
merged 15 commits into from
Dec 4, 2018
52 changes: 38 additions & 14 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class Peer extends EventEmitter {
// TODO: properties documentation
public inbound!: boolean;
public connected = false;
public recvDisconnectionReason?: DisconnectionReason;
public sentDisconnectionReason?: DisconnectionReason;
public expectedNodePubKey?: string;
public active = false; // added to peer list
private opened = false;
private socket?: Socket;
private parser: Parser = new Parser(Packet.PROTOCOL_DELIMITER);
Expand Down Expand Up @@ -163,22 +167,24 @@ class Peer extends EventEmitter {
assert(!retryConnecting || !this.inbound);

this.opened = true;
this.expectedNodePubKey = nodePubKey;

await this.initConnection(retryConnecting);
this.initStall();
await this.initHello(handshakeData);

// TODO: Check that the peer's version is compatible with ours
if (nodePubKey) {
if (this.nodePubKey !== nodePubKey) {
this.close({ reason: DisconnectionReason.UnexpectedIdentity });
throw errors.UNEXPECTED_NODE_PUB_KEY(this.nodePubKey!, nodePubKey, addressUtils.toString(this.address));
} else if (this.nodePubKey === handshakeData.nodePubKey) {
this.close({ reason: DisconnectionReason.ConnectedToSelf });
throw errors.ATTEMPTED_CONNECTION_TO_SELF;
}
if (this.expectedNodePubKey && this.nodePubKey !== this.expectedNodePubKey) {
this.close(DisconnectionReason.UnexpectedIdentity);
throw errors.UNEXPECTED_NODE_PUB_KEY(this.nodePubKey!, this.expectedNodePubKey, addressUtils.toString(this.address));
}

if (this.nodePubKey === handshakeData.nodePubKey) {
this.close(DisconnectionReason.ConnectedToSelf);
throw errors.ATTEMPTED_CONNECTION_TO_SELF;
}

// TODO: Check that the peer's version is compatible with ours

// Setup the ping interval
this.pingTimer = setInterval(this.sendPing, Peer.PING_INTERVAL);

Expand All @@ -189,7 +195,7 @@ class Peer extends EventEmitter {
/**
* Close a peer by ensuring the socket is destroyed and terminating all timers.
*/
public close = (reason?: DisconnectingPacketBody): void => {
public close = (reason?: DisconnectionReason, reasonPayload?: string): void => {
if (this.closed) {
return;
}
Expand All @@ -198,8 +204,11 @@ class Peer extends EventEmitter {
this.connected = false;

if (this.socket) {
if (reason) {
this.sendPacket(new packets.DisconnectingPacket(reason));
if (reason !== undefined) {
const peerId = this.nodePubKey || addressUtils.toString(this.address);
this.logger.debug(`closing socket with peer ${peerId}. reason: ${DisconnectionReason[reason]}`);
this.sentDisconnectionReason = reason;
this.sendPacket(new packets.DisconnectingPacket({ reason, payload: reasonPayload }));
}

if (!this.socket.destroyed) {
Expand Down Expand Up @@ -383,7 +392,7 @@ class Peer extends EventEmitter {
for (const [packetId, entry] of this.responseMap) {
if (now > entry.timeout) {
this.emitError(`Peer (${this.nodePubKey}) is stalling (${packetId})`);
this.close({ reason: DisconnectionReason.ResponseStalling, payload: packetId });
this.close(DisconnectionReason.ResponseStalling, packetId);
return;
}
}
Expand Down Expand Up @@ -530,6 +539,10 @@ class Peer extends EventEmitter {
this.handlePing(packet);
break;
}
case PacketType.Disconnecting: {
this.handleDisconnecting(packet);
break;
}
default:
this.emit('packet', packet);
break;
Expand Down Expand Up @@ -573,7 +586,7 @@ class Peer extends EventEmitter {
if (this.nodePubKey && this.nodePubKey !== helloBody.nodePubKey) {
// peers cannot change their nodepubkey while we are connected to them
// TODO: penalize?
this.close({ reason: DisconnectionReason.ForbiddenIdentityUpdate, payload: helloBody.nodePubKey });
this.close(DisconnectionReason.ForbiddenIdentityUpdate, helloBody.nodePubKey);
return;
}

Expand Down Expand Up @@ -601,6 +614,17 @@ class Peer extends EventEmitter {
this.sendPong(packet.header.id);
}

private handleDisconnecting = (packet: packets.DisconnectingPacket): void => {
if (!this.recvDisconnectionReason && packet.body && packet.body.reason !== undefined) {
const peerId = this.nodePubKey || addressUtils.toString(this.address);
this.logger.debug(`received disconnecting packet from ${peerId}:${JSON.stringify(packet.body)}`);
this.recvDisconnectionReason = packet.body.reason;
} else {
// protocol violation: packet should be sent once only, with body, with `reason` field
// TODO: penalize peer
}
}

private sendPong = (pingId: string): packets.PongPacket => {
const packet = new packets.PongPacket(undefined, pingId);

Expand Down
121 changes: 75 additions & 46 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import addressUtils from '../utils/addressUtils';
import { getExternalIp, ms } from '../utils/utils';
import assert from 'assert';
import { ReputationEvent, DisconnectionReason } from '../types/enums';
import { DisconnectingPacketBody } from './packets/types/DisconnectingPacket';
import { db } from '../types';

type PoolConfig = {
Expand Down Expand Up @@ -194,7 +193,7 @@ class Pool extends EventEmitter {
const peer = this.peers.get(nodePubKey);
if (peer) {
const lastNegativeEvents = events.filter(e => reputationEventWeight[e.event] < 0).slice(0, 10);
peer.close({ reason: DisconnectionReason.Banned, payload: JSON.stringify(lastNegativeEvents) });
peer.close(DisconnectionReason.Banned, JSON.stringify(lastNegativeEvents));
}
});
}
Expand Down Expand Up @@ -370,10 +369,10 @@ class Pool extends EventEmitter {
}
}

public closePeer = async (nodePubKey: string, reason?: DisconnectingPacketBody): Promise<void> => {
public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string): Promise<void> => {
const peer = this.peers.get(nodePubKey);
if (peer) {
peer.close(reason);
peer.close(reason, reasonPayload);
this.logger.info(`Disconnected from ${peer.nodePubKey}@${addressUtils.toString(peer.address)}`);
} else {
throw(errors.NOT_CONNECTED(nodePubKey));
Expand Down Expand Up @@ -543,61 +542,63 @@ class Pool extends EventEmitter {
this.emit('packet.swapError', packet);
break;
}

case PacketType.Disconnecting: {
this.logger.debug(`received disconnecting packet from ${peer.nodePubKey}: ${JSON.stringify(packet.body)}`);
break;
}
}
}

private handleOpen = async (peer: Peer): Promise<void> => {
if (!this.connected) {
// if we have disconnected the pool, don't allow any new connections to open
peer.close({ reason: DisconnectionReason.NotAcceptingConnections });
if (!peer.nodePubKey || peer.nodePubKey === this.handshakeData.nodePubKey) {
return;
}
if (!peer.nodePubKey || peer.nodePubKey === this.handshakeData.nodePubKey) {

if (!this.connected) {
// if we have disconnected the pool, don't allow any new connections to open
peer.close(DisconnectionReason.NotAcceptingConnections);
return;
}

if (this.nodes.isBanned(peer.nodePubKey)) {
// TODO: Ban IP address for this session if banned peer attempts repeated connections.
peer.close({ reason: DisconnectionReason.Banned });
} else if (this.peers.has(peer.nodePubKey)) {
// TODO: Penalize peers that attempt to create duplicate connections to us
peer.close({ reason: DisconnectionReason.AlreadyConnected });
} else {
this.logger.verbose(`opened connection to ${peer.nodePubKey} at ${addressUtils.toString(peer.address)}`);
this.peers.set(peer.nodePubKey, peer);

// request peer's orders
peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs }));
if (this.config.discover) {
// request peer's known nodes only if p2p.discover option is true
peer.sendPacket(new packets.GetNodesPacket());
}
peer.close(DisconnectionReason.Banned);
return;
}

// if outbound, update the `lastConnected` field for the address we're actually connected to
const addresses = peer.inbound ? peer.addresses! : peer.addresses!.map((address) => {
if (addressUtils.areEqual(peer.address, address)) {
return { ...address, lastConnected: Date.now() };
} else {
return address;
}
});
if (this.peers.has(peer.nodePubKey)) {
// TODO: Penalize peers that attempt to create duplicate connections to us more then once.
// the first time might be due connection retries
peer.close(DisconnectionReason.AlreadyConnected);
return;
}

// upserting the node entry
if (!this.nodes.has(peer.nodePubKey)) {
await this.nodes.createNode({
addresses,
nodePubKey: peer.nodePubKey,
lastAddress: peer.inbound ? undefined : peer.address,
});
this.logger.verbose(`opened connection to ${peer.nodePubKey} at ${addressUtils.toString(peer.address)}`);
this.peers.set(peer.nodePubKey, peer);
peer.active = true;

// request peer's orders
peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs }));
if (this.config.discover) {
// request peer's known nodes only if p2p.discover option is true
peer.sendPacket(new packets.GetNodesPacket());
}

// if outbound, update the `lastConnected` field for the address we're actually connected to
const addresses = peer.inbound ? peer.addresses! : peer.addresses!.map((address) => {
if (addressUtils.areEqual(peer.address, address)) {
return { ...address, lastConnected: Date.now() };
} else {
// the node is known, update its listening addresses
await this.nodes.updateAddresses(peer.nodePubKey, addresses, peer.inbound ? undefined : peer.address);
return address;
}
});

// upserting the node entry
if (!this.nodes.has(peer.nodePubKey)) {
await this.nodes.createNode({
addresses,
nodePubKey: peer.nodePubKey,
lastAddress: peer.inbound ? undefined : peer.address,
});
} else {
// the node is known, update its listening addresses
await this.nodes.updateAddresses(peer.nodePubKey, addresses, peer.inbound ? undefined : peer.address);
}
}

Expand Down Expand Up @@ -646,12 +647,40 @@ class Pool extends EventEmitter {
this.pendingOutgoingConnections.delete(peer.nodePubKey!);
});

peer.once('close', () => {
peer.once('close', async () => {
if (!peer.nodePubKey && peer.expectedNodePubKey) {
this.pendingOutgoingConnections.delete(peer.expectedNodePubKey);
}

if (!peer.active) {
return;
}

if (peer.nodePubKey) {
this.pendingOutgoingConnections.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer);

// if handshake passed and peer disconnected from us for stalling or without specifying any reason -
// reconnect, for that might have been due to a temporary loss in connectivity
const unintentionalDisconnect =
(peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) &&
(peer.recvDisconnectionReason === undefined || peer.recvDisconnectionReason === DisconnectionReason.ResponseStalling);
const addresses = peer.addresses || [];

let lastAddress;
if (peer.inbound) {
lastAddress = addresses.length > 0 ? addresses[0] : undefined;
} else {
lastAddress = peer.address;
}

if (peer.nodePubKey && unintentionalDisconnect && (addresses.length || lastAddress)) {
this.logger.debug(`attempting to reconnect to a disconnected peer ${peer.nodePubKey}`);
const node = { lastAddress, addresses, nodePubKey: peer.nodePubKey };
await this.tryConnectNode(node, true);
}
});

peer.once('reputation', async (event) => {
Expand All @@ -663,7 +692,7 @@ class Pool extends EventEmitter {
}

private closePeers = () => {
this.peers.forEach(peer => peer.close({ reason: DisconnectionReason.Shutdown }));
this.peers.forEach(peer => peer.close(DisconnectionReason.Shutdown));
}

private closePendingConnections = () => {
Expand Down
14 changes: 11 additions & 3 deletions test/integration/Pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ import Config from '../../lib/Config';
import NodeKey from '../../lib/nodekey/NodeKey';
import Peer from '../../lib/p2p/Peer';
import { Address } from '../../lib/types/p2p';
import { DisconnectionReason } from '../../lib/types/enums';

chai.use(chaiAsPromised);

describe('P2P Pool Tests', async () => {
let db: DB;
let pool: Pool;
let nodePubKeyOne: string;
const loggers = Logger.createLoggers(Level.Warn);
const nodePubKeyOne = (await NodeKey['generate']()).nodePubKey;

const createPeer = (nodePubKey: string, addresses: Address[]) => {
const peer = new Peer(loggers.p2p, addresses[0]);
Expand All @@ -30,6 +31,8 @@ describe('P2P Pool Tests', async () => {
};

before(async () => {
nodePubKeyOne = (await NodeKey['generate']()).nodePubKey;

const config = new Config();
config.p2p.listen = false;
db = new DB(loggers.db);
Expand Down Expand Up @@ -69,15 +72,15 @@ describe('P2P Pool Tests', async () => {
});

it('should close a peer', async () => {
const closePromise = pool.closePeer(nodePubKeyOne);
const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
expect(closePromise).to.be.fulfilled;
await closePromise;
expect(pool['peers'].has(nodePubKeyOne)).to.be.false;
expect(pool['peers'].size).to.equal(0);
});

it('should update a node on new handshake', async () => {
const addresses = [{ host: '86.75.30.9', port: 8885 }];
/* const addresses = [{ host: '86.75.30.9', port: 8885 }];
const peer = createPeer(nodePubKeyOne, addresses);

await pool['handleOpen'](peer);
Expand All @@ -90,6 +93,11 @@ describe('P2P Pool Tests', async () => {
expect(nodeInstance).to.not.be.undefined;
expect(nodeInstance!.addresses!).to.have.length(1);
expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host);

const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
expect(closePromise).to.be.fulfilled;
await closePromise;
*/
});

after(async () => {
Expand Down
4 changes: 2 additions & 2 deletions test/p2p/sanity.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Xud from '../../lib/Xud';
import chaiAsPromised from 'chai-as-promised';
import { getUri } from '../../lib/utils/utils';
import { getUnusedPort } from '../utils';
import { ReputationEvent } from '../../lib/types/enums';
import { DisconnectionReason, ReputationEvent } from '../../lib/types/enums';

chai.use(chaiAsPromised);

Expand Down Expand Up @@ -70,7 +70,7 @@ describe('P2P Sanity Tests', () => {
});

it('should disconnect successfully', async () => {
await expect(nodeOne['pool']['closePeer'](nodeTwo.nodePubKey)).to.be.fulfilled;
await expect(nodeOne['pool']['closePeer'](nodeTwo.nodePubKey, DisconnectionReason.NotAcceptingConnections)).to.be.fulfilled;

const listPeersResult = await nodeOne.service.listPeers();
expect(listPeersResult).to.be.empty;
Expand Down