Skip to content

Commit

Permalink
feat(p2p/orderbook): drop orders for dropped pairs
Browse files Browse the repository at this point in the history
This adds logic to drop a peer's orders for a given trading pair when
we receive an updated handshake state that does not include a trading
pair that was supported according to the previous handshake. Without
this, we may keep orders in our order book for a peer that no longer
supports (and can swap) those orders.

Closes #599.
  • Loading branch information
sangaman committed Dec 24, 2018
1 parent c774838 commit f9a40c2
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 22 deletions.
24 changes: 15 additions & 9 deletions lib/orderbook/OrderBook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class OrderBook extends EventEmitter {
this.pool.on('packet.getOrders', this.sendOrders);
this.pool.on('packet.swapRequest', this.handleSwapRequest);
this.pool.on('peer.close', this.removePeerOrders);
this.pool.on('peer.pairDropped', this.removePeerPair);
}
}

Expand Down Expand Up @@ -531,19 +532,24 @@ class OrderBook extends EventEmitter {
return tp.removePeerOrder(orderId, peerPubKey, quantityToRemove);
}

private removePeerOrders = async (peer: Peer): Promise<void> => {
if (!peer.nodePubKey) {
private removePeerOrders = (peerPubKey?: string) => {
if (!peerPubKey) {
return;
}

this.tradingPairs.forEach((tp) => {
const orders = tp.removePeerOrders(peer.nodePubKey!);
orders.forEach((order) => {
this.emit('peerOrder.invalidation', order);
});
});
for (const pairId of this.pairIds.values()) {
this.removePeerPair(peerPubKey, pairId);
}

this.logger.debug(`removed all orders for peer ${peer.nodePubKey}`);
this.logger.debug(`removed all orders for peer ${peerPubKey}`);
}

private removePeerPair = (peerPubKey: string, pairId: string) => {
const tp = this.getTradingPair(pairId);
const orders = tp.removePeerOrders(peerPubKey);
orders.forEach((order) => {
this.emit('peerOrder.invalidation', order);
});
}

/**
Expand Down
28 changes: 19 additions & 9 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type PeerInfo = {
interface Peer {
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'handshake', listener: () => void): this;
on(event: 'pairDropped', listener: (pair: string) => void): this;
once(event: 'open', listener: () => void): this;
once(event: 'close', listener: () => void): this;
once(event: 'reputation', listener: (event: ReputationEvent) => void): this;
Expand All @@ -37,7 +37,7 @@ interface Peer {
emit(event: 'close'): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: 'packet', packet: Packet): boolean;
emit(event: 'handshake'): boolean;
emit(event: 'pairDropped', pair: string): boolean;
}

/** Represents a remote XU peer */
Expand Down Expand Up @@ -88,6 +88,10 @@ class Peer extends EventEmitter {
return this.handshakeState ? this.handshakeState.addresses : undefined;
}

public get pairs(): string[] | undefined {
return this.handshakeState ? this.handshakeState.pairs : undefined;
}

public get connected(): boolean {
return this.socket !== undefined && !this.socket.destroyed;
}
Expand Down Expand Up @@ -577,25 +581,31 @@ class Peer extends EventEmitter {
}

private handleHello = (packet: packets.HelloPacket): void => {
const helloBody = packet.body!;
this.logger.verbose(`received hello packet from ${this.nodePubKey || addressUtils.toString(this.address)}: ${JSON.stringify(helloBody)}`);
if (this.nodePubKey && this.nodePubKey !== helloBody.nodePubKey) {
const newHandshakeState = packet.body!;
this.logger.verbose(`received hello packet from ${this.nodePubKey || addressUtils.toString(this.address)}: ${JSON.stringify(newHandshakeState)}`);
if (this.nodePubKey && this.nodePubKey !== newHandshakeState.nodePubKey) {
// peers cannot change their nodepubkey while we are connected to them
// TODO: penalize?
this.close(DisconnectionReason.ForbiddenIdentityUpdate, helloBody.nodePubKey);
this.close(DisconnectionReason.ForbiddenIdentityUpdate, newHandshakeState.nodePubKey);
return;
}

this.handshakeState = packet.body;

const entry = this.responseMap.get(PacketType.Hello.toString());

if (entry) {
this.responseMap.delete(PacketType.Hello.toString());
entry.resolve(packet);
}

this.emit('handshake');
const prevHandshakeState = this.handshakeState!;
this.handshakeState = newHandshakeState;

prevHandshakeState.pairs.forEach((pairId) => {
if (!newHandshakeState.pairs.includes(pairId)) {
// a trading pair was in the old handshake state but not in the updated one
this.emit('pairDropped', pairId);
}
});
}

private sendPing = (): packets.PingPacket => {
Expand Down
15 changes: 12 additions & 3 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,19 @@ interface Pool {
on(event: 'packet.order', listener: (order: IncomingOrder) => void): this;
on(event: 'packet.getOrders', listener: (peer: Peer, reqId: string, pairIds: string[]) => void): this;
on(event: 'packet.orderInvalidation', listener: (orderInvalidation: OrderPortion, peer: string) => void): this;
on(event: 'peer.close', listener: (peer: Peer) => void): this;
on(event: 'peer.close', listener: (peerPubKey?: string) => void): this;
/** Adds a listener to be called when a peer drops support for a trading pair. */
on(event: 'peer.pairDropped', listener: (peerPubKey: string, pairId: string) => void): this;
on(event: 'packet.swapRequest', listener: (packet: packets.SwapRequestPacket, peer: Peer) => void): this;
on(event: 'packet.swapAccepted', listener: (packet: packets.SwapAcceptedPacket, peer: Peer) => void): this;
on(event: 'packet.swapComplete', listener: (packet: packets.SwapCompletePacket) => void): this;
on(event: 'packet.swapFailed', listener: (packet: packets.SwapFailedPacket) => void): this;
emit(event: 'packet.order', order: IncomingOrder): boolean;
emit(event: 'packet.getOrders', peer: Peer, reqId: string, pairIds: string[]): boolean;
emit(event: 'packet.orderInvalidation', orderInvalidation: OrderPortion, peer: string): boolean;
emit(event: 'peer.close', peer: Peer): boolean;
emit(event: 'peer.close', peerPubKey?: string): boolean;
/** Notifies listeners that a peer has dropped support for a trading pair. */
emit(event: 'peer.pairDropped', peerPubKey: string, pairId: string): boolean;
emit(event: 'packet.swapRequest', packet: packets.SwapRequestPacket, peer: Peer): boolean;
emit(event: 'packet.swapAccepted', packet: packets.SwapAcceptedPacket, peer: Peer): boolean;
emit(event: 'packet.swapComplete', packet: packets.SwapCompletePacket): boolean;
Expand Down Expand Up @@ -644,6 +648,11 @@ class Pool extends EventEmitter {
await this.handlePacket(peer, packet);
});

peer.on('pairDropped', (pairId) => {
// drop all orders for trading pairs that are no longer supported
this.emit('peer.pairDropped', peer.nodePubKey!, pairId);
});

peer.on('error', (err) => {
// The only situation in which the node should be connected to itself is the
// reachability check of the advertised addresses and we don't have to log that
Expand Down Expand Up @@ -680,7 +689,7 @@ class Pool extends EventEmitter {
this.pendingOutboundPeers.delete(peer.nodePubKey);
this.peers.delete(peer.nodePubKey);
}
this.emit('peer.close', peer);
this.emit('peer.close', peer.nodePubKey);

// if handshake passed and peer disconnected from us for stalling or without specifying any reason -
// reconnect, for that might have been due to a temporary loss in connectivity
Expand Down
3 changes: 2 additions & 1 deletion test/integration/Pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import NodeKey from '../../lib/nodekey/NodeKey';
import Peer from '../../lib/p2p/Peer';
import { Address } from '../../lib/types/p2p';
import { DisconnectionReason } from '../../lib/types/enums';
import { HelloPacket } from '../../lib/p2p/packets/types';
import sinon from 'sinon';

chai.use(chaiAsPromised);
Expand All @@ -26,7 +27,7 @@ describe('P2P Pool Tests', async () => {
addresses,
nodePubKey,
version: 'test',
pairs: [],
pairs: ['LTC/BTC'],
};
peer.socket = {};
peer.sendPacket = () => {};
Expand Down

0 comments on commit f9a40c2

Please sign in to comment.