Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mojaloop/project#2553): send extra notification back to payee when a transfer is aborted #872

Closed
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
1e75b98
refactor: make actionLetter and functionality more readable
lewisdaly Nov 10, 2021
3476ae0
refactor: make fulfil more readable, remove redundant else blocks
lewisdaly Nov 10, 2021
d8b35ed
refactor: make fulfil more readable, remove redundant else blocks
lewisdaly Nov 10, 2021
81d27f7
refactor: make fulfil more readable, remove redundant else blocks
lewisdaly Nov 10, 2021
2ea02a7
feat: add placeholder FINAL_STATUS_FAILED event entrypoints
lewisdaly Nov 10, 2021
0451998
Merge branch 'master' of github.com:mojaloop/central-ledger into feat…
lewisdaly Nov 16, 2021
045341f
chore(todos): add ticket numbers to todo statements
lewisdaly Nov 16, 2021
69df0c0
chore(deps): rename central-object-store to object-store-lib
lewisdaly Nov 16, 2021
b6372b8
chore(deps): update object-store-lib to `11.1.1`
lewisdaly Nov 17, 2021
c382a20
feat: add extra notification when the RESERVED action fals on the ful…
lewisdaly Nov 17, 2021
85d6b35
feat: adding unit tests for RESERVED_ABORTED notification
lewisdaly Nov 17, 2021
1ad585f
feat: adding unit tests for RESERVED_ABORTED notification
lewisdaly Nov 17, 2021
639396e
feat: clean up extra tests
lewisdaly Nov 17, 2021
6aa55a2
chore(refactor): remove commented test code
lewisdaly Nov 17, 2021
f307030
chore(refactor): remove commented test code
lewisdaly Nov 17, 2021
0467fa9
feat(int): working on handlers integration tests
lewisdaly Nov 18, 2021
a8fb745
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 18, 2021
b6a0010
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 18, 2021
a9fdbac
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 18, 2021
c0b32dd
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 18, 2021
7be5fb4
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 18, 2021
03d55dd
feat(int): working on testConsumer to monitor kafka events
lewisdaly Nov 22, 2021
4a0b520
feat(int): debugging open handles issue
lewisdaly Nov 22, 2021
a2d738e
feat(int): reenable disabled tests
lewisdaly Nov 23, 2021
828125b
feat(int): fix flaky test
lewisdaly Nov 23, 2021
271f0c0
feat(int): fix flaky test
lewisdaly Nov 23, 2021
872252f
fix(unit): fix basic unit test
lewisdaly Nov 23, 2021
9ec7e29
Merge branch 'master' of github.com:mojaloop/central-ledger into feat…
lewisdaly Nov 23, 2021
0d9fa92
feat(handlers): fixing up completedTimestamp in notification
lewisdaly Nov 23, 2021
23aaad7
feat(handlers): fixing up completedTimestamp in notification
lewisdaly Nov 23, 2021
06b3aca
fix(vulns): run npm audit, ignore unfixable medium vulns for 1 month
lewisdaly Nov 23, 2021
1216307
fix(vulns): run npm audit, ignore unfixable medium vulns for 1 month
lewisdaly Nov 23, 2021
ece14fd
feat(unit): implement tests for other notifications
lewisdaly Nov 23, 2021
8a6a4a1
feat(unit): implement tests for other notifications
lewisdaly Nov 23, 2021
2651e4e
feat(unit): implement tests for other notifications
lewisdaly Nov 23, 2021
61f912a
chore: appease linting gods
lewisdaly Nov 23, 2021
17aeec0
chore: remove unused code, unused devDependency
lewisdaly Nov 23, 2021
157d4ee
feat(int): add event notification consumer config to integration test…
lewisdaly Nov 23, 2021
aa99f2d
fix(int): integration test runner on ci
lewisdaly Nov 24, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat(int): working on testConsumer to monitor kafka events
  • Loading branch information
