Skip to content

Commit

Permalink
feat(rpc): add new order events rpc subscriptions
Browse files Browse the repository at this point in the history
This commit replaces the old streaming rpc subscription calls with three
new streaming rpc calls:

- subscribeAddedOrders: Notifies when orders are added to the order
book.
- subscribeRemovedOrders: Notifies when orders are removed from the
order book.
- subscribeSwaps: Notifies when swaps that are initiated by a remote
user are completed.

It also restructures and updates the `Order` grpc message type.

Closes #123.
  • Loading branch information
sangaman committed Sep 27, 2018
1 parent 8d0309e commit ba71088
Show file tree
Hide file tree
Showing 20 changed files with 1,300 additions and 943 deletions.
100 changes: 50 additions & 50 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions lib/cli/commands/subscribeorders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { loadXudClient } from '../command';
import { Arguments } from 'yargs';
import { SubscribeAddedOrdersRequest, Order, SubscribeRemovedOrdersRequest, SubscribeSwapsRequest, Swap, OrderRemoval } from '../../proto/xudrpc_pb';

export const command = 'streamorders';

export const describe = 'stream order added, removed, and swapped events (DEMO)';

export const handler = (argv: Arguments) => {
const addedOrdersSubscription = loadXudClient(argv).subscribeAddedOrders(new SubscribeAddedOrdersRequest());
addedOrdersSubscription.on('data', (order: Order) => {
console.log(`Order added: ${JSON.stringify(order.toObject())}`);
});

const removedOrdersSubscription = loadXudClient(argv).subscribeRemovedOrders(new SubscribeRemovedOrdersRequest());
removedOrdersSubscription.on('data', (orderRemoval: OrderRemoval) => {
console.log(`Order removed: ${JSON.stringify(orderRemoval.toObject())}`);
});

const swapsSubscription = loadXudClient(argv).subscribeSwaps(new SubscribeSwapsRequest());
swapsSubscription.on('data', (swap: Swap) => {
console.log(`Order swapped: ${JSON.stringify(swap.toObject())}`);
});
};
15 changes: 0 additions & 15 deletions lib/cli/commands/subscribepeerorders.ts

This file was deleted.

15 changes: 0 additions & 15 deletions lib/cli/commands/subscribeswaps.ts

This file was deleted.

3 changes: 2 additions & 1 deletion lib/grpc/GrpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class GrpcServer {
removeCurrency: grpcService.removeCurrency,
removePair: grpcService.removePair,
shutdown: grpcService.shutdown,
subscribePeerOrders: grpcService.subscribePeerOrders,
subscribeAddedOrders: grpcService.subscribeAddedOrders,
subscribeRemovedOrders: grpcService.subscribeRemovedOrders,
subscribeSwaps: grpcService.subscribeSwaps,
});

Expand Down
61 changes: 46 additions & 15 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import Logger from '../Logger';
import Service from '../service/Service';
import * as xudrpc from '../proto/xudrpc_pb';
import { ResolveRequest, ResolveResponse } from '../proto/lndrpc_pb';
import { StampedPeerOrder, StampedOrder, StampedOwnOrder } from '../types/orders';
import { StampedOrder, isOwnOrder, OrderPortion, SwapResult } from '../types/orders';
import { errorCodes as orderErrorCodes } from '../orderbook/errors';
import { errorCodes as serviceErrorCodes } from '../service/errors';
import { errorCodes as p2pErrorCodes } from '../p2p/errors';
import { errorCodes as lndErrorCodes } from '../lndclient/errors';
import { LndInfo } from '../lndclient/LndClient';

