diff --git a/lib/grpc/GrpcService.ts b/lib/grpc/GrpcService.ts index c6048427f..c0f6616b8 100644 --- a/lib/grpc/GrpcService.ts +++ b/lib/grpc/GrpcService.ts @@ -1,5 +1,5 @@ /* tslint:disable no-floating-promises no-null-keyword */ -import grpc, { status } from 'grpc'; +import grpc, { status, ServerWritableStream } from 'grpc'; import { SwapFailureReason } from '../constants/enums'; import { LndInfo } from '../lndclient/types'; import { isOwnOrder, Order, OrderPortion, PlaceOrderEventType, PlaceOrderResult } from '../orderbook/types'; @@ -139,6 +139,12 @@ const createPlaceOrderEvent = (e: ServicePlaceOrderEvent) => { return placeOrderEvent; }; +function getCancelledPromise(call: ServerWritableStream) { + return new Promise((resolve) => { + call.once('cancelled', resolve); + }); +} + /** Class containing the available RPC methods for XUD */ class GrpcService { public locked = false; @@ -826,6 +832,9 @@ class GrpcService { call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' }); return; } + + const cancelledPromise = getCancelledPromise(call); + this.service.subscribeOrders(call.request.toObject(), (order?: Order, orderRemoval?: OrderPortion) => { const orderUpdate = new xudrpc.OrderUpdate(); if (order) { @@ -840,8 +849,8 @@ class GrpcService { orderUpdate.setOrderRemoval(grpcOrderRemoval); } call.write(orderUpdate); - }); - this.addStream(call); + }, + cancelledPromise); } /* @@ -852,9 +861,11 @@ class GrpcService { call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' }); return; } + + const cancelledPromise = getCancelledPromise(call); this.service.subscribeSwapFailures(call.request.toObject(), (result: SwapFailure) => { call.write(createSwapFailure(result)); - }); + }, cancelledPromise); this.addStream(call); } @@ -866,9 +877,11 @@ class GrpcService { call.emit('error', { code: status.UNAVAILABLE, message: 'xud is starting', name: 'NotReadyError' }); return; } + + const cancelledPromise = getCancelledPromise(call); this.service.subscribeSwaps(call.request.toObject(), (result: SwapSuccess) => { call.write(createSwapSuccess(result)); - }); + }, cancelledPromise); this.addStream(call); } } diff --git a/lib/service/Service.ts b/lib/service/Service.ts index fa6ea5f1e..6815564bc 100644 --- a/lib/service/Service.ts +++ b/lib/service/Service.ts @@ -9,7 +9,7 @@ import swapsErrors from '../swaps/errors'; import { TradingLimits } from '../swaps/SwapClient'; import SwapClientManager from '../swaps/SwapClientManager'; import Swaps from '../swaps/Swaps'; -import { ResolveRequest, SwapFailure, SwapSuccess } from '../swaps/types'; +import { ResolveRequest, SwapDeal, SwapFailure, SwapSuccess } from '../swaps/types'; import { isNodePubKey } from '../utils/aliasUtils'; import { parseUri, toUri, UriParts } from '../utils/uriUtils'; import { checkDecimalPlaces, sortOrders, toEip55Address } from '../utils/utils'; @@ -666,7 +666,11 @@ class Service { /* * Subscribe to orders being added to the order book. */ - public subscribeOrders = (args: { existing: boolean }, callback: (order?: Order, orderRemoval?: OrderPortion) => void) => { + public subscribeOrders = ( + args: { existing: boolean }, + callback: (order?: Order, orderRemoval?: OrderPortion) => void, + cancelledPromise: Promise, + ) => { if (args.existing) { this.orderBook.pairIds.forEach((pair) => { const ownOrders = this.orderBook.getOwnOrders(pair); @@ -678,38 +682,64 @@ class Service { }); } - this.orderBook.on('peerOrder.incoming', order => callback(order)); - this.orderBook.on('ownOrder.added', order => callback(order)); + this.orderBook.on('peerOrder.incoming', callback); + this.orderBook.on('ownOrder.added', callback); + + const removalCallback = (orderRemoval: OrderPortion) => callback(undefined, orderRemoval); + this.orderBook.on('peerOrder.invalidation', removalCallback); + this.orderBook.on('peerOrder.filled', removalCallback); + this.orderBook.on('ownOrder.filled', removalCallback); + this.orderBook.on('ownOrder.removed', removalCallback); - this.orderBook.on('peerOrder.invalidation', orderRemoval => callback(undefined, orderRemoval)); - this.orderBook.on('peerOrder.filled', orderRemoval => callback(undefined, orderRemoval)); - this.orderBook.on('ownOrder.filled', orderRemoval => callback(undefined, orderRemoval)); - this.orderBook.on('ownOrder.removed', orderRemoval => callback(undefined, orderRemoval)); + // we wait for the streaming call to be canceled + cancelledPromise.then(() => { + // after it is canceled we can remove all the listeners set above + this.orderBook.removeListener('peerOrder.incoming', callback); + this.orderBook.removeListener('ownOrder.added', callback); + this.orderBook.removeListener('peerOrder.invalidation', removalCallback); + this.orderBook.removeListener('peerOrder.filled', removalCallback); + this.orderBook.removeListener('ownOrder.filled', removalCallback); + this.orderBook.removeListener('ownOrder.removed', removalCallback); + }).catch(this.logger.error); } /* * Subscribe to completed swaps. */ - public subscribeSwaps = async (args: { includeTaker: boolean }, callback: (swapSuccess: SwapSuccess) => void) => { - this.swaps.on('swap.paid', (swapSuccess) => { + public subscribeSwaps = async ( + args: { includeTaker: boolean }, + callback: (swapSuccess: SwapSuccess) => void, + cancelledPromise: Promise, + ) => { + const onSwapPaid = (swapSuccess: SwapSuccess) => { // always alert client for maker matches, taker matches only when specified if (swapSuccess.role === SwapRole.Maker || args.includeTaker) { callback(swapSuccess); } - }); + }; + this.swaps.on('swap.paid', onSwapPaid); + + cancelledPromise.then(() => this.swaps.removeListener('swap.paid', onSwapPaid)).catch(this.logger.error); } /* * Subscribe to failed swaps. */ - public subscribeSwapFailures = async (args: { includeTaker: boolean }, callback: (swapFailure: SwapFailure) => void) => { - this.swaps.on('swap.failed', (deal) => { + public subscribeSwapFailures = async ( + args: { includeTaker: boolean }, + callback: (swapFailure: SwapFailure) => void, + cancelledPromise: Promise, + ) => { + const onSwapFailed = (deal: SwapDeal) => { this.logger.trace(`notifying SwapFailure subscription for ${deal.rHash} with role ${SwapRole[deal.role]}`); // always alert client for maker matches, taker matches only when specified if (deal.role === SwapRole.Maker || args.includeTaker) { callback(deal as SwapFailure); } - }); + }; + this.swaps.on('swap.failed', onSwapFailed); + + cancelledPromise.then(() => this.swaps.removeListener('swap.failed', onSwapFailed)).catch(this.logger.error); } /**