Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(mojaloop/#3819): update functional tests and move fulfil int test #1009

Merged
merged 30 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e416463
chore: update functional tests
kleyow Apr 24, 2024
2eed6e0
version
kleyow Apr 24, 2024
89c3749
snap
kleyow Apr 24, 2024
b508be7
test
kleyow Apr 24, 2024
433d596
name
kleyow Apr 24, 2024
aa5fc2e
func
kleyow Apr 25, 2024
6ed2ec4
update
kleyow Apr 25, 2024
f258dfe
test-function
kleyow Apr 25, 2024
573d6eb
snap
kleyow Apr 25, 2024
96f0d00
changes
kleyow Apr 25, 2024
5e5beb8
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow Apr 26, 2024
b82cf9f
feat: implemented fx
vijayg10 Apr 29, 2024
20a2e00
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Apr 29, 2024
bfa5af3
fix: unit tests
vijayg10 Apr 29, 2024
5427e6d
fix: unit tests
vijayg10 Apr 29, 2024
ee60d49
chore: removed fx-fulfil in non batch mode
vijayg10 Apr 29, 2024
cf0b78f
add back functions
kleyow Apr 29, 2024
5b8b8bf
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow Apr 29, 2024
cfe0422
feat: refactored position fulfil handler for fx
vijayg10 Apr 30, 2024
0503b79
chore: removed fx from non batch position fulfil
vijayg10 Apr 30, 2024
b939303
chore: removed fx references from non batch position handler
vijayg10 Apr 30, 2024
e386429
chore: simplified existing tests
vijayg10 Apr 30, 2024
3b62678
chore: added unit tests
vijayg10 Apr 30, 2024
60a5615
fix: prepare position fx
vijayg10 Apr 30, 2024
b11fcab
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Apr 30, 2024
06ced6a
Merge remote-tracking branch 'origin/feat/fx-fulfil-position-batching…
kleyow Apr 30, 2024
2fc69ce
publish messages to batch topic
kleyow Apr 30, 2024
ca7c447
update script
kleyow May 2, 2024
0dc4b90
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow May 2, 2024
005c3bc
move fxfulfil tests to batch tests
kleyow May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update script
  • Loading branch information
kleyow committed May 2, 2024
commit ca7c447a950b7e7bbe19c79fa36477a8f878911a
4 changes: 3 additions & 1 deletion audit-ci.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"GHSA-w5p7-h5w8-2hfq", // tap-spec>tap-out>trim
"GHSA-p9pc-299p-vxgp", // widdershins>yargs>yargs-parser
"GHSA-f5x3-32g6-xq36", // https://github.com/advisories/GHSA-f5x3-32g6-xq36
"GHSA-cgfm-xwp7-2cvr" // https://github.com/advisories/GHSA-cgfm-xwp7-2cvr
"GHSA-cgfm-xwp7-2cvr", // https://github.com/advisories/GHSA-cgfm-xwp7-2cvr
"GHSA-ghr5-ch3p-vcr6" // https://github.com/advisories/GHSA-ghr5-ch3p-vcr6

]
}
264 changes: 0 additions & 264 deletions src/models/position/facade.js
Original file line number Diff line number Diff line change
Expand Up @@ -376,273 +376,9 @@ const getAllByNameAndCurrency = async (name, currencyId = null) => {
}
}

