Skip to content

Commit

Permalink
fix: blotter highlighting wrong row or not at all
Browse files Browse the repository at this point in the history
  • Loading branch information
algreasley committed Sep 27, 2023
1 parent 26a2e7f commit 15d83c7
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 73 deletions.
44 changes: 18 additions & 26 deletions packages/client/src/client/App/Trades/TradesState/tableTrades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ import { bind } from "@react-rxjs/core"
import { createSignal } from "@react-rxjs/utils"
import { startOfDay } from "date-fns"
import { combineLatest, merge, Observable } from "rxjs"
import { delay, filter, map, mergeMap, scan, startWith } from "rxjs/operators"
import { delay, filter, map, mergeMap, startWith } from "rxjs/operators"

import { HIGHLIGHT_ROW_FLASH_TIME } from "@/client/constants"
import { creditTrades$, trades$ } from "@/services/trades"
import { QUOTE_ACCEPTED_RFQ_UPDATE } from "@/generated/TradingGateway"
import { creditRfqUpdates$ } from "@/services/credit"
import { tradesStream$ } from "@/services/trades"
import { TradeType } from "@/services/trades/types"

import { ColDef } from "./colConfig"
Expand Down Expand Up @@ -357,33 +359,23 @@ export const [creditTradeRowHighlight$, setCreditTradeRowHighlight] =
/**
* Emit tradeId of new trades after the initial load
*/
const newTradeId$ = merge(trades$, creditTrades$).pipe(
scan(
(acc, trades) => {
return {
stateOfWorld: acc.stateOfWorld && acc.trades.length === 0,
trades: trades,
skip: acc.trades.length === trades.length,
}
},
{ stateOfWorld: true, trades: [], skip: false } as {
stateOfWorld: boolean
trades: TradeType[]
skip: boolean
},
),
filter(
({ stateOfWorld, trades, skip }) =>
!stateOfWorld && trades.length > 0 && !skip,
const newTradeId$ = tradesStream$.pipe(
filter(({ isStateOfTheWorld }) => !isStateOfTheWorld),
map(({ updates }) => updates[0].tradeId),
)

const newCreditTradeIds$ = creditRfqUpdates$.pipe(
// only need to grab ID on Accept
map((update) =>
update.type === QUOTE_ACCEPTED_RFQ_UPDATE ? update.payload.rfqId : 0,
),
map(({ trades }) => trades[0].tradeId),
filter((rfqId) => rfqId > 0),
)

/**
* State hook that emits tradeId of row to highlight for x seconds
* highlighted row will be either from manually updating tradeRowHighlight$ or a new trade
* State hooks that emit tradeId of row to highlight for HIGHLIGHT_ROW_FLASH_TIME ms
* highlighted row will be either from manually updating *TradeRowHighlight$ or a new trade
*/

export const [useFxTradeRowHighlight] = bind(
merge([
fxTradeRowHighlight$,
Expand All @@ -407,8 +399,8 @@ export const [useCreditTradeRowHighlight] = bind(
delay(HIGHLIGHT_ROW_FLASH_TIME),
map(() => undefined),
),
newTradeId$,
newTradeId$.pipe(
newCreditTradeIds$,
newCreditTradeIds$.pipe(
delay(HIGHLIGHT_ROW_FLASH_TIME),
map(() => undefined),
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ vi.mock("../TradesState/tableTrades", async () => {
return {
...tableTrades,
useFilterFields: vi.fn().mockReturnValue([]),
useCreditTradeRowHighlight: vi.fn().mockReturnValue(undefined),
useFxTradeRowHighlight: vi.fn().mockReturnValue(undefined),
}
})
Expand Down
89 changes: 43 additions & 46 deletions packages/client/src/services/credit/creditRfqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import {
scan,
startWith,
switchMap,
tap,
withLatestFrom,
} from "rxjs/operators"

Expand Down Expand Up @@ -60,7 +59,7 @@ export interface PricedQuoteDetails extends Omit<QuoteDetails, "state"> {
state: PricedQuoteState
}

const creditRfqUpdates$ = WorkflowService.subscribe().pipe(
export const creditRfqUpdates$ = WorkflowService.subscribe().pipe(
withConnection(),
shareLatest(),
)
Expand All @@ -72,18 +71,24 @@ export const creditRfqsById$ = creditRfqUpdates$.pipe(
[boolean, Record<number, RfqDetails>]
>(
(acc, [update, instruments, dealers]) => {
const rec = acc[1]
// need to know when we can release updates from this stream
const hasProcessedStateOfTheWorld = acc[0]
// keep record of rfq/trade ID to rfq details .. accepted, rejected etc.
const rfqs = acc[1]

switch (update.type) {
case START_OF_STATE_OF_THE_WORLD_RFQ_UPDATE: {
return [false, {}]
}
case START_OF_STATE_OF_THE_WORLD_RFQ_UPDATE:
// init state of scan is correct for start of SoW
return acc
case END_OF_STATE_OF_THE_WORLD_RFQ_UPDATE:
return [true, rfqs]
case RFQ_CREATED_RFQ_UPDATE:
return [
acc[0],
hasProcessedStateOfTheWorld,
{
...rec,
...rfqs,
[update.payload.id]: {
...rec[update.payload.id],
...rfqs[update.payload.id],
...update.payload,
instrument:
instruments.find(
Expand All @@ -97,21 +102,21 @@ export const creditRfqsById$ = creditRfqUpdates$.pipe(
]
case RFQ_CLOSED_RFQ_UPDATE:
return [
acc[0],
hasProcessedStateOfTheWorld,
{
...rec,
...rfqs,
[update.payload.id]: {
...rec[update.payload.id],
...rfqs[update.payload.id],
...update.payload,
},
},
]
case QUOTE_CREATED_RFQ_UPDATE: {
const previousRfq = rec[update.payload.rfqId]
const previousRfq = rfqs[update.payload.rfqId]
return [
acc[0],
hasProcessedStateOfTheWorld,
{
...rec,
...rfqs,
[update.payload.rfqId]: {
...previousRfq,
dealers: [
Expand All @@ -129,11 +134,11 @@ export const creditRfqsById$ = creditRfqUpdates$.pipe(
]
}
case QUOTE_PASSED_RFQ_UPDATE: {
const previousRfq = rec[update.payload.rfqId]
const previousRfq = rfqs[update.payload.rfqId]
return [
acc[0],
hasProcessedStateOfTheWorld,
{
...rec,
...rfqs,
[update.payload.rfqId]: {
...previousRfq,
quotes: previousRfq.quotes.map((quote) => ({
Expand All @@ -148,11 +153,11 @@ export const creditRfqsById$ = creditRfqUpdates$.pipe(
]
}
case QUOTE_QUOTED_RFQ_UPDATE: {
const previousRfq = rec[update.payload.rfqId]
const previousRfq = rfqs[update.payload.rfqId]
return [
acc[0],
hasProcessedStateOfTheWorld,
{
...rec,
...rfqs,
[update.payload.rfqId]: {
...previousRfq,
quotes: previousRfq.quotes.map((quote) => ({
Expand All @@ -166,40 +171,32 @@ export const creditRfqsById$ = creditRfqUpdates$.pipe(
},
]
}
// Currently the server sets the state of all the other non-passed quotes to
// Rejected if one quote is accepted
case QUOTE_ACCEPTED_RFQ_UPDATE: {
const previousRfq = Object.values(rec).find((rfqDetails) =>
rfqDetails.quotes.some((quote) => quote.id === update.payload.id),
)
if (previousRfq) {
return [
acc[0],
{
...rec,
[previousRfq.id]: {
...previousRfq,
quotes: previousRfq.quotes.map((quote) => ({
...quote,
state: getQuoteStateOnAccept(quote, update.payload),
})),
},
const previousRfq = rfqs[update.payload.rfqId]
// Emulating the server which, in "state of the world" sets the state of
// any non-accepted/passed quotes to one of the rejected states
return [
hasProcessedStateOfTheWorld,
{
...rfqs,
[previousRfq.id]: {
...previousRfq,
quotes: previousRfq.quotes.map((quote) => ({
...quote,
state: getQuoteStateOnAccept(quote, update.payload),
})),
},
]
}
return [acc[0], rec]
}
case END_OF_STATE_OF_THE_WORLD_RFQ_UPDATE: {
return [true, rec]
},
]
}
default:
return acc
}
},
[false, {}],
),
filter(([isEndStateOftheWorld]) => isEndStateOftheWorld),
map(([, rfqDetailsRec]) => rfqDetailsRec),
filter(([hasProcessedStateOfTheWorld]) => hasProcessedStateOfTheWorld),
map(([, rfqs]) => rfqs),
shareLatest(),
)

Expand Down
2 changes: 2 additions & 0 deletions packages/client/src/services/trades/__mocks__/trades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import type { Trade } from "../types"

let _trades$: Observable<Trade[]>

export const tradesStream$ = EMPTY

export const trades$ = defer(() => _trades$)

export const __setTrades = (input: Observable<Trade[]>) => {
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/services/trades/trades.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { creditRfqsById$ } from "../credit"
import { withConnection } from "../withConnection"
import { CreditTrade, FxTrade } from "./types"

const tradesStream$ = BlotterService.getTradeStream().pipe(
export const tradesStream$ = BlotterService.getTradeStream().pipe(
withConnection(),
map(({ isStateOfTheWorld, updates }) => ({
isStateOfTheWorld,
Expand Down

0 comments on commit 15d83c7

Please sign in to comment.