Skip to content

Commit 5386a40

Browse files
authored
Merge pull request #9 from topcoder-platform/implement-retry
Add support to retry if challenge does not exist on legacy
2 parents f6f5f50 + 6ac71d4 commit 5386a40

File tree

7 files changed

+93
-12
lines changed

7 files changed

+93
-12
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ The following parameters can be set in config files or in env variables:
3737
- KAFKA_URL: comma separated Kafka hosts for consumer to listen; default value: 'localhost:9092'
3838
- KAFKA_GROUP_ID: Kafka consumer group id; default value: 'legacy-resources-processor-group'
3939
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional; default value is undefined;
40+
- KAFKA_ERROR_TOPIC: The kafka error topic.
41+
- RETRY_TIMEOUT: The timeout to retry processing the same message
42+
- BUSAPI_URL: Bus API URL
4043

4144
if not provided, then SSL connection is not used, direct insecure connection is used;
4245

config/default.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ module.exports = {
99
// Kafka consumer config
1010
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
1111
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-resources-processor-group',
12+
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
13+
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
14+
RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000,
15+
EVENT_ORIGINATOR: process.env.EVENT_ORIGINATOR || 'legacy-challenge-resource-processor',
16+
EVENT_MIME_TYPE: process.env.EVENT_MIME_TYPE || 'application/json',
17+
18+
1219
// below are used for secure Kafka connection, they are optional
1320
// for the local Kafka, they are not needed
1421
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT,

package-lock.json

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
"superagent": "^4.1.0",
3737
"tc-core-library-js": "git+https://github.com/appirio-tech/tc-core-library-js.git#v2.6.2",
3838
"topcoder-healthcheck-dropin": "^1.0.2",
39+
"topcoder-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
3940
"winston": "^2.2.0"
4041
},
4142
"engines": {

src/app.js

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,26 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
3333
return
3434
}
3535
try {
36-
switch (topic) {
37-
case config.CREATE_CHALLENGE_RESOURCE_TOPIC :
38-
await ProcessorService.createChallengeResource(messageJSON)
39-
break
40-
case config.DELETE_CHALLENGE_RESOURCE_TOPIC:
41-
await ProcessorService.deleteChallengeResource(messageJSON)
42-
break
43-
default:
44-
throw new Error(`Invalid topic: ${topic}`)
36+
const challengeExistsOnLegacy = await ProcessorService.legacyChallengeExist(messageJSON)
37+
if (challengeExistsOnLegacy) {
38+
switch (topic) {
39+
case config.CREATE_CHALLENGE_RESOURCE_TOPIC :
40+
await ProcessorService.createChallengeResource(messageJSON)
41+
break
42+
case config.DELETE_CHALLENGE_RESOURCE_TOPIC:
43+
await ProcessorService.deleteChallengeResource(messageJSON)
44+
break
45+
default:
46+
throw new Error(`Invalid topic: ${topic}`)
47+
}
48+
} else {
49+
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
50+
await new Promise((resolve) => {
51+
setTimeout(async () => {
52+
await helper.postBusEvent(topic, messageJSON.payload)
53+
resolve()
54+
}, config.RETRY_TIMEOUT)
55+
})
4556
}
4657
// only commit if no errors
4758
await consumer.commitOffset({ topic, partition, offset: m.offset })

src/common/helper.js

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,14 @@
55
const _ = require('lodash')
66
const config = require('config')
77
const request = require('superagent')
8+
const busApi = require('topcoder-bus-api-wrapper')
89
const m2mAuth = require('tc-core-library-js').auth.m2m
910
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))
1011

12+
13+
// Bus API Client
14+
let busApiClient
15+
1116
/**
1217
* Get M2M token
1318
* @return {String} m2m token
@@ -62,9 +67,42 @@ async function deleteRequest (url, body, m2mToken) {
6267
.set('Accept', 'application/json')
6368
}
6469

70+
/**
71+
* Get Bus API Client
72+
* @return {Object} Bus API Client Instance
73+
*/
74+
function getBusApiClient () {
75+
// if there is no bus API client instance, then create a new instance
76+
if (!busApiClient) {
77+
busApiClient = busApi(_.pick(config,
78+
['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
79+
'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
80+
'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
81+
}
82+
83+
return busApiClient
84+
}
85+
86+
/**
87+
* Post bus event.
88+
* @param {String} topic the event topic
89+
* @param {Object} payload the event payload
90+
*/
91+
async function postBusEvent (topic, payload) {
92+
const client = getBusApiClient()
93+
await client.postEvent({
94+
topic,
95+
originator: config.EVENT_ORIGINATOR,
96+
timestamp: new Date().toISOString(),
97+
'mime-type': config.EVENT_MIME_TYPE,
98+
payload
99+
})
100+
}
101+
65102
module.exports = {
66103
getM2Mtoken,
67104
getRequest,
68105
postRequest,
69-
deleteRequest
106+
deleteRequest,
107+
postBusEvent
70108
}

src/services/ProcessorService.js

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,26 @@ const logger = require('../common/logger')
88
const helper = require('../common/helper')
99
const {getRoleIdByUuid, isStudio} = require('../common/utils')
1010

11+
/**
12+
* Check if a challenge exists on legacy (v4)
13+
* @param {Object} message The message containing the challenge resource information
14+
*/
15+
async function legacyChallengeExist (message) {
16+
let exists = false
17+
try {
18+
const m2mToken = await helper.getM2Mtoken()
19+
const res = await helper.getRequest(`${config.CHALLENGE_API_V5_URL}/${_.get(message, 'payload.challengeId')}`, m2mToken)
20+
const v5Challenge = res.body
21+
if (!v5Challenge.legacyId) {
22+
exists = false
23+
}
24+
await helper.getRequest(`${config.CHALLENGE_API_V4_URL}/${v5Challenge.legacyId}`, m2mToken)
25+
} catch (e) {
26+
exists = false
27+
}
28+
return exists
29+
}
30+
1131
/**
1232
* Updates (create or delete) a challenge resource based on the isDelete flag
1333
*
@@ -84,7 +104,8 @@ deleteChallengeResource.schema = createChallengeResource.schema
84104

85105
module.exports = {
86106
createChallengeResource,
87-
deleteChallengeResource
107+
deleteChallengeResource,
108+
legacyChallengeExist
88109
}
89110

90111
logger.buildService(module.exports)

0 commit comments

Comments
 (0)