const prepareChangeParticipantPositionTransactionFx = async (transferList) => {
const histTimerChangeParticipantPositionEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx - Metrics for position model',
['success', 'queryName']
).startTimer()
try {
const knex = await Db.getKnex()

const { participantName, currencyId } = transferList[0].value.content.context.cyrilResult

const allSettlementModels = await SettlementModelCached.getAll()
let settlementModels = allSettlementModels.filter(model => model.currencyId === currencyId)
if (settlementModels.length === 0) {
settlementModels = allSettlementModels.filter(model => model.currencyId === null) // Default settlement model
if (settlementModels.length === 0) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.GENERIC_SETTLEMENT_ERROR, 'Unable to find a matching or default, Settlement Model')
}
}
const settlementModel = settlementModels.find(sm => sm.ledgerAccountTypeId === Enum.Accounts.LedgerAccountType.POSITION)
const participantCurrency = await participantFacade.getByNameAndCurrency(participantName, currencyId, Enum.Accounts.LedgerAccountType.POSITION)
const settlementParticipantCurrency = await participantFacade.getByNameAndCurrency(participantName, currencyId, settlementModel.settlementAccountTypeId)
const processedTransfers = {} // The list of processed transfers - so that we can store the additional information around the decision. Most importantly the "running" position
const reservedTransfers = []
const abortedTransfers = []
const initialTransferStateChangePromises = []
const commitRequestIdList = []
const limitAlarms = []
let sumTransfersInBatch = 0
const histTimerChangeParticipantPositionTransEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction - Metrics for position model',
['success', 'queryName']
).startTimer()
await knex.transaction(async (trx) => {
try {
const transactionTimestamp = Time.getUTCString(new Date())
for (const transfer of transferList) {
const id = transfer.value.content.payload.commitRequestId
commitRequestIdList.push(id)
// DUPLICATE of TransferStateChangeModel getByTransferId
initialTransferStateChangePromises.push(await knex('fxTransferStateChange').transacting(trx).where('commitRequestId', id).orderBy('fxTransferStateChangeId', 'desc').first())
}
const histTimerinitialTransferStateChangeListEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_initialTransferStateChangeList - Metrics for position model',
['success', 'queryName']
).startTimer()
const initialTransferStateChangeList = await Promise.all(initialTransferStateChangePromises)
histTimerinitialTransferStateChangeListEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_initialTransferStateChangeList' })
const histTimerTransferStateChangePrepareAndBatchInsertEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_transferStateChangeBatchInsert - Metrics for position model',
['success', 'queryName']
).startTimer()
for (const id in initialTransferStateChangeList) {
const transferState = initialTransferStateChangeList[id]
const transfer = transferList[id].value.content.payload
const rawMessage = transferList[id]
if (transferState.transferStateId === Enum.Transfers.TransferInternalState.RECEIVED_PREPARE) {
transferState.fxTransferStateChangeId = null
transferState.transferStateId = Enum.Transfers.TransferState.RESERVED
let transferAmount
if (transfer.targetAmount.currency === currencyId) {
transferAmount = new MLNumber(transfer.targetAmount.amount)
} else {
transferAmount = new MLNumber(transfer.sourceAmount.amount)
}
reservedTransfers[transfer.commitRequestId] = { transferState, transfer, rawMessage, transferAmount }
sumTransfersInBatch = new MLNumber(sumTransfersInBatch).add(transferAmount).toFixed(Config.AMOUNT.SCALE)
} else {
transferState.fxTransferStateChangeId = null
transferState.transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
transferState.reason = 'Transfer in incorrect state'
abortedTransfers[transfer.commitRequestId] = { transferState, transfer, rawMessage }
}
}
const abortedTransferStateChangeList = Object.keys(abortedTransfers).length && Array.from(commitRequestIdList.map(id => abortedTransfers[id].transferState))
Object.keys(abortedTransferStateChangeList).length && await knex.batchInsert('fxTransferStateChange', abortedTransferStateChangeList).transacting(trx)
histTimerTransferStateChangePrepareAndBatchInsertEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_transferStateChangeBatchInsert' })
// Get the effective position for this participantCurrency at the start of processing the Batch
// and reserved the total value of the transfers in the batch (sumTransfersInBatch)
const histTimerUpdateEffectivePositionEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_UpdateEffectivePosition - Metrics for position model',
['success', 'queryName']
).startTimer()
const participantPositions = await knex('participantPosition')
.transacting(trx)
.whereIn('participantCurrencyId', [participantCurrency.participantCurrencyId, settlementParticipantCurrency.participantCurrencyId])
.forUpdate()
.select('*')
const initialParticipantPosition = participantPositions.find(position => position.participantCurrencyId === participantCurrency.participantCurrencyId)
const settlementParticipantPosition = participantPositions.find(position => position.participantCurrencyId === settlementParticipantCurrency.participantCurrencyId)
const currentPosition = new MLNumber(initialParticipantPosition.value)
const reservedPosition = new MLNumber(initialParticipantPosition.reservedValue)
const effectivePosition = currentPosition.add(reservedPosition).toFixed(Config.AMOUNT.SCALE)
initialParticipantPosition.reservedValue = new MLNumber(initialParticipantPosition.reservedValue).add(sumTransfersInBatch).toFixed(Config.AMOUNT.SCALE)
initialParticipantPosition.changedDate = transactionTimestamp
await knex('participantPosition').transacting(trx).where({ participantPositionId: initialParticipantPosition.participantPositionId }).update(initialParticipantPosition)
histTimerUpdateEffectivePositionEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_UpdateEffectivePosition' })
// Get the actual position limit and calculate the available position for the transfers to use in this batch
// Note: see optimisation decision notes to understand the justification for the algorithm
const histTimerValidatePositionBatchEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_ValidatePositionBatch - Metrics for position model',
['success', 'queryName']
).startTimer()
const participantLimit = await participantFacade.getParticipantLimitByParticipantCurrencyLimit(participantCurrency.participantId, participantCurrency.currencyId, Enum.Accounts.LedgerAccountType.POSITION, Enum.Accounts.ParticipantLimitType.NET_DEBIT_CAP)

