Skip to content

Commit

Permalink
feat(p2p): replace order in single packet
Browse files Browse the repository at this point in the history
This modifies the procedure for replacing an order in the order book.
Previously, it would go like this:

1. Remove old order & send order invalidation to peers.
2. Place new order and wait for matching to complete (which may take
some time if it involves swap attempts).
3. Send new order to peers.

Now it goes:

1. Put the old order on hold, preventing further swaps/matching.
2. Place the new order and wait for matching to complete.
3. Remove the old order from the order book.
4. Send a single packet to peers with new order info and old order id.
5. Peers that receive this packet will remove the old order and add
the new one sequentially.

Closes #1805.
  • Loading branch information
sangaman committed Aug 14, 2020
1 parent 0d20cd3 commit 8fb68a4
Show file tree
Hide file tree
Showing 13 changed files with 375 additions and 81 deletions.
106 changes: 88 additions & 18 deletions lib/orderbook/OrderBook.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import errors, { errorCodes } from './errors';
import OrderBookRepository from './OrderBookRepository';
import TradingPair from './TradingPair';
import {
IncomingOrder, isOwnOrder, Order, OrderBookThresholds, OrderIdentifier, OrderPortion, OutgoingOrder, OwnLimitOrder, OwnMarketOrder,
IncomingOrder, isOwnOrder, Order, OrderBookThresholds, OrderIdentifier,
OrderInvalidation, OrderPortion, OutgoingOrder, OwnLimitOrder, OwnMarketOrder,
OwnOrder, Pair, PeerOrder, PlaceOrderEvent, PlaceOrderEventType, PlaceOrderResult,
} from './types';

Expand Down Expand Up @@ -122,9 +123,9 @@ class OrderBook extends EventEmitter {
this.bindSwaps();
}

