From 9a77ed32372d218435cdfe444f032dcd1671b173 Mon Sep 17 00:00:00 2001 From: rsercano Date: Sat, 2 Jan 2021 13:14:19 +0300 Subject: [PATCH] moved db close operation to start subscribe and fixed saveOrder --- src/arby.ts | 18 ++++++++++++++---- src/centralized/execute-order.ts | 2 +- src/opendex/catch-error.ts | 2 -- src/trade/cleanup.spec.ts | 4 ---- src/trade/cleanup.ts | 7 ------- 5 files changed, 15 insertions(+), 18 deletions(-) diff --git a/src/arby.ts b/src/arby.ts index fd2ee89..4951dd7 100644 --- a/src/arby.ts +++ b/src/arby.ts @@ -34,6 +34,8 @@ type StartArbyParams = { getCentralizedExchangeOrder$, getOpenDEXcomplete$, shutdown$, + models, + saveOrder$, }: GetTradeParams) => Observable; cleanup$: ({ config, @@ -135,7 +137,6 @@ export const startArby = ({ loggers, removeOpenDEXorders$, removeCEXorders$, - closeDB$, CEX, }) ).pipe( @@ -149,7 +150,6 @@ export const startArby = ({ removeOpenDEXorders$, removeCEXorders$, CEX, - closeDB$, }); }) ); @@ -182,8 +182,18 @@ if (!module.parent) { } else { console.log(error); } - process.exit(1); + closeDB$().subscribe({ + complete: () => { + process.exit(1); + }, + }); + }, + complete: () => { + closeDB$().subscribe({ + complete: () => { + console.log('Shutdown complete. Goodbye, Arby.'); + }, + }); }, - complete: () => console.log('Shutdown complete. Goodbye, Arby.'), }); } diff --git a/src/centralized/execute-order.ts b/src/centralized/execute-order.ts index 08303bb..d0d80f1 100644 --- a/src/centralized/execute-order.ts +++ b/src/centralized/execute-order.ts @@ -61,7 +61,7 @@ const executeCEXorder$ = ({ logger.info( `Centralized exchange order finished: ${JSON.stringify(order)}` ); - saveOrder$({ order, logger, models }); + saveOrder$({ order, logger, models }).subscribe(); }), catchError((e, caught) => { logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`); diff --git a/src/opendex/catch-error.ts b/src/opendex/catch-error.ts index 57ff5d0..5df672d 100644 --- a/src/opendex/catch-error.ts +++ b/src/opendex/catch-error.ts @@ -10,7 +10,6 @@ import { Logger, Loggers } from '../logger'; import { errorCodes, errors } from '../opendex/errors'; import { GetCleanupParams } from '../trade/cleanup'; import { removeOpenDEXorders$ } from './remove-orders'; -import { closeDB$ } from '../db/db'; const catchOpenDEXerror = ( loggers: Loggers, @@ -74,7 +73,6 @@ const catchOpenDEXerror = ( loggers, removeOpenDEXorders$, removeCEXorders$, - closeDB$, CEX, }).pipe(ignoreElements()), timer(RETRY_INTERVAL) diff --git a/src/trade/cleanup.spec.ts b/src/trade/cleanup.spec.ts index 8433d19..460fa51 100644 --- a/src/trade/cleanup.spec.ts +++ b/src/trade/cleanup.spec.ts @@ -36,9 +36,6 @@ const assertGetTrade = ({ const removeOpenDEXorders$ = () => { return (openDEXorders$ as unknown) as Observable; }; - const closeDB$ = () => { - return (cold(inputEvents.closeDB$) as unknown) as Observable; - }; const CEXorders$ = cold(inputEvents.removeCEXorders$); const removeCEXorders$ = () => CEXorders$; const CEX = (null as unknown) as Exchange; @@ -48,7 +45,6 @@ const assertGetTrade = ({ removeOpenDEXorders$, removeCEXorders$, CEX, - closeDB$, }); expectObservable(cleanup$, inputEvents.unsubscribe).toBe(expected); expectSubscriptions(CEXorders$.subscriptions).toBe( diff --git a/src/trade/cleanup.ts b/src/trade/cleanup.ts index 2001d5c..cc1f999 100644 --- a/src/trade/cleanup.ts +++ b/src/trade/cleanup.ts @@ -47,7 +47,6 @@ type GetCleanupParams = { orderId: string ) => Observable ) => Observable; - closeDB$: () => Observable; CEX: Exchange; }; @@ -56,7 +55,6 @@ const getCleanup$ = ({ loggers, removeOpenDEXorders$, removeCEXorders$, - closeDB$, CEX, }: GetCleanupParams): Observable => { const retryOnError = (logger: Logger, source: Observable) => { @@ -79,11 +77,6 @@ const getCleanup$ = ({ const retryOnErrorOpenDEX = curriedRetryOnError(loggers.opendex); const retryonErrorCEX = curriedRetryOnError(loggers.centralized); return combineLatest( - closeDB$().pipe( - tap(() => { - loggers.db.info('DB has been closed'); - }) - ), removeOpenDEXorders$({ config, getXudClient$,