Skip to content

Commit

Permalink
Merge pull request #759 from ExchangeUnion/fix/peer-stalling
Browse files Browse the repository at this point in the history
Fix peer stalling
  • Loading branch information
sangaman authored Dec 24, 2018
2 parents 54d0945 + 8d4d34f commit c774838
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 87 deletions.
16 changes: 8 additions & 8 deletions lib/orderbook/OrderBook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ class OrderBook extends EventEmitter {
this.bindSwaps();
}

private static createOutgoingOrder = (order: orders.OwnOrder): orders.OutgoingOrder => {
const { createdAt, localId, initialQuantity, hold, ...outgoingOrder } = order;
return outgoingOrder ;
}

private bindPool = () => {
if (this.pool) {
this.pool.on('packet.order', this.addPeerOrder);
Expand Down Expand Up @@ -552,8 +557,8 @@ class OrderBook extends EventEmitter {
// send only requested pairIds
if (pairIds.includes(tp.pairId)) {
const orders = tp.getOwnOrders();
orders.buy.forEach(order => outgoingOrders.push(this.createOutgoingOrder(order)));
orders.sell.forEach(order => outgoingOrders.push(this.createOutgoingOrder(order)));
orders.buy.forEach(order => outgoingOrders.push(OrderBook.createOutgoingOrder(order)));
orders.sell.forEach(order => outgoingOrders.push(OrderBook.createOutgoingOrder(order)));
}
});
peer.sendOrders(outgoingOrders, reqId);
Expand All @@ -565,7 +570,7 @@ class OrderBook extends EventEmitter {
private broadcastOrder = (order: orders.OwnOrder) => {
if (this.pool) {
if (this.swaps && this.swaps.isPairSupported(order.pairId)) {
const outgoingOrder = this.createOutgoingOrder(order);
const outgoingOrder = OrderBook.createOutgoingOrder(order);
this.pool.broadcastOrder(outgoingOrder);
}
}
Expand All @@ -583,11 +588,6 @@ class OrderBook extends EventEmitter {
return { ...order, id, initialQuantity: order.quantity, createdAt: ms() };
}

private createOutgoingOrder = (order: orders.OwnOrder): orders.OutgoingOrder => {
const { createdAt, localId, ...outgoingOrder } = order;
return outgoingOrder;
}

private handleOrderInvalidation = (oi: OrderPortion, peerPubKey: string) => {
try {
const removeResult = this.removePeerOrder(oi.id, oi.pairId, peerPubKey, oi.quantity);
Expand Down
39 changes: 17 additions & 22 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import { Packet, PacketDirection, PacketType } from './packets';
import { HandshakeState, Address, NodeConnectionInfo } from '../types/p2p';
import errors from './errors';
import addressUtils from '../utils/addressUtils';
import { DisconnectingPacketBody } from './packets/types/DisconnectingPacket';

/** Key info about a peer for display purposes */
type PeerInfo = {
Expand Down Expand Up @@ -45,7 +44,6 @@ interface Peer {
class Peer extends EventEmitter {
// TODO: properties documentation
public inbound!: boolean;
public connected = false;
public recvDisconnectionReason?: DisconnectionReason;
public sentDisconnectionReason?: DisconnectionReason;
public expectedNodePubKey?: string;
Expand Down Expand Up @@ -90,6 +88,10 @@ class Peer extends EventEmitter {
return this.handshakeState ? this.handshakeState.addresses : undefined;
}

public get connected(): boolean {
return this.socket !== undefined && !this.socket.destroyed;
}

public get info(): PeerInfo {
return {
address: addressUtils.toString(this.address),
Expand Down Expand Up @@ -119,7 +121,6 @@ class Peer extends EventEmitter {
const peer = new Peer(logger, addressUtils.fromSocket(socket));

peer.inbound = true;
peer.connected = true;
peer.socket = socket;

peer.bindSocket();
Expand Down Expand Up @@ -201,7 +202,6 @@ class Peer extends EventEmitter {
}

this.closed = true;
this.connected = false;

if (this.socket) {
if (reason !== undefined) {
Expand Down Expand Up @@ -247,11 +247,8 @@ class Peer extends EventEmitter {

public sendPacket = (packet: Packet): void => {
this.sendRaw(packet.toRaw());
if (this.nodePubKey !== undefined) {
this.logger.trace(`Sent packet to ${this.nodePubKey}: ${packet.body ? JSON.stringify(packet.body) : ''}`);
} else {
this.logger.trace(`Sent packet to ${addressUtils.toString(this.address)}: ${packet.body ? JSON.stringify(packet.body) : ''}`);
}
const recipient = this.nodePubKey !== undefined ? this.nodePubKey : addressUtils.toString(this.address);
this.logger.trace(`Sent ${PacketType[packet.type]} packet to ${recipient}: ${JSON.stringify(packet)}`);
this.packetCount += 1;

if (packet.direction === PacketDirection.Request) {
Expand Down Expand Up @@ -306,12 +303,14 @@ class Peer extends EventEmitter {
}
this.socket!.removeListener('error', onError);
this.socket!.removeListener('connect', onConnect);
this.retryConnectionTimer = undefined;
if (this.retryConnectionTimer) {
clearTimeout(this.retryConnectionTimer);
this.retryConnectionTimer = undefined;
}
};

const onConnect = () => {
this.connectTime = Date.now();
this.connected = true;

this.bindSocket();

Expand Down Expand Up @@ -391,7 +390,8 @@ class Peer extends EventEmitter {

for (const [packetId, entry] of this.responseMap) {
if (now > entry.timeout) {
this.emitError(`Peer (${this.nodePubKey}) is stalling (${packetId})`);
this.emitError(`Peer is stalling (${packetId})`);
entry.reject('response timed out');
this.close(DisconnectionReason.ResponseStalling, packetId);
return;
}
Expand Down Expand Up @@ -472,16 +472,7 @@ class Peer extends EventEmitter {
this.close();
});

this.socket!.on('data', (data) => {
this.lastRecv = Date.now();
const dataStr = data.toString();
if (this.nodePubKey !== undefined) {
this.logger.trace(`Received data from ${this.nodePubKey}: ${dataStr}`);
} else {
this.logger.trace(`Received data from ${addressUtils.toString(this.address)}: ${data.toString()}`);
}
this.parser.feed(data);
});
this.socket!.on('data', this.parser.feed);

this.socket!.setNoDelay(true);
}
Expand Down Expand Up @@ -530,6 +521,10 @@ class Peer extends EventEmitter {
}

private handlePacket = (packet: Packet): void => {
this.lastRecv = Date.now();
const sender = this.nodePubKey !== undefined ? this.nodePubKey : addressUtils.toString(this.address);
this.logger.trace(`Received ${PacketType[packet.type]} packet from ${sender}${JSON.stringify(packet)}`);

if (this.isPacketSolicited(packet)) {
switch (packet.type) {
case PacketType.Hello: {
Expand Down
87 changes: 49 additions & 38 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ class Pool extends EventEmitter {
}
}

public closePeer = async (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string): Promise<void> => {
public closePeer = (nodePubKey: string, reason?: DisconnectionReason, reasonPayload?: string) => {
const peer = this.peers.get(nodePubKey);
if (peer) {
peer.close(reason, reasonPayload);
Expand Down Expand Up @@ -570,12 +570,21 @@ class Pool extends EventEmitter {
return;
}

// check to make sure the socket was not destroyed during or immediately after the handshake
if (!peer.connected) {
this.logger.error(`the socket to node ${peer.nodePubKey} was disconnected`);
return;
}

this.logger.verbose(`opened connection to ${peer.nodePubKey} at ${addressUtils.toString(peer.address)}`);
this.peers.set(peer.nodePubKey, peer);
peer.active = true;

// request peer's orders
peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs }));
if (this.handshakeData.pairs.length > 0) {
// request peer's orders
peer.sendPacket(new packets.GetOrdersPacket({ pairIds: this.handshakeData.pairs }));
}

if (this.config.discover) {
// request peer's known nodes only if p2p.discover option is true
peer.sendPacket(new packets.GetNodesPacket());
Expand Down Expand Up @@ -648,41 +657,7 @@ class Pool extends EventEmitter {
this.pendingOutboundPeers.delete(peer.nodePubKey!);
});

peer.once('close', async () => {
if (!peer.nodePubKey && peer.expectedNodePubKey) {
this.pendingOutboundPeers.delete(peer.expectedNodePubKey);
}

if (!peer.active) {
return;
}

if (peer.nodePubKey) {
this.pendingOutboundPeers.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer);

// if handshake passed and peer disconnected from us for stalling or without specifying any reason -
// reconnect, for that might have been due to a temporary loss in connectivity
const unintentionalDisconnect =
(peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) &&
(peer.recvDisconnectionReason === undefined || peer.recvDisconnectionReason === DisconnectionReason.ResponseStalling);
const addresses = peer.addresses || [];

let lastAddress;
if (peer.inbound) {
lastAddress = addresses.length > 0 ? addresses[0] : undefined;
} else {
lastAddress = peer.address;
}

if (peer.nodePubKey && unintentionalDisconnect && (addresses.length || lastAddress)) {
this.logger.debug(`attempting to reconnect to a disconnected peer ${peer.nodePubKey}`);
const node = { lastAddress, addresses, nodePubKey: peer.nodePubKey };
await this.tryConnectNode(node, true);
}
});
peer.once('close', () => this.handlePeerClose(peer));

peer.once('reputation', async (event) => {
this.logger.debug(`Peer (${peer.nodePubKey || addressUtils.toString(peer.address)}), received reputation event: ${ReputationEvent[event]}`);
Expand All @@ -692,6 +667,42 @@ class Pool extends EventEmitter {
});
}

private handlePeerClose = async (peer: Peer) => {
if (!peer.nodePubKey && peer.expectedNodePubKey) {
this.pendingOutboundPeers.delete(peer.expectedNodePubKey);
}

if (!peer.active) {
return;
}

if (peer.nodePubKey) {
this.pendingOutboundPeers.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer);

// if handshake passed and peer disconnected from us for stalling or without specifying any reason -
// reconnect, for that might have been due to a temporary loss in connectivity
const unintentionalDisconnect =
(peer.sentDisconnectionReason === undefined || peer.sentDisconnectionReason === DisconnectionReason.ResponseStalling) &&
(peer.recvDisconnectionReason === undefined || peer.recvDisconnectionReason === DisconnectionReason.ResponseStalling);
const addresses = peer.addresses || [];

let lastAddress;
if (peer.inbound) {
lastAddress = addresses.length > 0 ? addresses[0] : undefined;
} else {
lastAddress = peer.address;
}

if (peer.nodePubKey && unintentionalDisconnect && (addresses.length || lastAddress)) {
this.logger.debug(`attempting to reconnect to a disconnected peer ${peer.nodePubKey}`);
const node = { lastAddress, addresses, nodePubKey: peer.nodePubKey };
await this.tryConnectNode(node, true);
}
}

private closePeers = () => {
this.peers.forEach(peer => peer.close(DisconnectionReason.Shutdown));
}
Expand Down
9 changes: 5 additions & 4 deletions lib/p2p/packets/Packet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,17 @@ abstract class Packet<T = any> implements PacketInterface {

if (bodyOrPacket) {
this.body = bodyOrPacket;
this.header.hash = this.hash(bodyOrPacket);
this.header.hash = Packet.hash(bodyOrPacket);
}
}
}
public abstract serialize(): Uint8Array;

private hash(value: any): string {
private static hash(value: any): string {
return MD5(stringify(value)).toString(CryptoJS.enc.Base64);
}

public abstract serialize(): Uint8Array;

/**
* Verify the header hash against the packet body.
*/
Expand All @@ -104,7 +105,7 @@ abstract class Packet<T = any> implements PacketInterface {
return false;
}

return this.header.hash === this.hash(this.body);
return this.header.hash === Packet.hash(this.body);
}

/**
Expand Down
29 changes: 17 additions & 12 deletions test/integration/Pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import NodeKey from '../../lib/nodekey/NodeKey';
import Peer from '../../lib/p2p/Peer';
import { Address } from '../../lib/types/p2p';
import { DisconnectionReason } from '../../lib/types/enums';
import sinon from 'sinon';

chai.use(chaiAsPromised);

Expand All @@ -16,15 +17,23 @@ describe('P2P Pool Tests', async () => {
let pool: Pool;
let nodePubKeyOne: string;
const loggers = Logger.createLoggers(Level.Warn);
const sandbox = sinon.createSandbox();

const createPeer = (nodePubKey: string, addresses: Address[]) => {
const peer = new Peer(loggers.p2p, addresses[0]);
peer['handshakeState'] = {
const peer = sandbox.createStubInstance(Peer) as any;
peer.address = addresses[0];
peer.handshakeState = {
addresses,
nodePubKey,
version: 'test',
pairs: [],
};
peer.socket = {};
peer.sendPacket = () => {};
peer.close = () => {
peer.sentDisconnectionReason = DisconnectionReason.NotAcceptingConnections;
pool['handlePeerClose'](peer);
};
pool['bindPeer'](peer);

return peer;
Expand All @@ -35,6 +44,7 @@ describe('P2P Pool Tests', async () => {

const config = new Config();
config.p2p.listen = false;
config.p2p.discover = false;
db = new DB(loggers.db);
await db.init();

Expand Down Expand Up @@ -71,19 +81,17 @@ describe('P2P Pool Tests', async () => {
expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host);
});

it('should close a peer', async () => {
const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
expect(closePromise).to.be.fulfilled;
await closePromise;
it('should close a peer', () => {
pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
expect(pool['peers'].has(nodePubKeyOne)).to.be.false;
expect(pool['peers'].size).to.equal(0);
});

it('should update a node on new handshake', async () => {
/* const addresses = [{ host: '86.75.30.9', port: 8885 }];
const addresses = [{ host: '86.75.30.9', port: 8885 }];
const peer = createPeer(nodePubKeyOne, addresses);

await pool['handleOpen'](peer);
pool['handleOpen'](peer);

const nodeInstance = await db.models.Node.find({
where: {
Expand All @@ -94,10 +102,7 @@ describe('P2P Pool Tests', async () => {
expect(nodeInstance!.addresses!).to.have.length(1);
expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host);

const closePromise = pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
expect(closePromise).to.be.fulfilled;
await closePromise;
*/
pool.closePeer(nodePubKeyOne, DisconnectionReason.NotAcceptingConnections);
});

after(async () => {
Expand Down
6 changes: 3 additions & 3 deletions test/p2p/sanity.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ describe('P2P Sanity Tests', () => {
.to.be.rejectedWith('already connected');
});

it('should disconnect successfully', async () => {
await expect(nodeOne['pool']['closePeer'](nodeTwo.nodePubKey, DisconnectionReason.NotAcceptingConnections)).to.be.fulfilled;
it('should disconnect successfully', () => {
nodeOne['pool']['closePeer'](nodeTwo.nodePubKey, DisconnectionReason.NotAcceptingConnections);

const listPeersResult = await nodeOne.service.listPeers();
const listPeersResult = nodeOne.service.listPeers();
expect(listPeersResult).to.be.empty;
});

Expand Down

0 comments on commit c774838

Please sign in to comment.