Skip to content

Commit

Permalink
feat: add argument to add context to kafka message (#404)
Browse files Browse the repository at this point in the history
* feat: add argument to add context to kafka message

* test
  • Loading branch information
kleyow authored Oct 14, 2024
1 parent f415682 commit 4d8f8de
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 56 deletions.
95 changes: 46 additions & 49 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,16 @@
"uuid4": "2.0.3",
"ulidx": "2.4.1",
"widdershins": "^4.0.1",
"yaml": "2.5.1"
"yaml": "2.6.0"
},
"devDependencies": {
"@hapi/hapi": "21.3.10",
"@hapi/joi": "17.1.1",
"audit-ci": "^7.1.0",
"base64url": "3.0.1",
"chance": "1.1.12",
"npm-check-updates": "17.1.2",
"nyc": "17.0.0",
"npm-check-updates": "17.1.3",
"nyc": "17.1.0",
"pre-commit": "1.2.2",
"proxyquire": "2.1.3",
"replace": "^1.2.2",
Expand Down
11 changes: 7 additions & 4 deletions src/util/streaming/protocol/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ const updateMessageProtocolMetadata = (messageProtocol, metadataType, metadataAc
* @param {object} headers - headers from the request
* @param {object} uriParams - the URI parameters passed in request.
* @param {string} type - the message type for the LIME message, defaults to 'application/json'
* @param {object} context - any additional context information
*
* @returns {object} - Returns generated messageProtocol
*/
const createMessage = (id, to, from, metadata, headers, payload, uriParams = undefined, type = undefined) => {
const createMessage = (id, to, from, metadata, headers, payload, uriParams = undefined, type = undefined, context = undefined) => {
return {
id,
to,
Expand All @@ -88,7 +89,8 @@ const createMessage = (id, to, from, metadata, headers, payload, uriParams = und
headers,
payload: payload || {}
},
metadata
metadata,
context
}
}

Expand All @@ -102,11 +104,12 @@ const createMessage = (id, to, from, metadata, headers, payload, uriParams = und
* @param {string} to - the action flow. Example: 'prepare'
* @param {string} from - the state of the message being passed.
* @param {object} metadata - The metadata for streaming
* @param {object} context - any additional context information
*
* @returns {object} - Returns generated messageProtocol
*/
const createMessageFromRequest = (id, request, to, from, metadata) => {
return createMessage(id, to, from, metadata, request.headers, request.dataUri, request.params)
const createMessageFromRequest = (id, request, to, from, metadata, context) => {
return createMessage(id, to, from, metadata, request.headers, request.dataUri, request.params, undefined, context)
}

/**
Expand Down
57 changes: 57 additions & 0 deletions test/unit/util/streaming/protocol/index.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,41 @@ Test('Utility Test', utilityTest => {
test.end()
})

createMessageFromRequestTest.test('adds context object to message', (test) => {
const state = {
status: 'status',
code: 1,
description: 'description'
}
const event = {
type: Enum.Events.Event.Type.FULFIL,
action: Enum.Events.Event.Action.COMMIT,
state
}
const dataUri = 'data:application/json;base64,eyJlcnJvckluZm9ybWF0aW9uIjp7ImVycm9yQ29kZSI6IjUyMDAiLCJlcnJvckRlc2NyaXB0aW9uIjoiR2VuZXJpYyBsaW1pdCBlcnJvciwgYW1vdW50ICYgcGF5bWVudHMgdGhyZXNob2xkLiJ9fQ'
const correlationId = 'c74b826d-3c0b-4cfd-8ec1-08cc4343fe8c'
const metadata = StreamingProtocol.createMetadataWithCorrelatedEvent(correlationId, event.type, event.action, state)
const to = 'fsp1'
const from = 'fsp2'
const headers = Helper.defaultHeaders(to, 'participants', from)
const request = {
dataUri,
headers
}
const isoContext = { iso20022: { key: 'value' } }
const message = StreamingProtocol.createMessageFromRequest(
correlationId,
request,
to,
from,
metadata,
{
iso20022: { key: 'value' }
})
test.deepEqual(message.context, isoContext)
test.end()
})

createMessageFromRequestTest.end()
})

Expand Down Expand Up @@ -349,6 +384,28 @@ Test('Utility Test', utilityTest => {
test.end()
})

createMessageTest.test('add context object to message', (test) => {
const state = {
status: 'status',
code: 1,
description: 'description'
}
const event = {
type: Enum.Events.Event.Type.FULFIL,
action: Enum.Events.Event.Action.COMMIT,
state
}
const correlationId = 'c74b826d-3c0b-4cfd-8ec1-08cc4343fe8c'
const metadata = StreamingProtocol.createMetadataWithCorrelatedEvent(correlationId, event.type, event.action, state)
const to = 'fsp1'
const from = 'fsp2'
const headers = Helper.defaultHeaders(to, 'participants', from)
const isoContext = { iso20022: { key: 'value' } }
const message = StreamingProtocol.createMessage(correlationId, to, from, metadata, headers, null, undefined, undefined, isoContext)
test.deepEqual(message.context, isoContext)
test.end()
})

createMessageTest.end()
})

Expand Down

0 comments on commit 4d8f8de

Please sign in to comment.