Skip to content

Commit

Permalink
fix(rpc): close listeners for streaming calls
Browse files Browse the repository at this point in the history
This removes internal listeners that are created to handle streaming
rpc calls. When the rpc call is canceled, we remove all listeners
related to that call.

Fixes #1640.
  • Loading branch information
sangaman committed Jun 12, 2020
1 parent 43c03e7 commit b18d4d7
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 19 deletions.
23 changes: 18 additions & 5 deletions lib/grpc/GrpcService.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* tslint:disable no-floating-promises no-null-keyword */
import grpc, { status } from 'grpc';
import grpc, { status, ServerWritableStream } from 'grpc';
import { SwapFailureReason } from '../constants/enums';
import { LndInfo } from '../lndclient/types';
import { isOwnOrder, Order, OrderPortion, PlaceOrderEventType, PlaceOrderResult } from '../orderbook/types';
Expand Down Expand Up @@ -139,6 +139,12 @@ const createPlaceOrderEvent = (e: ServicePlaceOrderEvent) => {
return placeOrderEvent;
};

function getCancelledPromise(call: ServerWritableStream<any>) {
return new Promise<void>((resolve) => {
call.once('cancelled', resolve);
});
}

/** Class containing the available RPC methods for XUD */
class GrpcService {
public locked = false;
Expand Down Expand Up @@ -826,6 +832,9 @@ class GrpcService {
call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' });
return;
}

const cancelledPromise = getCancelledPromise(call);

this.service.subscribeOrders(call.request.toObject(), (order?: Order, orderRemoval?: OrderPortion) => {
const orderUpdate = new xudrpc.OrderUpdate();
if (order) {
Expand All @@ -840,8 +849,8 @@ class GrpcService {
orderUpdate.setOrderRemoval(grpcOrderRemoval);
}
call.write(orderUpdate);
});
this.addStream(call);
},
cancelledPromise);
}

/*
Expand All @@ -852,9 +861,11 @@ class GrpcService {
call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' });
return;
}

const cancelledPromise = getCancelledPromise(call);
this.service.subscribeSwapFailures(call.request.toObject(), (result: SwapFailure) => {
call.write(createSwapFailure(result));
});
}, cancelledPromise);
this.addStream(call);
}

Expand All @@ -866,9 +877,11 @@ class GrpcService {
call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' });
return;
}

const cancelledPromise = getCancelledPromise(call);
this.service.subscribeSwaps(call.request.toObject(), (result: SwapSuccess) => {
call.write(createSwapSuccess(result));
});
}, cancelledPromise);
this.addStream(call);
}
}
Expand Down
58 changes: 44 additions & 14 deletions lib/service/Service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import swapsErrors from '../swaps/errors';
import { TradingLimits } from '../swaps/SwapClient';
import SwapClientManager from '../swaps/SwapClientManager';
import Swaps from '../swaps/Swaps';
import { ResolveRequest, SwapFailure, SwapSuccess } from '../swaps/types';
import { ResolveRequest, SwapDeal, SwapFailure, SwapSuccess } from '../swaps/types';
import { isNodePubKey } from '../utils/aliasUtils';
import { parseUri, toUri, UriParts } from '../utils/uriUtils';
import { checkDecimalPlaces, sortOrders, toEip55Address } from '../utils/utils';
Expand Down Expand Up @@ -666,7 +666,11 @@ class Service {
/*
* Subscribe to orders being added to the order book.
*/
public subscribeOrders = (args: { existing: boolean }, callback: (order?: Order, orderRemoval?: OrderPortion) => void) => {
public subscribeOrders = (
args: { existing: boolean },
callback: (order?: Order, orderRemoval?: OrderPortion) => void,
cancelledPromise: Promise<void>,
) => {
if (args.existing) {
this.orderBook.pairIds.forEach((pair) => {
const ownOrders = this.orderBook.getOwnOrders(pair);
Expand All @@ -678,38 +682,64 @@ class Service {
});
}

this.orderBook.on('peerOrder.incoming', order => callback(order));
this.orderBook.on('ownOrder.added', order => callback(order));
this.orderBook.on('peerOrder.incoming', callback);
this.orderBook.on('ownOrder.added', callback);

const removalCallback = (orderRemoval: OrderPortion) => callback(undefined, orderRemoval);
this.orderBook.on('peerOrder.invalidation', removalCallback);
this.orderBook.on('peerOrder.filled', removalCallback);
this.orderBook.on('ownOrder.filled', removalCallback);
this.orderBook.on('ownOrder.removed', removalCallback);

this.orderBook.on('peerOrder.invalidation', orderRemoval => callback(undefined, orderRemoval));
this.orderBook.on('peerOrder.filled', orderRemoval => callback(undefined, orderRemoval));
this.orderBook.on('ownOrder.filled', orderRemoval => callback(undefined, orderRemoval));
this.orderBook.on('ownOrder.removed', orderRemoval => callback(undefined, orderRemoval));
// we wait for the streaming call to be canceled
cancelledPromise.then(() => {
// after it is canceled we can remove all the listeners set above
this.orderBook.removeListener('peerOrder.incoming', callback);
this.orderBook.removeListener('ownOrder.added', callback);
this.orderBook.removeListener('peerOrder.invalidation', removalCallback);
this.orderBook.removeListener('peerOrder.filled', removalCallback);
this.orderBook.removeListener('ownOrder.filled', removalCallback);
this.orderBook.removeListener('ownOrder.removed', removalCallback);
}).catch(this.logger.error);
}

/*
* Subscribe to completed swaps.
*/
public subscribeSwaps = async (args: { includeTaker: boolean }, callback: (swapSuccess: SwapSuccess) => void) => {
this.swaps.on('swap.paid', (swapSuccess) => {
public subscribeSwaps = async (
args: { includeTaker: boolean },
callback: (swapSuccess: SwapSuccess) => void,
cancelledPromise: Promise<void>,
) => {
const onSwapPaid = (swapSuccess: SwapSuccess) => {
// always alert client for maker matches, taker matches only when specified
if (swapSuccess.role === SwapRole.Maker || args.includeTaker) {
callback(swapSuccess);
}
});
};
this.swaps.on('swap.paid', onSwapPaid);

cancelledPromise.then(() => this.swaps.removeListener('swap.paid', onSwapPaid)).catch(this.logger.error);
}

/*
* Subscribe to failed swaps.
*/
public subscribeSwapFailures = async (args: { includeTaker: boolean }, callback: (swapFailure: SwapFailure) => void) => {
this.swaps.on('swap.failed', (deal) => {
public subscribeSwapFailures = async (
args: { includeTaker: boolean },
callback: (swapFailure: SwapFailure) => void,
cancelledPromise: Promise<void>,
) => {
const onSwapFailed = (deal: SwapDeal) => {
this.logger.trace(`notifying SwapFailure subscription for ${deal.rHash} with role ${SwapRole[deal.role]}`);
// always alert client for maker matches, taker matches only when specified
if (deal.role === SwapRole.Maker || args.includeTaker) {
callback(deal as SwapFailure);
}
});
};
this.swaps.on('swap.failed', onSwapFailed);

cancelledPromise.then(() => this.swaps.removeListener('swap.failed', onSwapFailed)).catch(this.logger.error);
}

/**
Expand Down

0 comments on commit b18d4d7

Please sign in to comment.