Skip to content

Commit

Permalink
feat(rpc): add new order events rpc subscriptions
Browse files Browse the repository at this point in the history
This commit replaces the old streaming rpc subscription calls with three
new streaming rpc calls:

- subscribeAddedOrders: Notifies when orders are added to the order
book.
- subscribeRemovedOrders: Notifies when orders are removed from the
order book.
- subscribeSwaps: Notifies when swaps that are initiated by a remote
user are completed.

It also restructures and updates the `Order` grpc message type.

Closes #123.
  • Loading branch information
sangaman committed Sep 27, 2018
1 parent 8d0309e commit 41ded3f
Show file tree
Hide file tree
Showing 20 changed files with 1,315 additions and 946 deletions.
102 changes: 51 additions & 51 deletions docs/api.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions lib/cli/commands/subscribeorders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { loadXudClient } from '../command';
import { Arguments } from 'yargs';
import { SubscribeAddedOrdersRequest, Order, SubscribeRemovedOrdersRequest, SubscribeSwapsRequest, Swap, OrderRemoval } from '../../proto/xudrpc_pb';

export const command = 'streamorders';

export const describe = 'stream order added, removed, and swapped events (DEMO)';

export const handler = (argv: Arguments) => {
const addedOrdersSubscription = loadXudClient(argv).subscribeAddedOrders(new SubscribeAddedOrdersRequest());
addedOrdersSubscription.on('data', (order: Order) => {
console.log(`Order added: ${JSON.stringify(order.toObject())}`);
});

const removedOrdersSubscription = loadXudClient(argv).subscribeRemovedOrders(new SubscribeRemovedOrdersRequest());
removedOrdersSubscription.on('data', (orderRemoval: OrderRemoval) => {
console.log(`Order removed: ${JSON.stringify(orderRemoval.toObject())}`);
});

const swapsSubscription = loadXudClient(argv).subscribeSwaps(new SubscribeSwapsRequest());
swapsSubscription.on('data', (swap: Swap) => {
console.log(`Order swapped: ${JSON.stringify(swap.toObject())}`);
});
};
15 changes: 0 additions & 15 deletions lib/cli/commands/subscribepeerorders.ts

This file was deleted.

15 changes: 0 additions & 15 deletions lib/cli/commands/subscribeswaps.ts

This file was deleted.

3 changes: 2 additions & 1 deletion lib/grpc/GrpcServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ class GrpcServer {
removeCurrency: grpcService.removeCurrency,
removePair: grpcService.removePair,
shutdown: grpcService.shutdown,
subscribePeerOrders: grpcService.subscribePeerOrders,
subscribeAddedOrders: grpcService.subscribeAddedOrders,
subscribeRemovedOrders: grpcService.subscribeRemovedOrders,
subscribeSwaps: grpcService.subscribeSwaps,
});

Expand Down
61 changes: 46 additions & 15 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,28 @@ import Logger from '../Logger';
import Service from '../service/Service';
import * as xudrpc from '../proto/xudrpc_pb';
import { ResolveRequest, ResolveResponse } from '../proto/lndrpc_pb';
import { StampedPeerOrder, StampedOrder, StampedOwnOrder } from '../types/orders';
import { StampedOrder, isOwnOrder, OrderPortion, SwapResult } from '../types/orders';
import { errorCodes as orderErrorCodes } from '../orderbook/errors';
import { errorCodes as serviceErrorCodes } from '../service/errors';
import { errorCodes as p2pErrorCodes } from '../p2p/errors';
import { errorCodes as lndErrorCodes } from '../lndclient/errors';
import { LndInfo } from '../lndclient/LndClient';