const liquidityCover = new MLNumber(settlementParticipantPosition.value).multiply(-1)
const payerLimit = new MLNumber(participantLimit.value)
const availablePositionBasedOnLiquidityCover = liquidityCover.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)
const availablePositionBasedOnPayerLimit = payerLimit.subtract(effectivePosition).toFixed(Config.AMOUNT.SCALE)
/* Validate entire batch if availablePosition >= sumTransfersInBatch - the impact is that applying per transfer rules would require to be handled differently
since further rules are expected we do not do this at this point
As we enter this next step the order in which the transfer is processed against the Position is critical.
Both positive and failure cases need to recorded in processing order
This means that they should not be removed from the list, and the participantPosition
*/
let sumReserved = 0 // Record the sum of the transfers we allow to progress to RESERVED
for (const id in reservedTransfers) {
const { transfer, transferState, rawMessage, transferAmount } = reservedTransfers[id]
if (new MLNumber(availablePositionBasedOnLiquidityCover).toNumber() < transferAmount.toNumber()) {
transferState.transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
transferState.reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message
reservedTransfers[id].fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY, null, null, null, rawMessage.value.content.payload.extensionList)
rawMessage.value.content.payload = reservedTransfers[id].fspiopError.toApiErrorObject(Config.ERROR_HANDLING)
} else if (new MLNumber(availablePositionBasedOnPayerLimit).toNumber() < transferAmount.toNumber()) {
transferState.transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
transferState.reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR.message
reservedTransfers[id].fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_LIMIT_ERROR, null, null, null, rawMessage.value.content.payload.extensionList)
rawMessage.value.content.payload = reservedTransfers[id].fspiopError.toApiErrorObject(Config.ERROR_HANDLING)
} else {
transferState.transferStateId = Enum.Transfers.TransferState.RESERVED
sumReserved = new MLNumber(sumReserved).add(transferAmount).toFixed(Config.AMOUNT.SCALE) /* actually used */
}
const runningPosition = new MLNumber(currentPosition).add(sumReserved).toFixed(Config.AMOUNT.SCALE) /* effective position */
const runningReservedValue = new MLNumber(sumTransfersInBatch).subtract(sumReserved).toFixed(Config.AMOUNT.SCALE)
processedTransfers[id] = { transferState, transfer, rawMessage, transferAmount, runningPosition, runningReservedValue }
}
histTimerValidatePositionBatchEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_ValidatePositionBatch' })
const histTimerUpdateParticipantPositionEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_UpdateParticipantPosition - Metrics for position model',
['success', 'queryName']
).startTimer()
/*
Update the participantPosition with the eventual impact of the Batch
So the position moves forward by the sum of the transfers actually reserved (sumReserved)
and the reserved amount is cleared of the we reserved in the first instance (sumTransfersInBatch)
*/
const processedPositionValue = currentPosition.add(sumReserved)
await knex('participantPosition').transacting(trx).where({ participantPositionId: initialParticipantPosition.participantPositionId }).update({
value: processedPositionValue.toFixed(Config.AMOUNT.SCALE),
reservedValue: new MLNumber(initialParticipantPosition.reservedValue).subtract(sumTransfersInBatch).toFixed(Config.AMOUNT.SCALE),
changedDate: transactionTimestamp
})
// TODO this limit needs to be clarified
if (processedPositionValue.toNumber() > liquidityCover.multiply(participantLimit.thresholdAlarmPercentage).toNumber()) {
limitAlarms.push(participantLimit)
}
histTimerUpdateParticipantPositionEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_UpdateParticipantPosition' })
/*
Persist the transferStateChanges and associated participantPositionChange entry to record the running position
The transferStateChanges need to be persisted first (by INSERTing) to have the PK reference
*/
const histTimerPersistTransferStateChangeEnd = Metrics.getHistogram(
'fx_model_position',
'facade_prepareChangeParticipantPositionTransactionFx_transaction_PersistTransferState - Metrics for position model',
['success', 'queryName']
).startTimer()
await knex('fxTransfer').transacting(trx).forUpdate().whereIn('commitRequestId', commitRequestIdList).select('*')
const processedTransferStateChangeList = Object.keys(processedTransfers).length && Array.from(commitRequestIdList.map(id => processedTransfers[id].transferState))
const processedTransferStateChangeIdList = processedTransferStateChangeList && Object.keys(processedTransferStateChangeList).length && await knex.batchInsert('fxTransferStateChange', processedTransferStateChangeList).transacting(trx)
const processedTransfersKeysList = Object.keys(processedTransfers)
const batchParticipantPositionChange = []
for (const keyIndex in processedTransfersKeysList) {
const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const participantPositionChange = {
participantPositionId: initialParticipantPosition.participantPositionId,
fxTransferStateChangeId: processedTransferStateChangeIdList[keyIndex],
value: runningPosition,
// processBatch: <uuid> - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped
reservedValue: runningReservedValue
}
batchParticipantPositionChange.push(participantPositionChange)
}
batchParticipantPositionChange.length && await knex.batchInsert('participantPositionChange', batchParticipantPositionChange).transacting(trx)
histTimerPersistTransferStateChangeEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction_PersistTransferState' })
await trx.commit()
histTimerChangeParticipantPositionTransEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction' })
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
await trx.rollback()
histTimerChangeParticipantPositionTransEnd({ success: false, queryName: 'facade_prepareChangeParticipantPositionTransactionFx_transaction' })
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
})
const preparedMessagesList = Array.from(commitRequestIdList.map(id =>
id in processedTransfers
? reservedTransfers[id]
: abortedTransfers[id]
))
histTimerChangeParticipantPositionEnd({ success: true, queryName: 'facade_prepareChangeParticipantPositionTransactionFx' })
return { preparedMessagesList, limitAlarms }
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
histTimerChangeParticipantPositionEnd({ success: false, queryName: 'facade_prepareChangeParticipantPositionTransactionFx' })
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

