Skip to content
This repository was archived by the owner on Mar 12, 2025. It is now read-only.

Commit e021187

Browse files
author
Sachin Maheshwari
committed
adding member api call and publishing to another topic
1 parent 9dc9069 commit e021187

File tree

3 files changed

+62
-10
lines changed

3 files changed

+62
-10
lines changed

config/default.js

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ module.exports = {
1010
// for the local Kafka, they are not needed
1111
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT,
1212
KAFKA_CLIENT_CERT_KEY: process.env.KAFKA_CLIENT_CERT_KEY,
13-
13+
KAFKA_MESSAGE_ORIGINATOR: process.env.KAFKA_MESSAGE_ORIGINATOR || 'u-bahn-user-reconciliation-processor',
14+
1415
// Kafka group id
1516
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'u-bahn-user-reconciliation-processor',
1617
USER_RECONCILATION_TOPIC: process.env.USER_RECONCILATION_TOPIC || 'backgroundjob.reconcile.user',
18+
PUBLISH_TOPIC: process.env.PUBLISH_TOPIC_TOPIC || 'legacy.sync.user',
19+
20+
V5_API_URL: process.env.V5_API_URL || 'https://api.topcoder-dev.com/v5',
1721

1822
UBAHN_API_URL: process.env.UBAHN_API_URL || 'https://api.topcoder-dev.com/v5',
1923
MEMBERS_API_URL: process.env.MEMBERS_API_URL || 'https://api.topcoder-dev.com/v5/members',

src/common/helper.js

+35-5
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const topcoderM2M = m2mAuth({ ...topcoderM2MConfig, AUTH0_AUDIENCE: topcoderM2MC
1919
* js version of sleep()
2020
* @param {Number} ms Timeout in ms
2121
*/
22-
async function sleep (ms) {
22+
async function sleep(ms) {
2323
if (!ms) {
2424
ms = config.SLEEP_TIME
2525
}
@@ -33,33 +33,63 @@ async function sleep (ms) {
3333
* (U-Bahn APIs only)
3434
* @returns {Promise}
3535
*/
36-
async function getUbahnM2Mtoken () {
36+
async function getUbahnM2Mtoken() {
3737
return ubahnM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
3838
}
3939

4040
/* Function to get M2M token
4141
* (Topcoder APIs only)
4242
* @returns {Promise}
4343
*/
44-
async function getTopcoderM2Mtoken () {
44+
async function getTopcoderM2Mtoken() {
4545
return topcoderM2M.getMachineToken(config.AUTH0_CLIENT_ID, config.AUTH0_CLIENT_SECRET)
4646
}
4747

4848
/**
4949
* Get Kafka options
5050
* @return {Object} the Kafka options
5151
*/
52-
function getKafkaOptions () {
52+
function getKafkaOptions() {
5353
const options = { connectionString: config.KAFKA_URL, groupId: config.KAFKA_GROUP_ID }
5454
if (config.KAFKA_CLIENT_CERT && config.KAFKA_CLIENT_CERT_KEY) {
5555
options.ssl = { cert: config.KAFKA_CLIENT_CERT, key: config.KAFKA_CLIENT_CERT_KEY }
5656
}
5757
return options
5858
}
5959

60+
/**
61+
* Returns the member details
62+
*
63+
* @param {String} handle The member handle
64+
*/
65+
async function getMember(handle, token) {
66+
const res = await axios.get(`${config.MEMBERS_API_URL}/${qs.escape(handle)}`, { headers: { Authorization: `Bearer ${token}` } })
67+
return _.get(res, 'data', {})
68+
}
69+
70+
/**
71+
* Send Kafka event message
72+
* @params {String} topic the topic name
73+
* @params {Object} payload the payload
74+
*/
75+
async function postEvent (topic, payload, token) {
76+
logger.debug(`Posting event to Kafka topic ${topic}, ${JSON.stringify(payload, null, 2)}`)
77+
const message = {
78+
topic,
79+
originator: config.KAFKA_MESSAGE_ORIGINATOR,
80+
timestamp: new Date().toISOString(),
81+
'mime-type': 'application/json',
82+
payload
83+
}
84+
85+
await axios.post(`${config.V5_API_URL}/bus/events`, message, { headers: { Authorization: `Bearer ${token}` } })
86+
logger.debug(`Posted event to Kafka topic`)
87+
}
6088

6189
module.exports = {
6290
getKafkaOptions,
6391
getTopcoderM2Mtoken,
64-
getUbahnM2Mtoken
92+
getUbahnM2Mtoken,
93+
getMember,
94+
postEvent
6595
}

src/services/ProcessorService.js

+22-4
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,28 @@ const helper = require('../common/helper')
1313
* @param {Object} message the kafka message
1414
* @returns {Promise}
1515
*/
16-
async function processSync (message) {
17-
const { handle } = message.payload
18-
const tcToken = await helper.getTopcoderM2Mtoken()
19-
const ubToken = await helper.getUbahnM2Mtoken()
16+
async function processSync(message) {
17+
try {
18+
const { handle } = message.payload
19+
const tcToken = await helper.getTopcoderM2Mtoken()
20+
const member = await helper.getMember(handle, tcToken)
21+
const location = member.homeCountryCode || member.competitionCountryCode || null
22+
const payload = {
23+
id : member.id,
24+
handle: handle,
25+
firstName : member.firstName,
26+
lastName: member.lastName,
27+
email: member.email,
28+
country : {
29+
isoAlpha3Code: location
30+
},
31+
active : (member.status === 'ACTIVE') ? true : false
32+
}
33+
await postEvent(config.PUBLISH_TOPIC, payload, tcToken)
34+
} catch (e) {
35+
logger.error(`unable to process the message, error : ${JSON.stringify(e)}`)
36+
}
37+
2038
}
2139

2240
processSync.schema = {

0 commit comments

Comments
 (0)