lewisdaly committed Nov 22, 2021
commit 03d55ddc333d7bf99a4bdc62003774021a8e4131
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
"@mojaloop/central-services-health": "13.0.0",
"@mojaloop/central-services-logger": "10.6.2",
"@mojaloop/central-services-metrics": "11.0.0",
"@mojaloop/central-services-shared": "15.0.1",
"@mojaloop/central-services-shared": "git+https://github.com/vessels-tech/central-services-shared.git#feat/2553-patch-notification-3",
"@mojaloop/central-services-stream": "10.7.0",
"@mojaloop/event-sdk": "10.7.1",
"@mojaloop/ml-number": "11.2.1",
Expand Down
108 changes: 65 additions & 43 deletions src/handlers/transfers/handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -490,55 +490,77 @@ const fulfil = async (error, messages) => {
// emit an extra message - RESERVED_ABORTED if action === TransferEventAction.RESERVE
if (action === TransferEventAction.RESERVE) {
Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackReservedAborted--${actionLetter}1`))
const metadataState = Util.StreamingProtocol.createEventState(
Enum.Events.EventStatus.FAILURE.status,
apiFspiopError.errorInformation.errorCode,
apiFspiopError.errorInformation.errorDescription
)
const metadata = {
event: {
// TODO: These seem to have no effect - they are overridden by
// some crazy logic in Kafka.produceGeneralMessage
type: TransferEventType.FULFIL,
action: TransferEventAction.RESERVED_ABORTED,
state: metadataState
}
const eventDetail = { functionality: TransferEventType.NOTIFICATION, action: TransferEventAction.RESERVED_ABORTED }
const abortParams = {
// TODO: can we just use the original message?
message,
kafkaTopic,
span,
consumer: Consumer,
producer: Producer,
}
const reservedAbortedPayload = {
transferId: transfer.id,
//TODO: get this time from the db - don't be lazy
completedTimestamp: Util.Time.getUTCString(new Date()),
transferState: TransferState.ABORTED
}

// clone the headers, and change the FSPIOP-Source and FSPIOP-Destination
const reservedAbortedHeaders = JSON.parse(JSON.stringify(headers))
reservedAbortedHeaders[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payeeFsp
reservedAbortedHeaders[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
const reservedAbortedMessage = Util.StreamingProtocol.createMessage(
transferId,
transfer.payeeFsp,
Enum.Http.Headers.FSPIOP.SWITCH.value,
metadata,
reservedAbortedHeaders,
reservedAbortedPayload,
)

console.log('Kafka.produceGeneralMessage - ', JSON.stringify(reservedAbortedMessage))
await Kafka.produceGeneralMessage(
Config.KAFKA_CONFIG,
Producer,
Enum.Events.Event.Type.NOTIFICATION,
// TODO: should this be EVENT? or should we make a new topic for RESERVED_ABORTED?
// I can't seem to find how to send a message to the `topic-notification-event` topic
// that has the reservedAbortedMessage.metadata.event.action of RESERVED_ABORTED
// TransferEventAction.RESERVED_ABORTED,
//
// produceGeneralMessage overrides the reservedAbortedMessage.metadata.event.action
// for some reason.
TransferEventAction.EVENT,
reservedAbortedMessage,
metadataState
)
message.value.content.payload = reservedAbortedPayload
await Kafka.proceed(Config.KAFKA_CONFIG, abortParams, { consumerCommit, eventDetail, toDestination: transfer.payeeFsp, fromSwitch: true })


// Logger.isInfoEnabled && Logger.info(Util.breadcrumb(location, `callbackReservedAborted--${actionLetter}1`))
// const metadataState = Util.StreamingProtocol.createEventState(
// Enum.Events.EventStatus.FAILURE.status,
// apiFspiopError.errorInformation.errorCode,
// apiFspiopError.errorInformation.errorDescription
// )
// //TODO: replace this call with createEventMetadata
// const metadata = {
// event: {
// // TODO: These seem to have no effect - they are overridden by
// // some crazy logic in Kafka.produceGeneralMessage
// type: TransferEventType.FULFIL,
// action: TransferEventAction.RESERVED_ABORTED,
// state: metadataState
// // TODO: add span/trace stuff in here?
// }
// }


// // clone the headers, and change the FSPIOP-Source and FSPIOP-Destination
// const reservedAbortedHeaders = Util.clone(headers)
// reservedAbortedHeaders[Enum.Http.Headers.FSPIOP.DESTINATION] = transfer.payeeFsp
// reservedAbortedHeaders[Enum.Http.Headers.FSPIOP.SOURCE] = Enum.Http.Headers.FSPIOP.SWITCH.value
// const reservedAbortedMessage = Util.StreamingProtocol.createMessage(
// // TODO: should this be the transferId or a messageId
// transferId,
// transfer.payeeFsp,
// Enum.Http.Headers.FSPIOP.SWITCH.value,
// metadata,
// reservedAbortedHeaders,
// reservedAbortedPayload,
// )

// console.log('Kafka.produceGeneralMessage - ', JSON.stringify(reservedAbortedMessage))
// await Kafka.produceGeneralMessage(
// Config.KAFKA_CONFIG,
// Producer,
// Enum.Events.Event.Type.NOTIFICATION,
// // TODO: should this be EVENT? or should we make a new topic for RESERVED_ABORTED?
// // I can't seem to find how to send a message to the `topic-notification-event` topic
// // that has the reservedAbortedMessage.metadata.event.action of RESERVED_ABORTED
// // TransferEventAction.RESERVED_ABORTED,
// //
// // produceGeneralMessage overrides the reservedAbortedMessage.metadata.event.action
// // for some reason.
// TransferEventAction.RESERVED_ABORTED,
// // TransferEventAction.EVENT,
// reservedAbortedMessage,
// metadataState

// // TODO: missing tracing here - pass the span object in
// )
}
throw fspiopError
}
Expand Down
1 change: 0 additions & 1 deletion src/models/transfer/transferDuplicateCheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ const getTransferDuplicateCheck = async (transferId) => {
Logger.isDebugEnabled && Logger.debug(`get transferDuplicateCheck (transferId=${transferId})`)
try {
const result = Db.from('transferDuplicateCheck').findOne({ transferId })
console.log("getTransferDuplicateCheck", result)
histTimerGetTransferDuplicateCheckEnd({ success: true, queryName: 'transferDuplicateCheck_getTransferDuplicateCheck' })
return result
} catch (err) {
Expand Down
21 changes: 9 additions & 12 deletions test/integration/handlers/handlers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -388,23 +388,20 @@ Test('Handlers test', async handlersTest => {

const transferStateIsAbortedError = await wrapWithRetries(async () => {
const transfer = await TransferService.getById(td.messageProtocolPrepare.content.payload.transferId)
if (transfer.transferState !== 'ABORTED_ERROR') {
return false
}
return true
return transfer.transferState === 'ABORTED_ERROR'
})
test.equal(transferStateIsAbortedError, true, 'Transfer is in ABORTED_ERROR state')

// Assert
// 3. Check that we sent 2 notifications to kafka - one for the Payee, one for the Payer
await currentEventLoopEnd()
const payerAbortNotification = testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'abort-validation' })[0]
// TODO: I don't think this action should be event
// There seem to be some inconsistiences with how different kafka messages are produced
const payeeAbortNotification = testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'event' })[0]
console.log('payerAbortNotification are', JSON.stringify(payerAbortNotification))
console.log('payeeAbortNotification are', JSON.stringify(payeeAbortNotification))

const payerAbortNotification = (await wrapWithRetries(
() => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'abort-validation' }))
)[0]
const payeeAbortNotification = (await wrapWithRetries(
() => testConsumer.getEventsForFilter({ topicFilter: 'topic-notification-event', action: 'reserved-aborted' }))
)[0]
test.ok(payerAbortNotification, 'Payer Abort notification sent')
test.ok(payeeAbortNotification, 'Payee Abort notification sent')

// Cleanup
testConsumer.clearEvents()
Expand Down