Skip to content

Commit a291b91

Browse files
committed
Fixed and working. Still issue with race-condition/sync. Re-enabling retry for now.
1 parent 885d480 commit a291b91

File tree

4 files changed

+26
-27
lines changed

4 files changed

+26
-27
lines changed

src/app.js

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ const { isNil, get } = require('lodash')
88
const Kafka = require('no-kafka')
99
const healthcheck = require('topcoder-healthcheck-dropin')
1010
const logger = require('./common/logger')
11-
// const helper = require('./common/helper')
11+
const helper = require('./common/helper')
1212
const { getKafkaOptions } = require('./common/utils')
1313
const ProcessorService = require('./services/ProcessorService')
1414

@@ -58,26 +58,25 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
5858
if (isNil(challengeId)) {
5959
throw new Error(`Challenge ID ${challengeId} is null, will not queue to retry`)
6060
} else {
61-
logger.info('Should Retry - but Retry is Disabled')
62-
// const retryCountIdentifier = `${config.KAFKA_GROUP_ID.split(' ').join('_')}_retry_count`
63-
// let currentRetryCount = parseInt(get(messageJSON.payload, retryCountIdentifier, 1), 10)
64-
// if (currentRetryCount <= config.MAX_RETRIES) {
65-
// logger.info(`Challenge does not exist yet. Will post the same message back to the bus API and retry in ${currentRetryCount * (config.RETRY_TIMEOUT / 1000)} seconds`)
66-
// await new Promise((resolve) => {
67-
// setTimeout(async () => {
68-
// currentRetryCount += 1
69-
// await helper.postBusEvent(topic, { ...messageJSON.payload, [retryCountIdentifier]: currentRetryCount })
70-
// resolve()
71-
// }, config.RETRY_TIMEOUT * currentRetryCount)
72-
// })
73-
// } else {
74-
// logger.error(`Failed to process message after ${config.MAX_RETRIES} retries. Aborting...`)
75-
// }
61+
const retryCountIdentifier = `${config.KAFKA_GROUP_ID.split(' ').join('_')}_retry_count`
62+
let currentRetryCount = parseInt(get(messageJSON.payload, retryCountIdentifier, 1), 10)
63+
if (currentRetryCount <= config.MAX_RETRIES) {
64+
logger.info(`Challenge does not exist yet. Will post the same message back to the bus API and retry in ${currentRetryCount * (config.RETRY_TIMEOUT / 1000)} seconds`)
65+
await new Promise((resolve) => {
66+
setTimeout(async () => {
67+
currentRetryCount += 1
68+
await helper.postBusEvent(topic, { ...messageJSON.payload, [retryCountIdentifier]: currentRetryCount })
69+
resolve()
70+
}, config.RETRY_TIMEOUT * currentRetryCount)
71+
})
72+
} else {
73+
logger.error(`Failed to process message after ${config.MAX_RETRIES} retries. Aborting...`)
74+
}
7675
}
7776
}
7877
// only commit if no errors
7978
await consumer.commitOffset({ topic, partition, offset: m.offset })
80-
logger.debug('Successfully processed message')
79+
// logger.debug('Successfully processed message')
8180
} catch (err) {
8281
logger.error(`app.js error message: ${err.message}`)
8382
}

src/dao/RegistrationDAO.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
const helper = require('../common/helper')
22
const moment = require('moment')
3-
const logger = require('../common/logger')
3+
// const logger = require('../common/logger')
44

55
const RESOURCE_TYPE_EXT_REF_ID = 1
66
const RESOURCE_TYPE_HANDLE_ID = 2
@@ -41,7 +41,7 @@ VALUES
4141

