Skip to content

Commit

Permalink
feat(p2p/orderbook): drop orders for dropped pairs
Browse files Browse the repository at this point in the history
This adds logic to drop a peer's orders for a given trading pair when
we receive an updated handshake state that does not include a trading
pair that was supported according to the previous handshake. Without
this, we may keep orders in our order book for a peer that no longer
supports (and can swap) those orders.

Closes #599.
  • Loading branch information
sangaman committed Nov 30, 2018
1 parent 1b88899 commit 0d6f8d5
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 5 deletions.
8 changes: 7 additions & 1 deletion lib/orderbook/OrderBook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class OrderBook extends EventEmitter {
this.pool.on('packet.getOrders', this.sendOrders);
this.pool.on('packet.swapRequest', this.handleSwapRequest);
this.pool.on('peer.close', this.removePeerOrders);
this.pool.on('peer.dropPair', this.removePeerPair);
}
}

Expand Down Expand Up @@ -487,7 +488,7 @@ class OrderBook extends EventEmitter {
return tp.removePeerOrder(orderId, peerPubKey, quantityToRemove);
}

private removePeerOrders = async (peer: Peer): Promise<void> => {
private removePeerOrders = (peer: Peer) => {
if (!peer.nodePubKey) {
return;
}
Expand All @@ -502,6 +503,11 @@ class OrderBook extends EventEmitter {
this.logger.debug(`removed all orders for peer ${peer.nodePubKey}`);
}

private removePeerPair = (peerPubKey: string, pairId: string) => {
const tp = this.getTradingPair(pairId);
tp.removePeerOrders(peerPubKey);
}

/**
* Send local orders to a given peer in an [[OrdersPacket].
* @param reqId the request id of a [[GetOrdersPacket]] packet that this method is responding to
Expand Down
11 changes: 8 additions & 3 deletions lib/p2p/Peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type PeerInfo = {
interface Peer {
on(event: 'packet', listener: (packet: Packet) => void): this;
on(event: 'error', listener: (err: Error) => void): this;
on(event: 'handshake', listener: () => void): this;
on(event: 'handshake', listener: (oldHandshakeState?: HandshakeState) => void): this;
once(event: 'open', listener: () => void): this;
once(event: 'close', listener: () => void): this;
once(event: 'reputation', listener: (event: ReputationEvent) => void): this;
Expand All @@ -38,7 +38,7 @@ interface Peer {
emit(event: 'close'): boolean;
emit(event: 'error', err: Error): boolean;
emit(event: 'packet', packet: Packet): boolean;
emit(event: 'handshake'): boolean;
emit(event: 'handshake', oldHandshakeState?: HandshakeState): boolean;
}

/** Represents a remote XU peer */
Expand Down Expand Up @@ -86,6 +86,10 @@ class Peer extends EventEmitter {
return this.handshakeState ? this.handshakeState.addresses : undefined;
}

public get pairs(): string[] | undefined {
return this.handshakeState ? this.handshakeState.pairs : undefined;
}

public get info(): PeerInfo {
return {
address: addressUtils.toString(this.address),
Expand Down Expand Up @@ -577,6 +581,7 @@ class Peer extends EventEmitter {
return;
}

const oldHandshakeState = this.handshakeState;
this.handshakeState = packet.body;

const entry = this.responseMap.get(PacketType.Hello);
Expand All @@ -586,7 +591,7 @@ class Peer extends EventEmitter {
entry.resolve(packet);
}

this.emit('handshake');
this.emit('handshake', oldHandshakeState);
}

private sendPing = (): packets.PingPacket => {
Expand Down
16 changes: 16 additions & 0 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ interface Pool {
on(event: 'packet.getOrders', listener: (peer: Peer, reqId: string, pairIds: string[]) => void): this;
on(event: 'packet.orderInvalidation', listener: (orderInvalidation: OrderPortion, peer: string) => void): this;
on(event: 'peer.close', listener: (peer: Peer) => void): this;
/** Adds a listener to be called when a peer drops support for a trading pair. */
on(event: 'peer.dropPair', listener: (peerPubKey: string, pairId: string) => void): this;
on(event: 'packet.swapRequest', listener: (packet: packets.SwapRequestPacket, peer: Peer) => void): this;
on(event: 'packet.swapResponse', listener: (packet: packets.SwapAcceptedPacket, peer: Peer) => void): this;
on(event: 'packet.swapComplete', listener: (packet: packets.SwapCompletePacket) => void): this;
Expand All @@ -55,6 +57,8 @@ interface Pool {
emit(event: 'packet.getOrders', peer: Peer, reqId: string, pairIds: string[]): boolean;
emit(event: 'packet.orderInvalidation', orderInvalidation: OrderPortion, peer: string): boolean;
emit(event: 'peer.close', peer: Peer): boolean;
/** Notifies listeners that a peer has dropped support for a trading pair. */
emit(event: 'peer.dropPair', peerPubKey: string, pairId: string): boolean;
emit(event: 'packet.swapRequest', packet: packets.SwapRequestPacket, peer: Peer): boolean;
emit(event: 'packet.swapResponse', packet: packets.SwapAcceptedPacket, peer: Peer): boolean;
emit(event: 'packet.swapComplete', packet: packets.SwapCompletePacket): boolean;
Expand Down Expand Up @@ -633,6 +637,18 @@ class Pool extends EventEmitter {
await this.handlePacket(peer, packet);
});

peer.on('handshake', (oldHandshakeState) => {
// drop all orders for trading pairs that are no longer supported
if (oldHandshakeState) {
oldHandshakeState.pairs.forEach((pairId) => {
if (!peer.pairs!.includes(pairId)) {
// a trading pair was in the old handshake state but not in the updated one
this.emit('peer.dropPair', peer.nodePubKey!, pairId);
}
});
}
});

peer.on('error', (err) => {
// The only situation in which the node should be connected to itself is the
// reachability check of the advertised addresses and we don't have to log that
Expand Down
21 changes: 20 additions & 1 deletion test/integration/Pool.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import Config from '../../lib/Config';
import NodeKey from '../../lib/nodekey/NodeKey';
import Peer from '../../lib/p2p/Peer';
import { Address } from '../../lib/types/p2p';
import { HelloPacket } from '../../lib/p2p/packets/types';
import { PacketType } from '../../lib/p2p/packets';

chai.use(chaiAsPromised);

Expand All @@ -22,7 +24,7 @@ describe('P2P Pool Tests', async () => {
addresses,
nodePubKey,
version: 'test',
pairs: [],
pairs: ['LTC/BTC'],
};
pool['bindPeer'](peer);

Expand Down Expand Up @@ -92,6 +94,23 @@ describe('P2P Pool Tests', async () => {
expect(nodeInstance!.addresses![0].host).to.equal(addresses[0].host);
});

it('should emit peer.dropPair when a handshake drops a trading pair', (done) => {
const peerOne = pool['peers'].get(nodePubKeyOne)!;
expect(peerOne.pairs).to.include('LTC/BTC');
const newHelloPacket: HelloPacket = new HelloPacket({
version: 'test',
nodePubKey: nodePubKeyOne,
pairs: [],
});

pool.once('peer.dropPair', (peerPubKey: string, pairId: string) => {
expect(peerPubKey).to.equal(nodePubKeyOne);
expect(pairId).to.equal('LTC/BTC');
done();
});
peerOne['handleHello'](newHelloPacket);
});

after(async () => {
await db.close();
await pool.disconnect();
Expand Down

0 comments on commit 0d6f8d5

Please sign in to comment.