private static createOutgoingOrder = (order: OwnOrder): OutgoingOrder => {
private static createOutgoingOrder = (order: OwnOrder, replaceOrderId?: string): OutgoingOrder => {
const { createdAt, localId, initialQuantity, hold, ...outgoingOrder } = order;
return outgoingOrder;
return replaceOrderId ? { ...outgoingOrder, replaceOrderId } : outgoingOrder;
}

private checkThresholdCompliance = (order: OwnOrder | IncomingOrder) => {
Expand Down Expand Up @@ -180,7 +181,12 @@ class OrderBook extends EventEmitter {
// we must remove the amount that was put on hold while the swap was pending for the remaining order
this.removeOrderHold(orderId, pairId, quantity);

const ownOrder = this.removeOwnOrder(orderId, pairId, quantity, peerPubKey);
const ownOrder = this.removeOwnOrder({
orderId,
pairId,
takerPubKey: peerPubKey,
quantityToRemove: quantity,
});
this.emit('ownOrder.swapped', { pairId, quantity, id: orderId });
await this.persistTrade({
quantity: swapSuccess.quantity,
Expand Down Expand Up @@ -327,8 +333,13 @@ class OrderBook extends EventEmitter {
return pair.destroy();
}

public placeLimitOrder = async (order: OwnLimitOrder, immediateOrCancel = false,
onUpdate?: (e: PlaceOrderEvent) => void): Promise<PlaceOrderResult> => {
public placeLimitOrder = async ({ order, immediateOrCancel = false, replaceOrderId, onUpdate }:
{
order: OwnLimitOrder,
immediateOrCancel?: boolean,
replaceOrderId?: string,
onUpdate?: (e: PlaceOrderEvent) => void,
}): Promise<PlaceOrderResult> => {
const stampedOrder = this.stampOwnOrder(order);

if (order.quantity * order.price < 1) {
Expand All @@ -350,13 +361,17 @@ class OrderBook extends EventEmitter {

return this.placeOrder({
onUpdate,
replaceOrderId,
order: stampedOrder,
discardRemaining: immediateOrCancel,
maxTime: Date.now() + OrderBook.MAX_PLACEORDER_ITERATIONS_TIME,
});
}

public placeMarketOrder = async (order: OwnMarketOrder, onUpdate?: (e: PlaceOrderEvent) => void): Promise<PlaceOrderResult> => {
public placeMarketOrder = async ({ order, onUpdate }: {
order: OwnMarketOrder,
onUpdate?: (e: PlaceOrderEvent) => void,
}): Promise<PlaceOrderResult> => {
if (this.nomatching) {
throw errors.MARKET_ORDERS_NOT_ALLOWED();
}
Expand Down Expand Up @@ -389,12 +404,14 @@ class OrderBook extends EventEmitter {
retry = false,
onUpdate,
maxTime,
replaceOrderId,
}: {
order: OwnOrder,
discardRemaining?: boolean,
retry?: boolean,
onUpdate?: (e: PlaceOrderEvent) => void,
maxTime?: number,
replaceOrderId?: string,
}): Promise<PlaceOrderResult> => {
// Check if order complies to thresholds
if (this.thresholds.minQuantity > 0) {
Expand All @@ -403,6 +420,19 @@ class OrderBook extends EventEmitter {
}
}

assert(!(replaceOrderId && discardRemaining), 'can not replace order and discard remaining order');

let replacedOrderIdentifier: OrderIdentifier | undefined;
if (replaceOrderId) {
// put the order we are replacing on hold while we place the new order
replacedOrderIdentifier = this.localIdMap.get(replaceOrderId);
if (!replacedOrderIdentifier) {
throw errors.ORDER_NOT_FOUND(replaceOrderId);
}
assert(replacedOrderIdentifier.pairId === order.pairId);
this.addOrderHold(replacedOrderIdentifier.id, replacedOrderIdentifier.pairId);
}

// this method can be called recursively on swap failures retries.
// if max time exceeded, don't try to match
if (maxTime && Date.now() > maxTime) {
Expand Down Expand Up @@ -567,6 +597,9 @@ class OrderBook extends EventEmitter {
// failed swaps will be added to the remaining order which may be added to the order book.
await Promise.all(matchPromises);

if (replacedOrderIdentifier) {
this.removeOrderHold(replacedOrderIdentifier.id, replacedOrderIdentifier.pairId);
}
if (remainingOrder) {
if (discardRemaining) {
this.logger.verbose(`no more matches found for order ${order.id}, remaining order will be discarded`);
Expand All @@ -576,9 +609,15 @@ class OrderBook extends EventEmitter {
// instead we preserve the remainder and return it to the parent caller, which will sum
// up any remaining orders and add them to the order book as a single order once
// matching is complete
this.addOwnOrder(remainingOrder);
this.addOwnOrder(remainingOrder, replacedOrderIdentifier?.id);
onUpdate && onUpdate({ type: PlaceOrderEventType.RemainingOrder, order: remainingOrder });
}
} else if (replacedOrderIdentifier) {
// we tried to replace an order but the replacement order was fully matched, so simply remove the original order
this.removeOwnOrder({
orderId: replacedOrderIdentifier.id,
pairId: replacedOrderIdentifier.pairId,
});
}

failedMakerOrders.forEach((peerOrder) => {
Expand Down Expand Up @@ -630,18 +669,28 @@ class OrderBook extends EventEmitter {

/**
* Adds an own order to the order book and broadcasts it to peers.
* Optionally removes/replaces an existing order.
* @returns false if it's a duplicated order or with an invalid pair id, otherwise true
*/
private addOwnOrder = (order: OwnOrder): boolean => {
private addOwnOrder = (order: OwnOrder, replaceOrderId?: string): boolean => {
const tp = this.getTradingPair(order.pairId);

if (replaceOrderId) {
this.removeOwnOrder({
orderId: replaceOrderId,
pairId: order.pairId,
noBroadcast: true,
});
}

const result = tp.addOwnOrder(order);
assert(result, 'own order id is duplicated');

this.localIdMap.set(order.localId, { id: order.id, pairId: order.pairId });

this.emit('ownOrder.added', order);

const outgoingOrder = OrderBook.createOutgoingOrder(order);
const outgoingOrder = OrderBook.createOutgoingOrder(order, replaceOrderId);
this.pool.broadcastOrder(outgoingOrder);
return true;
}
Expand Down Expand Up @@ -742,22 +791,34 @@ class OrderBook extends EventEmitter {

const removableQuantity = order.quantity - order.hold;
if (remainingQuantityToRemove <= removableQuantity) {
this.removeOwnOrder(order.id, order.pairId, remainingQuantityToRemove);
this.removeOwnOrder({
orderId: order.id,
pairId: order.pairId,
quantityToRemove: remainingQuantityToRemove,
});
remainingQuantityToRemove = 0;
} else {
// we can't immediately remove the entire quantity because of a hold on the order.
if (!allowAsyncRemoval) {
throw errors.QUANTITY_ON_HOLD(localId, order.hold);
}

this.removeOwnOrder(order.id, order.pairId, removableQuantity);
this.removeOwnOrder({
orderId: order.id,
pairId: order.pairId,
quantityToRemove: removableQuantity,
});
remainingQuantityToRemove -= removableQuantity;

const failedHandler = (deal: SwapDeal) => {
if (deal.orderId === order.id) {
// remove the portion that failed now that it's not on hold
const quantityToRemove = Math.min(deal.quantity!, remainingQuantityToRemove);
this.removeOwnOrder(order.id, order.pairId, quantityToRemove);
this.removeOwnOrder({
quantityToRemove,
orderId: order.id,
pairId: order.pairId,
});
cleanup(quantityToRemove);
}
};
Expand Down Expand Up @@ -786,12 +847,12 @@ class OrderBook extends EventEmitter {
return remainingQuantityToRemove;
}

private addOrderHold = (orderId: string, pairId: string, holdAmount: number) => {
private addOrderHold = (orderId: string, pairId: string, holdAmount?: number) => {
const tp = this.getTradingPair(pairId);
tp.addOrderHold(orderId, holdAmount);
}

private removeOrderHold = (orderId: string, pairId: string, holdAmount: number) => {
private removeOrderHold = (orderId: string, pairId: string, holdAmount?: number) => {
const tp = this.getTradingPair(pairId);
tp.removeOrderHold(orderId, holdAmount);
}
Expand All @@ -802,7 +863,14 @@ class OrderBook extends EventEmitter {
* @param takerPubKey the node pub key of the taker who filled this order, if applicable
* @returns the removed portion of the order
*/
private removeOwnOrder = (orderId: string, pairId: string, quantityToRemove?: number, takerPubKey?: string) => {
private removeOwnOrder = ({ orderId, pairId, quantityToRemove, takerPubKey, noBroadcast }:
{
orderId: string,
pairId: string,
quantityToRemove?: number,
takerPubKey?: string,
noBroadcast?: boolean,
}) => {
const tp = this.getTradingPair(pairId);
try {
const removeResult = tp.removeOwnOrder(orderId, quantityToRemove);
Expand All @@ -811,7 +879,9 @@ class OrderBook extends EventEmitter {
this.localIdMap.delete(removeResult.order.localId);
}

this.pool.broadcastOrderInvalidation(removeResult.order, takerPubKey);
if (!noBroadcast) {
this.pool.broadcastOrderInvalidation(removeResult.order, takerPubKey);
}
return removeResult.order;
} catch (err) {
if (quantityToRemove !== undefined) {
Expand Down Expand Up @@ -979,7 +1049,7 @@ class OrderBook extends EventEmitter {
return { ...order, id, initialQuantity: order.quantity, hold: 0, createdAt: ms() };
}

private handleOrderInvalidation = (oi: OrderPortion, peerPubKey: string) => {
private handleOrderInvalidation = (oi: OrderInvalidation, peerPubKey: string) => {
try {
const removeResult = this.removePeerOrder(oi.id, oi.pairId, peerPubKey, oi.quantity);
this.emit('peerOrder.invalidation', removeResult.order);
Expand Down
35 changes: 25 additions & 10 deletions lib/orderbook/TradingPair.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,20 +308,35 @@ class TradingPair {
return maps.buyMap.get(orderId) || maps.sellMap.get(orderId);
}

public addOrderHold = (orderId: string, holdAmount: number) => {
public addOrderHold = (orderId: string, holdAmount?: number) => {
const order = this.getOwnOrder(orderId);
assert(holdAmount > 0);
assert(order.hold + holdAmount <= order.quantity, 'the amount of an order on hold cannot exceed the available quantity');
order.hold += holdAmount;
this.logger.debug(`added hold of ${holdAmount} on order ${orderId}`);
if (holdAmount === undefined) {
if (order.hold > 0) {
// we can't put an entire order on hold if part of it is already on hold
throw errors.QUANTITY_ON_HOLD(order.localId, order.hold);
}
order.hold = order.quantity;
this.logger.debug(`placed entire order ${orderId} on hold`);
} else {
assert(holdAmount > 0);
assert(order.hold + holdAmount <= order.quantity, 'the amount of an order on hold cannot exceed the available quantity');
order.hold += holdAmount;
this.logger.debug(`added hold of ${holdAmount} on order ${orderId}`);
}
}

public removeOrderHold = (orderId: string, holdAmount: number) => {
public removeOrderHold = (orderId: string, holdAmount?: number) => {
const order = this.getOwnOrder(orderId);
assert(holdAmount > 0);
assert(order.hold >= holdAmount, 'cannot remove more than is currently on hold for an order');
order.hold -= holdAmount;
this.logger.debug(`removed hold of ${holdAmount} on order ${orderId}`);
if (holdAmount === undefined) {
assert(order.hold > 0);
order.hold = 0;
this.logger.debug(`removed entire hold on order ${orderId}`);
} else {
assert(holdAmount > 0);
assert(order.hold >= holdAmount, 'cannot remove more than is currently on hold for an order');
order.hold -= holdAmount;
this.logger.debug(`removed hold of ${holdAmount} on order ${orderId}`);
}
}

/**
Expand Down
9 changes: 8 additions & 1 deletion lib/orderbook/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ export type PeerOrder = LimitOrder & Stamp & Remote;
export type Order = OwnOrder | PeerOrder;

/** An outgoing local order which only includes fields that are relevant to peers. */
export type OutgoingOrder = Pick<OwnOrder, Exclude<keyof OwnOrder, 'localId' | 'createdAt' | 'hold' | 'initialQuantity'>>;
export type OutgoingOrder = Pick<OwnOrder, Exclude<keyof OwnOrder, 'localId' | 'createdAt' | 'hold' | 'initialQuantity'>> & {
replaceOrderId?: string;
};

/** An incoming peer order which only includes fields that are relevant to us. */
export type IncomingOrder = OutgoingOrder & Remote;
Expand All @@ -110,6 +112,11 @@ export type OrderPortion = OrderIdentifier & {
localId?: string;
};

export type OrderInvalidation = OrderIdentifier & {
/** The quantity of the order being removed, or the entire order if quantity is undefined */
quantity?: number;
};

export type Currency = {
/** The ticker symbol for this currency such as BTC, LTC, ETH, etc... */
id: string;
Expand Down
27 changes: 18 additions & 9 deletions lib/p2p/Pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { DisconnectionReason, ReputationEvent, SwapFailureReason, XuNetwork } fr
import { Models } from '../db/DB';
import Logger from '../Logger';
import NodeKey from '../nodekey/NodeKey';
import { IncomingOrder, OrderPortion, OutgoingOrder } from '../orderbook/types';
import { IncomingOrder, OrderInvalidation, OrderPortion, OutgoingOrder } from '../orderbook/types';
import addressUtils from '../utils/addressUtils';
import { pubKeyToAlias } from '../utils/aliasUtils';
import { getExternalIp } from '../utils/utils';
Expand All @@ -29,7 +29,7 @@ type NodeReputationInfo = {
interface Pool {
on(event: 'packet.order', listener: (order: IncomingOrder) => void): this;
on(event: 'packet.getOrders', listener: (peer: Peer, reqId: string, pairIds: string[]) => void): this;
on(event: 'packet.orderInvalidation', listener: (orderInvalidation: OrderPortion, peer: string) => void): this;
on(event: 'packet.orderInvalidation', listener: (orderInvalidation: OrderInvalidation, peer: string) => void): this;
on(event: 'peer.active', listener: (peerPubKey: string) => void): this;
on(event: 'peer.close', listener: (peerPubKey?: string) => void): this;
/** Adds a listener to be called when a peer's advertised but inactive pairs should be verified. */
Expand All @@ -43,7 +43,7 @@ interface Pool {
on(event: 'packet.swapFailed', listener: (packet: packets.SwapFailedPacket) => void): this;
emit(event: 'packet.order', order: IncomingOrder): boolean;
emit(event: 'packet.getOrders', peer: Peer, reqId: string, pairIds: string[]): boolean;
emit(event: 'packet.orderInvalidation', orderInvalidation: OrderPortion, peer: string): boolean;
emit(event: 'packet.orderInvalidation', orderInvalidation: OrderInvalidation, peer: string): boolean;
emit(event: 'peer.active', peerPubKey: string): boolean;
emit(event: 'peer.close', peerPubKey?: string): boolean;
/** Notifies listeners that a peer's advertised but inactive pairs should be verified. */
Expand Down Expand Up @@ -773,19 +773,28 @@ class Pool extends EventEmitter {
case PacketType.Order: {
const receivedOrder: OutgoingOrder = (packet as packets.OrderPacket).body!;
this.logger.trace(`received order from ${peer.label}: ${JSON.stringify(receivedOrder)}`);
const incomingOrder: IncomingOrder = { ...receivedOrder, peerPubKey: peer.nodePubKey! };
const { id, pairId } = receivedOrder;

if (peer.isPairActive(pairId)) {
if (receivedOrder.replaceOrderId) {
const orderInvalidation: OrderInvalidation = {
pairId,
id: receivedOrder.replaceOrderId,
};
this.emit('packet.orderInvalidation', orderInvalidation, peer.nodePubKey!);
}

if (peer.isPairActive(incomingOrder.pairId)) {
const incomingOrder: IncomingOrder = { ...receivedOrder, peerPubKey: peer.nodePubKey! };
this.emit('packet.order', incomingOrder);
} else {
this.logger.debug(`received order ${incomingOrder.id} for deactivated trading pair`);
this.logger.debug(`received order ${id} for deactivated trading pair`);
}
break;
}
case PacketType.OrderInvalidation: {
const orderPortion = (packet as packets.OrderInvalidationPacket).body!;
this.logger.trace(`received order invalidation from ${peer.label}: ${JSON.stringify(orderPortion)}`);
this.emit('packet.orderInvalidation', orderPortion, peer.nodePubKey as string);
const orderInvalidation = (packet as packets.OrderInvalidationPacket).body!;
this.logger.trace(`received order invalidation from ${peer.label}: ${JSON.stringify(orderInvalidation)}`);
this.emit('packet.orderInvalidation', orderInvalidation, peer.nodePubKey as string);
break;
}
case PacketType.GetOrders: {
Expand Down
Loading

0 comments on commit 8fb68a4

Please sign in to comment.