From f9a40c256ffd81e275f1962543ae830b0bac7321 Mon Sep 17 00:00:00 2001 From: Daniel McNally Date: Mon, 24 Dec 2018 10:25:23 -0500 Subject: [PATCH] feat(p2p/orderbook): drop orders for dropped pairs This adds logic to drop a peer's orders for a given trading pair when we receive an updated handshake state that does not include a trading pair that was supported according to the previous handshake. Without this, we may keep orders in our order book for a peer that no longer supports (and can swap) those orders. Closes #599. --- lib/orderbook/OrderBook.ts | 24 +++++++++++++++--------- lib/p2p/Peer.ts | 28 +++++++++++++++++++--------- lib/p2p/Pool.ts | 15 ++++++++++++--- test/integration/Pool.spec.ts | 3 ++- 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/lib/orderbook/OrderBook.ts b/lib/orderbook/OrderBook.ts index c90911b2c..c59c87633 100644 --- a/lib/orderbook/OrderBook.ts +++ b/lib/orderbook/OrderBook.ts @@ -89,6 +89,7 @@ class OrderBook extends EventEmitter { this.pool.on('packet.getOrders', this.sendOrders); this.pool.on('packet.swapRequest', this.handleSwapRequest); this.pool.on('peer.close', this.removePeerOrders); + this.pool.on('peer.pairDropped', this.removePeerPair); } } @@ -531,19 +532,24 @@ class OrderBook extends EventEmitter { return tp.removePeerOrder(orderId, peerPubKey, quantityToRemove); } - private removePeerOrders = async (peer: Peer): Promise => { - if (!peer.nodePubKey) { + private removePeerOrders = (peerPubKey?: string) => { + if (!peerPubKey) { return; } - this.tradingPairs.forEach((tp) => { - const orders = tp.removePeerOrders(peer.nodePubKey!); - orders.forEach((order) => { - this.emit('peerOrder.invalidation', order); - }); - }); + for (const pairId of this.pairIds.values()) { + this.removePeerPair(peerPubKey, pairId); + } - this.logger.debug(`removed all orders for peer ${peer.nodePubKey}`); + this.logger.debug(`removed all orders for peer ${peerPubKey}`); + } + + private removePeerPair = (peerPubKey: string, pairId: string) => { + const tp = this.getTradingPair(pairId); + const orders = tp.removePeerOrders(peerPubKey); + orders.forEach((order) => { + this.emit('peerOrder.invalidation', order); + }); } /** diff --git a/lib/p2p/Peer.ts b/lib/p2p/Peer.ts index a5e4172ef..71c2bc69b 100644 --- a/lib/p2p/Peer.ts +++ b/lib/p2p/Peer.ts @@ -27,7 +27,7 @@ type PeerInfo = { interface Peer { on(event: 'packet', listener: (packet: Packet) => void): this; on(event: 'error', listener: (err: Error) => void): this; - on(event: 'handshake', listener: () => void): this; + on(event: 'pairDropped', listener: (pair: string) => void): this; once(event: 'open', listener: () => void): this; once(event: 'close', listener: () => void): this; once(event: 'reputation', listener: (event: ReputationEvent) => void): this; @@ -37,7 +37,7 @@ interface Peer { emit(event: 'close'): boolean; emit(event: 'error', err: Error): boolean; emit(event: 'packet', packet: Packet): boolean; - emit(event: 'handshake'): boolean; + emit(event: 'pairDropped', pair: string): boolean; } /** Represents a remote XU peer */ @@ -88,6 +88,10 @@ class Peer extends EventEmitter { return this.handshakeState ? this.handshakeState.addresses : undefined; } + public get pairs(): string[] | undefined { + return this.handshakeState ? this.handshakeState.pairs : undefined; + } + public get connected(): boolean { return this.socket !== undefined && !this.socket.destroyed; } @@ -577,17 +581,15 @@ class Peer extends EventEmitter { } private handleHello = (packet: packets.HelloPacket): void => { - const helloBody = packet.body!; - this.logger.verbose(`received hello packet from ${this.nodePubKey || addressUtils.toString(this.address)}: ${JSON.stringify(helloBody)}`); - if (this.nodePubKey && this.nodePubKey !== helloBody.nodePubKey) { + const newHandshakeState = packet.body!; + this.logger.verbose(`received hello packet from ${this.nodePubKey || addressUtils.toString(this.address)}: ${JSON.stringify(newHandshakeState)}`); + if (this.nodePubKey && this.nodePubKey !== newHandshakeState.nodePubKey) { // peers cannot change their nodepubkey while we are connected to them // TODO: penalize? - this.close(DisconnectionReason.ForbiddenIdentityUpdate, helloBody.nodePubKey); + this.close(DisconnectionReason.ForbiddenIdentityUpdate, newHandshakeState.nodePubKey); return; } - this.handshakeState = packet.body; - const entry = this.responseMap.get(PacketType.Hello.toString()); if (entry) { @@ -595,7 +597,15 @@ class Peer extends EventEmitter { entry.resolve(packet); } - this.emit('handshake'); + const prevHandshakeState = this.handshakeState!; + this.handshakeState = newHandshakeState; + + prevHandshakeState.pairs.forEach((pairId) => { + if (!newHandshakeState.pairs.includes(pairId)) { + // a trading pair was in the old handshake state but not in the updated one + this.emit('pairDropped', pairId); + } + }); } private sendPing = (): packets.PingPacket => { diff --git a/lib/p2p/Pool.ts b/lib/p2p/Pool.ts index 01bde16b4..609edb916 100644 --- a/lib/p2p/Pool.ts +++ b/lib/p2p/Pool.ts @@ -45,7 +45,9 @@ interface Pool { on(event: 'packet.order', listener: (order: IncomingOrder) => void): this; on(event: 'packet.getOrders', listener: (peer: Peer, reqId: string, pairIds: string[]) => void): this; on(event: 'packet.orderInvalidation', listener: (orderInvalidation: OrderPortion, peer: string) => void): this; - on(event: 'peer.close', listener: (peer: Peer) => void): this; + on(event: 'peer.close', listener: (peerPubKey?: string) => void): this; + /** Adds a listener to be called when a peer drops support for a trading pair. */ + on(event: 'peer.pairDropped', listener: (peerPubKey: string, pairId: string) => void): this; on(event: 'packet.swapRequest', listener: (packet: packets.SwapRequestPacket, peer: Peer) => void): this; on(event: 'packet.swapAccepted', listener: (packet: packets.SwapAcceptedPacket, peer: Peer) => void): this; on(event: 'packet.swapComplete', listener: (packet: packets.SwapCompletePacket) => void): this; @@ -53,7 +55,9 @@ interface Pool { emit(event: 'packet.order', order: IncomingOrder): boolean; emit(event: 'packet.getOrders', peer: Peer, reqId: string, pairIds: string[]): boolean; emit(event: 'packet.orderInvalidation', orderInvalidation: OrderPortion, peer: string): boolean; - emit(event: 'peer.close', peer: Peer): boolean; + emit(event: 'peer.close', peerPubKey?: string): boolean; + /** Notifies listeners that a peer has dropped support for a trading pair. */ + emit(event: 'peer.pairDropped', peerPubKey: string, pairId: string): boolean; emit(event: 'packet.swapRequest', packet: packets.SwapRequestPacket, peer: Peer): boolean; emit(event: 'packet.swapAccepted', packet: packets.SwapAcceptedPacket, peer: Peer): boolean; emit(event: 'packet.swapComplete', packet: packets.SwapCompletePacket): boolean; @@ -644,6 +648,11 @@ class Pool extends EventEmitter { await this.handlePacket(peer, packet); }); + peer.on('pairDropped', (pairId) => { + // drop all orders for trading pairs that are no longer supported + this.emit('peer.pairDropped', peer.nodePubKey!, pairId); + }); + peer.on('error', (err) => { // The only situation in which the node should be connected to itself is the // reachability check of the advertised addresses and we don't have to log that @@ -680,7 +689,7 @@ class Pool extends EventEmitter { this.pendingOutboundPeers.delete(peer.nodePubKey); this.peers.delete(peer.nodePubKey); } - this.emit('peer.close', peer); + this.emit('peer.close', peer.nodePubKey); // if handshake passed and peer disconnected from us for stalling or without specifying any reason - // reconnect, for that might have been due to a temporary loss in connectivity diff --git a/test/integration/Pool.spec.ts b/test/integration/Pool.spec.ts index 924281863..69a9d0fb7 100644 --- a/test/integration/Pool.spec.ts +++ b/test/integration/Pool.spec.ts @@ -8,6 +8,7 @@ import NodeKey from '../../lib/nodekey/NodeKey'; import Peer from '../../lib/p2p/Peer'; import { Address } from '../../lib/types/p2p'; import { DisconnectionReason } from '../../lib/types/enums'; +import { HelloPacket } from '../../lib/p2p/packets/types'; import sinon from 'sinon'; chai.use(chaiAsPromised); @@ -26,7 +27,7 @@ describe('P2P Pool Tests', async () => { addresses, nodePubKey, version: 'test', - pairs: [], + pairs: ['LTC/BTC'], }; peer.socket = {}; peer.sendPacket = () => {};