const changeParticipantPositionTransactionFx = async (participantCurrencyId, isReversal, amount, fxTransferStateChange) => {
const histTimerChangeParticipantPositionTransactionEnd = Metrics.getHistogram(
'fx_model_position',
'facade_changeParticipantPositionTransactionFx - Metrics for position model',
['success', 'queryName']
).startTimer()
try {
const knex = await Db.getKnex()
await knex.transaction(async (trx) => {
try {
const transactionTimestamp = Time.getUTCString(new Date())
fxTransferStateChange.createdDate = transactionTimestamp
const participantPosition = await knex('participantPosition').transacting(trx).where({ participantCurrencyId }).forUpdate().select('*').first()
let latestPosition
if (isReversal) {
latestPosition = new MLNumber(participantPosition.value).subtract(amount)
} else {
latestPosition = new MLNumber(participantPosition.value).add(amount)
}
latestPosition = latestPosition.toFixed(Config.AMOUNT.SCALE)
await knex('participantPosition').transacting(trx).where({ participantCurrencyId }).update({
value: latestPosition,
changedDate: transactionTimestamp
})
await knex('fxTransferStateChange').transacting(trx).insert(fxTransferStateChange)
const insertedFxTransferStateChange = await knex('fxTransferStateChange').transacting(trx).where({ commitRequestId: fxTransferStateChange.commitRequestId }).forUpdate().first().orderBy('fxTransferStateChangeId', 'desc')
const participantPositionChange = {
participantPositionId: participantPosition.participantPositionId,
fxTransferStateChangeId: insertedFxTransferStateChange.fxTransferStateChangeId,
value: latestPosition,
reservedValue: participantPosition.reservedValue,
createdDate: transactionTimestamp
}
await knex('participantPositionChange').transacting(trx).insert(participantPositionChange)
await trx.commit()
histTimerChangeParticipantPositionTransactionEnd({ success: true, queryName: 'facade_changeParticipantPositionTransactionFx' })
} catch (err) {
await trx.rollback()
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}).catch((err) => {
throw ErrorHandler.Factory.reformatFSPIOPError(err)
})
} catch (err) {
Logger.isErrorEnabled && Logger.error(err)
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
}

module.exports = {
changeParticipantPositionTransaction,
changeParticipantPositionTransactionFx,
prepareChangeParticipantPositionTransaction,
prepareChangeParticipantPositionTransactionFx,
getByNameAndCurrency,
getAllByNameAndCurrency
}
2 changes: 1 addition & 1 deletion test/scripts/test-functional.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ echo "--=== Running Functional Test Runner ===--"
echo

CENTRAL_LEDGER_VERSION=${CENTRAL_LEDGER_VERSION:-"local"}
ML_CORE_TEST_HARNESS_VERSION=${ML_CORE_TEST_HARNESS_VERSION:-"v1.2.4-fx-snapshot.10"}
ML_CORE_TEST_HARNESS_VERSION=${ML_CORE_TEST_HARNESS_VERSION:-"v1.2.4-fx-snapshot.11"}
ML_CORE_TEST_HARNESS_GIT=${ML_CORE_TEST_HARNESS_GIT:-"https://github.com/mojaloop/ml-core-test-harness.git"}
ML_CORE_TEST_HARNESS_TEST_PROV_CONT_NAME=${ML_CORE_TEST_HARNESS_TEST_PROV_CONT_NAME:-"ttk-func-ttk-provisioning-fx-1"}
ML_CORE_TEST_HARNESS_TEST_FUNC_CONT_NAME=${ML_CORE_TEST_HARNESS_TEST_FUNC_CONT_NAME:-"ttk-func-ttk-fx-tests-1"}
Expand Down