4242
async function persistResourceWithRoleId (userId, challengeId, resourceId, roleId, handle, projectPhaseId) {
4343
const regDate = moment().format('MM[.]DD[.]YYYY h:mm A')
44-
logger.debug(`persistResourceWithRoleId - projectPhaseId: ${projectPhaseId} - ${JSON.stringify([resourceId, roleId, projectPhaseId, challengeId, userId, userId, userId])}`)
44+
// logger.debug(`persistResourceWithRoleId - projectPhaseId: ${projectPhaseId} - ${JSON.stringify([resourceId, roleId, projectPhaseId, challengeId, userId, userId, userId])}`)
4545
await helper.executeSQLonDB(QUERY_INSERT_RESOURCE_WITH_ROLE, [resourceId, roleId, projectPhaseId, challengeId, userId, userId, userId])
4646

4747
await persistResourceInfo(userId, resourceId, RESOURCE_TYPE_EXT_REF_ID, userId)

src/services/ProcessorService.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@ async function legacyChallengeExist (message) {
2424
const m2mToken = await helper.getM2Mtoken()
2525
const res = await helper.getRequest(`${config.CHALLENGE_API_V5_URL}/${challengeId}`, m2mToken)
2626
// logger.debug(`m2m Token: ${m2mToken}`)
27-
logger.debug(`Getting Challenge from V5 ${config.CHALLENGE_API_V5_URL}/${challengeId}`)
28-
logger.debug(`Response ${JSON.stringify(res.body)}`)
27+
// logger.debug(`Getting Challenge from V5 ${config.CHALLENGE_API_V5_URL}/${challengeId}`)
28+
// logger.debug(`Response ${JSON.stringify(res.body)}`)
2929
const v5Challenge = res.body
3030
if (!v5Challenge.legacyId) {
3131
exists = false
@@ -112,7 +112,7 @@ async function _updateChallengeResource (message, isDelete) {
112112
}
113113
} else {
114114
if (isDelete) {
115-
logger.debug(`v4 Deleteing Challenge Resource ${config.CHALLENGE_API_V4_URL}/${_.get(v5Challenge, 'legacyId')}/resources - ${JSON.stringify(body)}`)
115+
logger.debug(`v4 Deleting Challenge Resource ${config.CHALLENGE_API_V4_URL}/${_.get(v5Challenge, 'legacyId')}/resources - ${JSON.stringify(body)}`)
116116
response = await helper.deleteRequest(`${config.CHALLENGE_API_V4_URL}/${_.get(v5Challenge, 'legacyId')}/resources`, body, m2mToken)
117117
} else {
118118
logger.debug(`v4 Creating Challenge Resource ${config.CHALLENGE_API_V4_URL}/${_.get(v5Challenge, 'legacyId')}/resources - ${JSON.stringify(body)}`)
@@ -132,11 +132,11 @@ async function createChallengeResource (message) {
132132
try {
133133
await _updateChallengeResource(message, false)
134134
} catch (e) {
135+
logger.error(e.message)
135136
logger.logFullError(e)
136-
logger.debug(e.message)
137137
}
138138

139-
logger.info(`Successfully processed create challenge resource message : ${JSON.stringify(message)}`)
139+
// logger.debug(`Successfully processed create challenge resource message : ${JSON.stringify(message)}`)
140140
}
141141

142142
createChallengeResource.schema = {
@@ -162,10 +162,10 @@ async function deleteChallengeResource (message) {
162162
try {
163163
await _updateChallengeResource(message, true)
164164
} catch (e) {
165-
logger.info(`Failed to find and delete the resource: ${JSON.stringify(message)} Error: ${JSON.stringify(e)}`)
165+
logger.error(`Failed to find and delete the resource: ${JSON.stringify(message)} Error: ${JSON.stringify(e)}`)
166166
}
167167

168-
logger.info(`Successfully processed delete challenge resource message : ${JSON.stringify(message)}`)
168+
// logger.debug(`Successfully processed delete challenge resource message : ${JSON.stringify(message)}`)
169169
}
170170

171171
deleteChallengeResource.schema = createChallengeResource.schema

src/services/ProjectService.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const QUERY_CHECK_RESOURCE_EXISTS = 'SELECT COUNT(*) as num FROM resource WHERE
1414
*/
1515
async function resourceExists (challengeId, roleId, userId) {
1616
const result = helper.queryDataFromDB(QUERY_CHECK_RESOURCE_EXISTS, [challengeId, roleId, toInteger(userId)])
17-
logger.debug(`resourceExists ${JSON.stringify([challengeId, roleId, toInteger(userId)])} result: ${JSON.stringify(result)}`)
17+
// logger.debug(`resourceExists ${JSON.stringify([challengeId, roleId, toInteger(userId)])} result: ${JSON.stringify(result)}`)
1818
if (result && result.length > 0) {
1919
return result[0].num > 0
2020
}

0 commit comments

Comments
 (0)