Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(mojaloop/#3819): update functional tests and move fulfil int test #1009

Merged
merged 30 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
e416463
chore: update functional tests
kleyow Apr 24, 2024
2eed6e0
version
kleyow Apr 24, 2024
89c3749
snap
kleyow Apr 24, 2024
b508be7
test
kleyow Apr 24, 2024
433d596
name
kleyow Apr 24, 2024
aa5fc2e
func
kleyow Apr 25, 2024
6ed2ec4
update
kleyow Apr 25, 2024
f258dfe
test-function
kleyow Apr 25, 2024
573d6eb
snap
kleyow Apr 25, 2024
96f0d00
changes
kleyow Apr 25, 2024
5e5beb8
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow Apr 26, 2024
b82cf9f
feat: implemented fx
vijayg10 Apr 29, 2024
20a2e00
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Apr 29, 2024
bfa5af3
fix: unit tests
vijayg10 Apr 29, 2024
5427e6d
fix: unit tests
vijayg10 Apr 29, 2024
ee60d49
chore: removed fx-fulfil in non batch mode
vijayg10 Apr 29, 2024
cf0b78f
add back functions
kleyow Apr 29, 2024
5b8b8bf
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow Apr 29, 2024
cfe0422
feat: refactored position fulfil handler for fx
vijayg10 Apr 30, 2024
0503b79
chore: removed fx from non batch position fulfil
vijayg10 Apr 30, 2024
b939303
chore: removed fx references from non batch position handler
vijayg10 Apr 30, 2024
e386429
chore: simplified existing tests
vijayg10 Apr 30, 2024
3b62678
chore: added unit tests
vijayg10 Apr 30, 2024
60a5615
fix: prepare position fx
vijayg10 Apr 30, 2024
b11fcab
Merge branch 'feat/fx-impl' of https://github.com/mojaloop/central-le…
vijayg10 Apr 30, 2024
06ced6a
Merge remote-tracking branch 'origin/feat/fx-fulfil-position-batching…
kleyow Apr 30, 2024
2fc69ce
publish messages to batch topic
kleyow Apr 30, 2024
ca7c447
update script
kleyow May 2, 2024
0dc4b90
Merge remote-tracking branch 'origin/feat/fx-impl' into test/functional
kleyow May 2, 2024
005c3bc
move fxfulfil tests to batch tests
kleyow May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: implemented fx
  • Loading branch information
vijayg10 committed Apr 29, 2024
commit b82cf9f72a51b8221d37810e2febdbb6f2561064
14 changes: 14 additions & 0 deletions src/domain/position/binProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const BatchPositionModelCached = require('../../models/position/batchCached')
const PositionPrepareDomain = require('./prepare')
const PositionFxPrepareDomain = require('./fx-prepare')
const PositionFulfilDomain = require('./fulfil')
const PositionFxFulfilDomain = require('./fx-fulfil')
const SettlementModelCached = require('../../models/settlement/settlementModelCached')
const Enum = require('@mojaloop/central-services-shared').Enum
const ErrorHandler = require('@mojaloop/central-services-error-handling')
Expand Down Expand Up @@ -127,6 +128,19 @@ const processBins = async (bins, trx) => {
let accumulatedFxTransferStateChanges = []
let accumulatedPositionChanges = []

// If fulfil action found then call processPositionPrepareBin function
// We don't need to change the position for FX transfers. All the position changes happen when actual transfer is done
const fxFulfilActionResult = await PositionFxFulfilDomain.processPositionFxFulfilBin(
accountBin[Enum.Events.Event.Action.FX_RESERVE],
accumulatedFxTransferStates
)

// Update accumulated values
accumulatedFxTransferStates = fxFulfilActionResult.accumulatedFxTransferStates
// Append accumulated arrays
accumulatedFxTransferStateChanges = accumulatedFxTransferStateChanges.concat(fxFulfilActionResult.accumulatedFxTransferStateChanges)
notifyMessages = notifyMessages.concat(fxFulfilActionResult.notifyMessages)

// If fulfil action found then call processPositionPrepareBin function
const fulfilActionResult = await PositionFulfilDomain.processPositionFulfilBin(
[accountBin.commit, accountBin.reserve],
Expand Down
131 changes: 131 additions & 0 deletions src/domain/position/fx-fulfil.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
const { Enum } = require('@mojaloop/central-services-shared')
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Config = require('../../lib/config')
const Utility = require('@mojaloop/central-services-shared').Util
const Logger = require('@mojaloop/central-services-logger')

/**
* @function processPositionFxFulfilBin
*
* @async
* @description This is the domain function to process a bin of position-fx-fulfil messages of a single participant account.
*
* @param {array} binItems - an array of objects that contain a position fx reserve message and its span. {message, span}
* @param {object} accumulatedFxTransferStates - object with fx transfer id keys and transfer state id values. Used to check if transfer is in correct state for processing. Clone and update states for output.
* @param {object} transferInfoList - object with transfer id keys and transfer info values. Used to pass transfer info to domain function.
* @returns {object} - Returns an object containing accumulatedFxTransferStateChanges, accumulatedFxTransferStates, resultMessages, limitAlarms or throws an error if failed
*/
const processPositionFxFulfilBin = async (
binItems,
accumulatedFxTransferStates,
) => {
const fxTransferStateChanges = []
const resultMessages = []
const accumulatedFxTransferStatesCopy = Object.assign({}, accumulatedFxTransferStates)

if (binItems && binItems.length > 0) {
for (const binItem of binItems) {
let fxTransferStateId
let reason
let resultMessage
const commitRequestId = binItem.message.value.content.uriParams.id
const counterPartyFsp = binItem.message.value.from
const initiatingFsp = binItem.message.value.to
const fxTransfer = binItem.decodedPayload
Logger.isDebugEnabled && Logger.debug(`processPositionFxFulfilBin::fxTransfer:processingMessage: ${JSON.stringify(fxTransfer)}`)
Logger.isDebugEnabled && Logger.debug(`accumulatedFxTransferStates: ${JSON.stringify(accumulatedFxTransferStates)}`)
// Inform sender if transfer is not in RECEIVED_FULFIL state, skip making any transfer state changes
if (accumulatedFxTransferStates[commitRequestId] !== Enum.Transfers.TransferInternalState.RECEIVED_FULFIL) {
// forward same headers from the request, except the content-length header
// set destination to counterPartyFsp and source to switch
const headers = { ...binItem.message.value.content.headers }
headers[Enum.Http.Headers.FSPIOP.DESTINATION] = counterPartyFsp
headers[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
delete headers['content-length']

const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(
`Invalid State: ${accumulatedFxTransferStates[commitRequestId]} - expected: ${Enum.Transfers.TransferInternalState.RECEIVED_FULFIL}`
).toApiErrorObject(Config.ERROR_HANDLING)
const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
fspiopError.errorInformation.errorCode,
fspiopError.errorInformation.errorDescription
)

const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
commitRequestId,
Enum.Kafka.Topics.NOTIFICATION,
Enum.Events.Event.Action.FX_FULFIL,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
commitRequestId,
counterPartyFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
metadata,
headers,
fspiopError,
{ id: commitRequestId },
'application/json'
)
} else {
// forward same headers from the prepare message, except the content-length header
const headers = { ...binItem.message.value.content.headers }
delete headers['content-length']

const state = Utility.StreamingProtocol.createEventState(
Enum.Events.EventStatus.SUCCESS.status,
null,
null
)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(
commitRequestId,
Enum.Kafka.Topics.TRANSFER,
Enum.Events.Event.Action.COMMIT,
state
)

resultMessage = Utility.StreamingProtocol.createMessage(
commitRequestId,
initiatingFsp,
counterPartyFsp,
metadata,
headers,
fxTransfer,
{ id: commitRequestId },
'application/json'
)

fxTransferStateId = Enum.Transfers.TransferState.COMMITTED

binItem.result = { success: true }
}

resultMessages.push({ binItem, message: resultMessage })

if (fxTransferStateId) {
const fxTransferStateChange = {
commitRequestId,
fxTransferStateId,
reason
}
fxTransferStateChanges.push(fxTransferStateChange)
Logger.isDebugEnabled && Logger.debug(`processPositionFxFulfilBin::fxTransferStateChange: ${JSON.stringify(fxTransferStateChange)}`)

accumulatedFxTransferStatesCopy[commitRequestId] = fxTransferStateId
Logger.isDebugEnabled && Logger.debug(`processPositionFxFulfilBin::accumulatedTransferStatesCopy:finalizedFxTransferState ${JSON.stringify(fxTransferStateId)}`)
}
}
}

return {
accumulatedTransferStates: accumulatedFxTransferStatesCopy, // finalized fx transfer state after fx-fulfil processing
accumulatedFxTransferStateChanges: fxTransferStateChanges, // fx transfer state changes to be persisted in order
notifyMessages: resultMessages // array of objects containing bin item and result message. {binItem, message}
}
}

module.exports = {
processPositionFxFulfilBin
}
23 changes: 1 addition & 22 deletions src/domain/position/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
'use strict'

const PositionFacade = require('../../models/position/facade')
const { Enum } = require('@mojaloop/central-services-shared')

const Metrics = require('@mojaloop/central-services-metrics')

Expand All @@ -46,38 +45,18 @@ const changeParticipantPosition = (participantCurrencyId, isReversal, amount, tr
return result
}

const changeParticipantPositionFx = (participantCurrencyId, isReversal, amount, fxTransferStateChange) => {
const histTimerChangeParticipantPositionEnd = Metrics.getHistogram(
'fx_domain_position',
'changeParticipantPositionFx - Metrics for transfer domain',
['success', 'funcName']
).startTimer()
const result = PositionFacade.changeParticipantPositionTransactionFx(participantCurrencyId, isReversal, amount, fxTransferStateChange)
histTimerChangeParticipantPositionEnd({ success: true, funcName: 'changeParticipantPositionFx' })
return result
}

const calculatePreparePositionsBatch = async (transferList) => {
const histTimerPositionBatchDomainEnd = Metrics.getHistogram(
'domain_position',
'calculatePreparePositionsBatch - Metrics for transfer domain',
['success', 'funcName']
).startTimer()
let result
const action = transferList[0]?.value.metadata.event.action
if (action === Enum.Events.Event.Action.FX_PREPARE) {
// FX transfer
result = PositionFacade.prepareChangeParticipantPositionTransactionFx(transferList)
} else {
// Standard transfer
result = PositionFacade.prepareChangeParticipantPositionTransaction(transferList)
}
const result = PositionFacade.prepareChangeParticipantPositionTransaction(transferList)
histTimerPositionBatchDomainEnd({ success: true, funcName: 'calculatePreparePositionsBatch' })
return result
}

module.exports = {
changeParticipantPosition,
changeParticipantPositionFx,
calculatePreparePositionsBatch
}
5 changes: 1 addition & 4 deletions src/handlers/positions/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,7 @@ const positions = async (error, messages) => {
Logger.isInfoEnabled && Logger.info(Utility.breadcrumb(location, `payerNotifyInsufficientLiquidity--${actionLetter}2`))
const responseFspiopError = fspiopError || ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
const fspiopApiError = responseFspiopError.toApiErrorObject(Config.ERROR_HANDLING)
// TODO: log error incase of fxTransfer to a new table like fxTransferError
if (action !== Enum.Events.Event.Action.FX_PREPARE) {
await TransferService.logTransferError(transferId, fspiopApiError.errorInformation.errorCode, fspiopApiError.errorInformation.errorDescription)
}
await TransferService.logTransferError(transferId, fspiopApiError.errorInformation.errorCode, fspiopApiError.errorInformation.errorDescription)
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopApiError, eventDetail, fromSwitch })
throw responseFspiopError
}
Expand Down