Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion src/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,20 @@ EVENT_ENABLE_KAFKA_PUSH=true
#Set Default Phone code
DEFAULT_PHONE_CODE="+91"
# kafka topic for health check
KAFKA_HEALTH_CHECK_TOPIC='user-health-check-topic-check'
KAFKA_HEALTH_CHECK_TOPIC='user-health-check-topic-check'
#Enable / disable tenant create event
EVENT_ENABLE_TENANT_EVENTS=true
#Enable / disable tenant kafka event
ENABLE_TENANT_KAFKA_EVENTS=true
#Enable / disable user kafka event
ENABLE_USER_KAFKA_EVENTS=true
#Enable / disable org kafka event
ENABLE_ORG_KAFKA_EVENTS=true
#Event Kafka topic for tenant create
EVENT_TENANT_KAFKA_TOPIC='dev.tenantEvent'
#Event API for tenant create , can be comma separated
EVENT_TENANT_LISTENER_API='http://interface:3567/scp/v1/tenant/add'
#Enable / disable organization event
EVENT_ENABLE_ORGANIZATION_EVENTS=true
#Event Kafka topic for organization create/update
EVENT_ORGANIZATION_KAFKA_TOPIC='dev.organizationEvent'
31 changes: 31 additions & 0 deletions src/dtos/organizationDTO.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,37 @@ class organizationDTO {
registration_codes: input?.registration_codes || [],
}
}

static eventBodyDTO({ entity, eventType, entityId, changedValues = [], args = {} }) {
try {
if (!entity || !eventType || !entityId)
throw new Error('Entity, EventType & EntityId values are mandatory for an Event')

const disallowedArgs = ['deleted_at']

disallowedArgs.forEach((key) => {
delete args[key]
})

const changes = changedValues.reduce((result, obj) => {
const { fieldName, oldValue, newValue } = obj
if (!result[fieldName]) result[fieldName] = {}
if (oldValue) result[fieldName].oldValue = oldValue
if (newValue) result[fieldName].newValue = newValue
return result
}, {})
return {
entity,
eventType,
entityId,
changes,
...args,
}
} catch (error) {
console.error(error)
return false
}
}
}

module.exports = organizationDTO
31 changes: 31 additions & 0 deletions src/dtos/tenantDTO.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,37 @@ class TenantResponseDTO {
}),
}
}

static eventBodyDTO({ entity, eventType, entityId, changedValues = [], args = {} }) {
try {
if (!entity || !eventType || !entityId)
throw new Error('Entity, EventType & EntityId values are mandatory for an Event')

const disallowedArgs = ['deleted_at']

disallowedArgs.forEach((key) => {
delete args[key]
})

const changes = changedValues.reduce((result, obj) => {
const { fieldName, oldValue, newValue } = obj
if (!result[fieldName]) result[fieldName] = {}
if (oldValue) result[fieldName].oldValue = oldValue
if (newValue) result[fieldName].newValue = newValue
return result
}, {})
return {
entity,
eventType,
entityId,
changes,
...args,
}
} catch (error) {
console.error(error)
return false
}
}
}

module.exports = TenantResponseDTO
45 changes: 44 additions & 1 deletion src/envVariables.js
Original file line number Diff line number Diff line change
Expand Up @@ -396,11 +396,15 @@ let enviromentVariables = {
message: 'Key to toggle user creation kafka event',
optional: true,
},
EVENT_ENABLE_USER_KAFKA_EVENTS: {
message: 'Key to toggle user creation kafka event',
optional: true,
},
EVENT_USER_KAFKA_TOPIC: {
message: 'Kafka topic for User creation Event',
optional: true,
requiredIf: {
key: 'EVENT_ENABLE_KAFKA_PUSH',
key: 'EVENT_ENABLE_USER_KAFKA_EVENTS',
operator: 'EQUALS',
value: 'true',
},
Expand All @@ -415,6 +419,45 @@ let enviromentVariables = {
optional: true,
default: 'user-health-check-topic-check',
},
EVENT_ENABLE_TENANT_EVENTS: {
message: 'Key to toggle tenant creation api event',
optional: true,
},
EVENT_ENABLE_TENANT_KAFKA_EVENTS: {
message: 'Key to toggle tenant creation kafka event',
optional: true,
},
EVENT_TENANT_KAFKA_TOPIC: {
message: 'Kafka topic for Tenant creation Event',
optional: true,
requiredIf: {
key: 'EVENT_ENABLE_TENANT_KAFKA_EVENTS',
operator: 'EQUALS',
value: 'true',
},
},
EVENT_TENANT_LISTENER_API: {
message: 'URL for Tenant creation Event',
optional: true,
requiredIf: {
key: 'EVENT_ENABLE_TENANT_EVENTS',
operator: 'EQUALS',
value: 'true',
},
},
EVENT_ENABLE_ORG_KAFKA_EVENTS: {
message: 'Key to toggle organization creation kafka event',
optional: true,
},
EVENT_ORGANIZATION_KAFKA_TOPIC: {
message: 'Kafka topic for organization create/update Event',
optional: true,
requiredIf: {
key: 'EVENT_ENABLE_ORG_KAFKA_EVENTS',
operator: 'EQUALS',
value: 'true',
},
},
}
let success = true

