Skip to content

Commit

Permalink
feat(swaps): client heartbeat
Browse files Browse the repository at this point in the history
This commit shortens the interval for the outbound capacity check timer
from 60 to 3 seconds and sets the client as disconnected any time a
call fails due to an unreachable server. This makes the capacity checks
act like a heartbeat, checking that the server is reachable every few
seconds even in the absence of any other activity.

Previously, we had used a dummy server -> client streaming call and
listened for the `error` event on the lnd side, however with newer
versions of lnd and grpc this is no longer a reliable way to tell when
lnd has gone down.

This also resolves an issue where lnd would get stuck in the
`WaitingUnlock` state if it is stopped while xud is running and comes
back online in the locked state.

Closes #1090.
  • Loading branch information
sangaman committed Sep 12, 2019
1 parent e1ee78a commit cf3f07e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 31 deletions.
37 changes: 10 additions & 27 deletions lib/lndclient/LndClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class LndClient extends SwapClient {
private urisList?: string[];
/** The identifier for the chain this lnd instance is using in the format [chain]-[network] like "bitcoin-testnet" */
private chainIdentifier?: string;
private channelSubscription?: ClientReadableStream<lndrpc.ChannelEventUpdate>;
private invoiceSubscriptions = new Map<string, ClientReadableStream<lndrpc.Invoice>>();
private maximumOutboundAmount = 0;
private initWalletResolve?: (value: boolean) => void;
Expand Down Expand Up @@ -163,6 +162,10 @@ class LndClient extends SwapClient {
}
(this.lightning![methodName] as Function)(params, this.meta, (err: grpc.ServiceError, response: U) => {
if (err) {
if (err.code === grpc.status.UNAVAILABLE) {
this.disconnect().catch(this.logger.error);
}
this.logger.trace(`error on ${methodName}: ${err.message}`);
reject(err);
} else {
resolve(response);
Expand All @@ -187,6 +190,10 @@ class LndClient extends SwapClient {
}
(this.invoices![methodName] as Function)(params, this.meta, (err: grpc.ServiceError, response: U) => {
if (err) {
if (err.code === grpc.status.UNAVAILABLE) {
this.disconnect().catch(this.logger.error);
}
this.logger.trace(`error on ${methodName}: ${err.message}`);
reject(err);
} else {
resolve(response);
Expand All @@ -203,6 +210,7 @@ class LndClient extends SwapClient {
}
(this.walletUnlocker![methodName] as Function)(params, this.meta, (err: grpc.ServiceError, response: U) => {
if (err) {
this.logger.trace(`error on ${methodName}: ${err.message}`);
reject(err);
} else {
resolve(response);
Expand Down Expand Up @@ -236,7 +244,7 @@ class LndClient extends SwapClient {
version = lnd.getVersion();
alias = lnd.getAlias();
} catch (err) {
this.logger.error(`LND error: ${err}`);
this.logger.error('getinfo error', err);
error = err.message;
}
}
Expand Down Expand Up @@ -375,8 +383,6 @@ class LndClient extends SwapClient {
this.walletUnlocker.close();
this.walletUnlocker = undefined;
}

this.subscribeChannels();
} else {
await this.setStatus(ClientStatus.OutOfSync);
this.logger.warn(`lnd is out of sync with chain, retrying in ${LndClient.RECONNECT_TIMER} ms`);
Expand Down Expand Up @@ -800,25 +806,6 @@ class LndClient extends SwapClient {
this.invoiceSubscriptions.set(rHash, invoiceSubscription);
}

/**
* Subscribes to channel events.
*/
private subscribeChannels = (): void => {
if (!this.lightning) {
throw errors.LND_IS_UNAVAILABLE(this.status);
}
if (this.channelSubscription) {
this.channelSubscription.cancel();
}

this.channelSubscription = this.lightning.subscribeChannelEvents(new lndrpc.ChannelEventSubscription(), this.meta)
.on('error', async (error) => {
this.channelSubscription = undefined;
this.logger.error(`lnd has been disconnected, error: ${error}`);
await this.disconnect();
});
}

/**
* Attempts to close an open channel.
*/
Expand Down Expand Up @@ -850,10 +837,6 @@ class LndClient extends SwapClient {

/** Lnd specific procedure to disconnect from the server. */
protected disconnect = async () => {
if (this.channelSubscription) {
this.channelSubscription.cancel();
this.channelSubscription = undefined;
}
if (this.lightning) {
this.lightning.close();
this.lightning = undefined;
Expand Down
6 changes: 4 additions & 2 deletions lib/raidenclient/RaidenClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ class RaidenClient extends SwapClient {
this.maximumOutboundAmounts.set(currency, balance);
});
} catch (e) {
// TODO: Mark client as disconnected
this.logger.error(`failed to fetch channelbalances: ${e}`);
}
}
Expand Down Expand Up @@ -391,7 +390,10 @@ class RaidenClient extends SwapClient {
}
});

req.on('error', (err) => {
req.on('error', (err: any) => {
if (err.code === 'ECONNREFUSED') {
this.disconnect().catch(this.logger.error);
}
this.logger.error(err);
reject(err);
});
Expand Down
4 changes: 2 additions & 2 deletions lib/swaps/SwapClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ abstract class SwapClient extends EventEmitter {
private updateCapacityTimer?: NodeJS.Timer;
/** The maximum amount of time we will wait for the connection to be verified during initialization. */
private static INITIALIZATION_TIME_LIMIT = 5000;
/** Time in milliseconds between updating the maximum outbound capacity */
private static CAPACITY_REFRESH_INTERVAL = 60000;
/** Time in milliseconds between updating the maximum outbound capacity. */
private static CAPACITY_REFRESH_INTERVAL = 3000;

constructor(public logger: Logger) {
super();
Expand Down

0 comments on commit cf3f07e

Please sign in to comment.