/**
* Convert a [[StampedOrder]] to an xudrpc Order message.
* Creates an xudrpc Order message from a [[StampedOrder]].
*/
const getOrder = (order: StampedOrder) => {
const createOrder = (order: StampedOrder) => {
const grpcOrder = new xudrpc.Order();
grpcOrder.setCanceled(false);
grpcOrder.setCreatedAt(order.createdAt);
grpcOrder.setId(order.id);
grpcOrder.setLocalId((order as StampedOwnOrder).localId);
if (isOwnOrder(order)) {
grpcOrder.setLocalId((order).localId);
grpcOrder.setIsOwnOrder(true);
} else {
grpcOrder.setPeerPubKey((order).peerPubKey);
grpcOrder.setIsOwnOrder(false);
}
grpcOrder.setPairId(order.pairId);
grpcOrder.setPeerPubKey((order as StampedPeerOrder).peerPubKey);
grpcOrder.setPrice(order.price);
grpcOrder.setQuantity(order.quantity);
grpcOrder.setSide(order.isBuy ? xudrpc.OrderSide.BUY : xudrpc.OrderSide.SELL);
Expand Down Expand Up @@ -228,7 +232,7 @@ class GrpcService {

const getOrdersList = <T extends StampedOrder>(orders: T[]) => {
const ordersList: xudrpc.Order[] = [];
orders.forEach(order => ordersList.push(getOrder(<StampedOrder>order)));
orders.forEach(order => ordersList.push(createOrder(<StampedOrder>order)));
return ordersList;
};

Expand Down Expand Up @@ -312,14 +316,14 @@ class GrpcService {
const matchesList: xudrpc.OrderMatch[] = [];
placeOrderResponse.matches.forEach((match) => {
const orderMatch = new xudrpc.OrderMatch();
orderMatch.setMaker(getOrder(match.maker));
orderMatch.setTaker(getOrder(match.taker));
orderMatch.setMaker(createOrder(match.maker));
orderMatch.setTaker(createOrder(match.taker));
matchesList.push(orderMatch);
});
response.setMatchesList(matchesList);

if (placeOrderResponse.remainingOrder) {
response.setRemainingOrder(getOrder(placeOrderResponse.remainingOrder));
response.setRemainingOrder(createOrder(placeOrderResponse.remainingOrder));
}
callback(null, response);
} catch (err) {
Expand Down Expand Up @@ -382,17 +386,44 @@ class GrpcService {
}

/*
* See [[Service.subscribePeerOrders]]
* See [[Service.subscribeAddedOrders]]
*/
public subscribeAddedOrders: grpc.handleServerStreamingCall<xudrpc.SubscribeAddedOrdersRequest, xudrpc.Order> = (call) => {
this.service.subscribeAddedOrders((order: StampedOrder) => {
call.write(createOrder(order));
});
}

/*
* See [[Service.subscribeRemovedOrders]]
*/
public subscribePeerOrders: grpc.handleServerStreamingCall<xudrpc.SubscribePeerOrdersRequest, xudrpc.SubscribePeerOrdersResponse> = (call) => {
this.service.subscribePeerOrders((order: StampedPeerOrder) => call.write({ order }));
public subscribeRemovedOrders: grpc.handleServerStreamingCall<xudrpc.SubscribeRemovedOrdersRequest, xudrpc.OrderRemoval> = (call) => {
this.service.subscribeRemovedOrders((order: OrderPortion) => {
const orderRemoval = new xudrpc.OrderRemoval();
orderRemoval.setPairId(order.pairId);
orderRemoval.setOrderId(order.orderId);
orderRemoval.setQuantity(order.quantity);
orderRemoval.setLocalId(order.localId || '');
orderRemoval.setIsOwnOrder(!!order.localId);
call.write(orderRemoval);
});
}

/*
* See [[Service.subscribeSwaps]]
*/
public subscribeSwaps: grpc.handleServerStreamingCall<xudrpc.SubscribeSwapsRequest, xudrpc.SubscribeSwapsResponse> = (call) => {
this.service.subscribeSwaps((order: StampedOrder) => call.write({ order }));
public subscribeSwaps: grpc.handleServerStreamingCall<xudrpc.SubscribeSwapsRequest, xudrpc.Swap> = (call) => {
this.service.subscribeSwaps((swapResult: SwapResult) => {
const swap = new xudrpc.Swap();
swap.setAmountReceived(swapResult.amountReceived);
swap.setAmountSent(swapResult.amountSent);
swap.setLocalId(swapResult.localId);
swap.setPairId(swapResult.pairId);
swap.setPeerPubKey(swapResult.peerPubKey);
swap.setRHash(swapResult.r_hash);
swap.setOrderId(swapResult.orderId);
call.write(swap);
});
}
}

Expand Down
21 changes: 14 additions & 7 deletions lib/orderbook/MatchingEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,22 @@ class MatchingEngine {
}

/**
* Remove a quantity from a peer order. if the entire quantity is met, the order will be removed entirely.
* @returns undefined if the order wasn't found, otherwise the removed order or order portion
* Removes a quantity from a peer order. If the entire quantity is met, the order will be removed entirely.
* @param quantityToRemove the quantity to remove, if undefined or if greater than or equal to the available
* quantity then the entire order is removed
* @returns the removed order or order portion, otherwise undefined if the order wasn't found
*/
public removePeerOrderQuantity = (orderId: string, quantityToDecrease?: number): StampedPeerOrder | undefined => {
public removePeerOrderQuantity = (orderId: string, quantityToRemove?: number): StampedPeerOrder | undefined => {
const order = this.peerOrders.buy.get(orderId) || this.peerOrders.sell.get(orderId);
if (!order) {
return;
}

if (quantityToDecrease && quantityToDecrease < order.quantity) {
// if quantityToDecrease is below the order quantity, mutate the order quantity, and return a simulation of the removed order portion
order.quantity = order.quantity - quantityToDecrease;
return { ...order, quantity: quantityToDecrease };
if (quantityToRemove && quantityToRemove < order.quantity) {
// if quantityToRemove is below the order quantity, reduce the order quantity
// and return a copy of the order with the quantity that was removed
order.quantity = order.quantity - quantityToRemove;
return { ...order, quantity: quantityToRemove };
} else {
// otherwise, remove the order entirely, and return it
this.removePeerOrder(order);
Expand Down Expand Up @@ -180,6 +183,10 @@ class MatchingEngine {
return removedOrders;
}

/**
* Removes an own order by its global order id.
* @returns the removed order, or undefined if no order with the provided id could be found
*/
public removeOwnOrder = (orderId: string): StampedOwnOrder | undefined => {
const order = this.ownOrders.buy.get(orderId) || this.ownOrders.sell.get(orderId);
if (!order) {
Expand Down
Loading

0 comments on commit ba71088

Please sign in to comment.