Expand Down
25 changes: 25 additions & 0 deletions src/generics/kafka-communication.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,29 @@ const pushUserEventsToKafka = async (message) => {
}
}

const pushTenantEventsToKafka = async (message) => {
try {
const payload = { topic: process.env.EVENT_TENANT_KAFKA_TOPIC, messages: [{ value: JSON.stringify(message) }] }
return await pushPayloadToKafka(payload)
} catch (error) {
console.log(error)
return error
}
}

const pushOrganizationEventsToKafka = async (message) => {
try {
const payload = {
topic: process.env.EVENT_ORGANIZATION_KAFKA_TOPIC,
messages: [{ value: JSON.stringify(message) }],
}
return await pushPayloadToKafka(payload)
} catch (error) {
console.log(error)
return error
}
}

const pushPayloadToKafka = async (payload) => {
try {
let response = await kafkaProducer.send(payload)
Expand All @@ -51,4 +74,6 @@ module.exports = {
pushEmailToKafka,
clearInternalCache,
pushUserEventsToKafka,
pushTenantEventsToKafka,
pushOrganizationEventsToKafka,
}
28 changes: 28 additions & 0 deletions src/generics/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,33 @@ function appendParamsToUrl(host, params) {

return url.toString()
}

/**
* Compare object and get the updated value with old value
* @method
* @name extractUpdatedValues
* @param {Object} oldData - Data before update
* @param {Object} newData - Data after update
* @param {Object} updateData - reqBody data
* @returns {Array<{fieldName:string, oldValue:any, newValue:any}>}
*/

function extractUpdatedValues(oldData = {}, newData = {}, updateData = {}) {
const changes = []

for (const key of Object.keys(updateData)) {
let oldValue = oldData[key]
let newValue = newData[key]
let fieldName = key
// Compare only if key exists in updateData
if (!_.isEqual(oldValue, newValue)) {
changes.push({ fieldName, oldValue, newValue })
}
}

return changes
}

module.exports = {
generateToken,
hashPassword,
Expand Down Expand Up @@ -1131,4 +1158,5 @@ module.exports = {
isValidAction,
appendParamsToUrl,
parseMetaData,
extractUpdatedValues,
}
36 changes: 29 additions & 7 deletions src/helpers/eventBroadcasterMain.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ const getEndpoints = (eventGroup) => {
case 'userEvents':
if (process.env.EVENT_USER_LISTENER_API)
return process.env.EVENT_USER_LISTENER_API.split(',').filter((url) => url.trim())
return []
case 'tenantEvents':
if (process.env.EVENT_TENANT_LISTENER_API)
return process.env.EVENT_TENANT_LISTENER_API.split(',').filter((url) => url.trim())
return []
default:
return []
}
Expand All @@ -20,12 +25,16 @@ const isEventEnabled = (eventGroup) => {
switch (eventGroup) {
case 'organizationEvents':
return process.env.EVENT_ENABLE_ORG_EVENTS !== 'false'

case 'userEvents':
return process.env.EVENT_ENABLE_USER_EVENTS !== 'false'

case 'tenantEvents':
return process.env.EVENT_ENABLE_TENANT_EVENTS !== 'false'
case 'userEvents-kafka':
return process.env.EVENT_ENABLE_KAFKA_PUSH !== 'false'
return process.env.EVENT_ENABLE_USER_KAFKA_EVENTS !== 'false'
case 'tenantEvents-kafka':
return process.env.EVENT_ENABLE_TENANT_KAFKA_EVENTS !== 'false'
case 'organizationEvents-kafka':
return process.env.EVENT_ENABLE_ORG_KAFKA_EVENTS !== 'false'
default:
return true
}
Expand Down Expand Up @@ -56,13 +65,26 @@ exports.eventBroadcasterKafka = async (eventGroup, { requestBody }) => {
if (!requestBody) throw new Error('Kafka Event Body Generation Failed')
if (!isEventEnabled(`${eventGroup}-kafka`))
throw new Error(`Kafka Events Not Enabled For The Group "${eventGroup}"`)

kafkaCommunication.pushUserEventsToKafka(requestBody)
//push to kafka based on eventGroup
switch (eventGroup) {
case 'organizationEvents':
await kafkaCommunication.pushOrganizationEventsToKafka(requestBody)
break
case 'userEvents':
await kafkaCommunication.pushUserEventsToKafka(requestBody)
break
case 'tenantEvents':
await kafkaCommunication.pushTenantEventsToKafka(requestBody)
break
default:
console.log('No Kafka Event Group Found')
break
}
} catch (err) {
console.log(err)
}
}
exports.broadcastUserEvent = async (eventGroup, { requestBody, headers = {}, isInternal = true }) => {
exports.broadcastEvent = async (eventGroup, { requestBody, headers = {}, isInternal = true }) => {
try {
// Fire both broadcaster functions concurrently
const broadcastPromises = [
Expand All @@ -82,6 +104,6 @@ exports.broadcastUserEvent = async (eventGroup, { requestBody, headers = {}, isI
})
} catch (err) {
// Log any unexpected errors from the promise settlement
console.error('Error in broadcastUserEvent:', err)
console.error('Error in broadcastEvent:', err)
}
}
6 changes: 3 additions & 3 deletions src/helpers/userInvite.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const emailEncryption = require('@utils/emailEncryption')
const userOrganizationQueries = require('@database/queries/userOrganization')
const userOrganizationRoleQueries = require('@database/queries/userOrganizationRole')
const { eventBodyDTO, keysFilter } = require('@dtos/userDTO')
const { broadcastUserEvent } = require('@helpers/eventBroadcasterMain')
const { broadcastEvent } = require('@helpers/eventBroadcasterMain')
const { generateUniqueUsername, generateUniqueCodeString } = require('@utils/usernameGenerator.js')
const userRolesQueries = require('@database/queries/userOrganizationRole')
const invitationQueries = require('@database/queries/invitation')
Expand Down Expand Up @@ -923,7 +923,7 @@ module.exports = class UserInviteHelper {
},
})

broadcastUserEvent('userEvents', { requestBody: eventBody, isInternal: true })
broadcastEvent('userEvents', { requestBody: eventBody, isInternal: true })
}

// Update UserCredential with organization_id and potentially password
Expand Down Expand Up @@ -1157,7 +1157,7 @@ module.exports = class UserInviteHelper {
args,
})

broadcastUserEvent('userEvents', { requestBody: eventBody, isInternal: true })
broadcastEvent('userEvents', { requestBody: eventBody, isInternal: true })

if (insertedUser?.id) {
const { name, email } = invitee
Expand Down
6 changes: 3 additions & 3 deletions src/services/account.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const { generateUniqueUsername } = require('@utils/usernameGenerator.js')
const UserTransformDTO = require('@dtos/userDTO')
const notificationUtils = require('@utils/notification')
const userHelper = require('@helpers/userHelper')
const { broadcastUserEvent } = require('@helpers/eventBroadcasterMain')
const { broadcastEvent } = require('@helpers/eventBroadcasterMain')

module.exports = class AccountHelper {
/**
Expand Down Expand Up @@ -622,7 +622,7 @@ module.exports = class AccountHelper {
},
})

broadcastUserEvent('userEvents', { requestBody: eventBody, isInternal: true })
broadcastEvent('userEvents', { requestBody: eventBody, isInternal: true })

return responses.successResponse({
statusCode: httpStatusCode.created,
Expand Down Expand Up @@ -1768,7 +1768,7 @@ module.exports = class AccountHelper {
},
})

broadcastUserEvent('userEvents', { requestBody: eventBody, isInternal: true })
broadcastEvent('userEvents', { requestBody: eventBody, isInternal: true })

return responses.successResponse({
statusCode: httpStatusCode.ok,
Expand Down
Loading