/**
* Convert a [[StampedOrder]] to an xudrpc Order message.
* Creates an xudrpc Order message from a [[StampedOrder]].
*/
const getOrder = (order: StampedOrder) => {
const createOrder = (order: StampedOrder) => {
const grpcOrder = new xudrpc.Order();
grpcOrder.setCanceled(false);
grpcOrder.setCreatedAt(order.createdAt);
grpcOrder.setId(order.id);
grpcOrder.setLocalId((order as StampedOwnOrder).localId);
if (isOwnOrder(order)) {
grpcOrder.setLocalId((order).localId);
grpcOrder.setIsOwnOrder(true);
} else {
grpcOrder.setPeerPubKey((order).peerPubKey);
grpcOrder.setIsOwnOrder(false);
}
grpcOrder.setPairId(order.pairId);
grpcOrder.setPeerPubKey((order as StampedPeerOrder).peerPubKey);
grpcOrder.setPrice(order.price);
grpcOrder.setQuantity(order.quantity);
grpcOrder.setSide(order.isBuy ? xudrpc.OrderSide.BUY : xudrpc.OrderSide.SELL);
Expand Down Expand Up @@ -228,7 +232,7 @@ class GrpcService {

const getOrdersList = <T extends StampedOrder>(orders: T[]) => {
const ordersList: xudrpc.Order[] = [];
orders.forEach(order => ordersList.push(getOrder(<StampedOrder>order)));
orders.forEach(order => ordersList.push(createOrder(<StampedOrder>order)));
return ordersList;
};

Expand Down Expand Up @@ -312,14 +316,14 @@ class GrpcService {
const matchesList: xudrpc.OrderMatch[] = [];
placeOrderResponse.matches.forEach((match) => {
const orderMatch = new xudrpc.OrderMatch();
orderMatch.setMaker(getOrder(match.maker));
orderMatch.setTaker(getOrder(match.taker));
orderMatch.setMaker(createOrder(match.maker));
orderMatch.setTaker(createOrder(match.taker));
matchesList.push(orderMatch);
});
response.setMatchesList(matchesList);

if (placeOrderResponse.remainingOrder) {
response.setRemainingOrder(getOrder(placeOrderResponse.remainingOrder));
response.setRemainingOrder(createOrder(placeOrderResponse.remainingOrder));
}
callback(null, response);
} catch (err) {
Expand Down Expand Up @@ -382,17 +386,44 @@ class GrpcService {
}

/*
* See [[Service.subscribePeerOrders]]
* See [[Service.subscribeAddedOrders]]
*/
public subscribeAddedOrders: grpc.handleServerStreamingCall<xudrpc.SubscribeAddedOrdersRequest, xudrpc.Order> = (call) => {
this.service.subscribeAddedOrders((order: StampedOrder) => {
call.write(createOrder(order));
});
}

/*
* See [[Service.subscribeRemovedOrders]]
*/
public subscribePeerOrders: grpc.handleServerStreamingCall<xudrpc.SubscribePeerOrdersRequest, xudrpc.SubscribePeerOrdersResponse> = (call) => {
this.service.subscribePeerOrders((order: StampedPeerOrder) => call.write({ order }));
public subscribeRemovedOrders: grpc.handleServerStreamingCall<xudrpc.SubscribeRemovedOrdersRequest, xudrpc.OrderRemoval> = (call) => {
this.service.subscribeRemovedOrders((order: OrderPortion) => {
const orderRemoval = new xudrpc.OrderRemoval();
orderRemoval.setPairId(order.pairId);
orderRemoval.setOrderId(order.orderId);
orderRemoval.setQuantity(order.quantity);
orderRemoval.setLocalId(order.localId || '');
orderRemoval.setIsOwnOrder(!!order.localId);
call.write(orderRemoval);
});
}

/*
* See [[Service.subscribeSwaps]]
*/
public subscribeSwaps: grpc.handleServerStreamingCall<xudrpc.SubscribeSwapsRequest, xudrpc.SubscribeSwapsResponse> = (call) => {
this.service.subscribeSwaps((order: StampedOrder) => call.write({ order }));
public subscribeSwaps: grpc.handleServerStreamingCall<xudrpc.SubscribeSwapsRequest, xudrpc.Swap> = (call) => {
this.service.subscribeSwaps((swapResult: SwapResult) => {
const swap = new xudrpc.Swap();
swap.setAmountReceived(swapResult.amountReceived);
swap.setAmountSent(swapResult.amountSent);
swap.setLocalId(swapResult.localId);
swap.setPairId(swapResult.pairId);
swap.setPeerPubKey(swapResult.peerPubKey);
swap.setRHash(swapResult.r_hash);
swap.setOrderId(swapResult.orderId);
call.write(swap);
});
}
}

Expand Down
Loading

0 comments on commit 41ded3f

Please sign in to comment.