Skip to content

Commit

Permalink
moved db close operation to start subscribe and fixed saveOrder
Browse files Browse the repository at this point in the history
  • Loading branch information
rsercano committed Jan 2, 2021
1 parent 86d25ea commit 9a77ed3
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 18 deletions.
18 changes: 14 additions & 4 deletions src/arby.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type StartArbyParams = {
getCentralizedExchangeOrder$,
getOpenDEXcomplete$,
shutdown$,
models,
saveOrder$,
}: GetTradeParams) => Observable<boolean>;
cleanup$: ({
config,
Expand Down Expand Up @@ -135,7 +137,6 @@ export const startArby = ({
loggers,
removeOpenDEXorders$,
removeCEXorders$,
closeDB$,
CEX,
})
).pipe(
Expand All @@ -149,7 +150,6 @@ export const startArby = ({
removeOpenDEXorders$,
removeCEXorders$,
CEX,
closeDB$,
});
})
);
Expand Down Expand Up @@ -182,8 +182,18 @@ if (!module.parent) {
} else {
console.log(error);
}
process.exit(1);
closeDB$().subscribe({
complete: () => {
process.exit(1);
},
});
},
complete: () => {
closeDB$().subscribe({
complete: () => {
console.log('Shutdown complete. Goodbye, Arby.');
},
});
},
complete: () => console.log('Shutdown complete. Goodbye, Arby.'),
});
}
2 changes: 1 addition & 1 deletion src/centralized/execute-order.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ const executeCEXorder$ = ({
logger.info(
`Centralized exchange order finished: ${JSON.stringify(order)}`
);
saveOrder$({ order, logger, models });
saveOrder$({ order, logger, models }).subscribe();
}),
catchError((e, caught) => {
logger.warn(`Failed to execute CEX order: ${e}. Retrying in 1000ms`);
Expand Down
2 changes: 0 additions & 2 deletions src/opendex/catch-error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import { Logger, Loggers } from '../logger';
import { errorCodes, errors } from '../opendex/errors';
import { GetCleanupParams } from '../trade/cleanup';
import { removeOpenDEXorders$ } from './remove-orders';
import { closeDB$ } from '../db/db';

const catchOpenDEXerror = (
loggers: Loggers,
Expand Down Expand Up @@ -74,7 +73,6 @@ const catchOpenDEXerror = (
loggers,
removeOpenDEXorders$,
removeCEXorders$,
closeDB$,
CEX,
}).pipe(ignoreElements()),
timer(RETRY_INTERVAL)
Expand Down
4 changes: 0 additions & 4 deletions src/trade/cleanup.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const assertGetTrade = ({
const removeOpenDEXorders$ = () => {
return (openDEXorders$ as unknown) as Observable<null>;
};
const closeDB$ = () => {
return (cold(inputEvents.closeDB$) as unknown) as Observable<void>;
};
const CEXorders$ = cold(inputEvents.removeCEXorders$);
const removeCEXorders$ = () => CEXorders$;
const CEX = (null as unknown) as Exchange;
Expand All @@ -48,7 +45,6 @@ const assertGetTrade = ({
removeOpenDEXorders$,
removeCEXorders$,
CEX,
closeDB$,
});
expectObservable(cleanup$, inputEvents.unsubscribe).toBe(expected);
expectSubscriptions(CEXorders$.subscriptions).toBe(
Expand Down
7 changes: 0 additions & 7 deletions src/trade/cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type GetCleanupParams = {
orderId: string
) => Observable<Order>
) => Observable<unknown>;
closeDB$: () => Observable<void>;
CEX: Exchange;
};

Expand All @@ -56,7 +55,6 @@ const getCleanup$ = ({
loggers,
removeOpenDEXorders$,
removeCEXorders$,
closeDB$,
CEX,
}: GetCleanupParams): Observable<unknown> => {
const retryOnError = (logger: Logger, source: Observable<any>) => {
Expand All @@ -79,11 +77,6 @@ const getCleanup$ = ({
const retryOnErrorOpenDEX = curriedRetryOnError(loggers.opendex);
const retryonErrorCEX = curriedRetryOnError(loggers.centralized);
return combineLatest(
closeDB$().pipe(
tap(() => {
loggers.db.info('DB has been closed');
})
),
removeOpenDEXorders$({
config,
getXudClient$,
Expand Down

0 comments on commit 9a77ed3

Please sign in to comment.