Skip to content

Add support to retry if challenge does not exist on legacy #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ The following parameters can be set in config files or in env variables:
- KAFKA_URL: comma separated Kafka hosts for consumer to listen; default value: 'localhost:9092'
- KAFKA_GROUP_ID: Kafka consumer group id; default value: 'legacy-resources-processor-group'
- KAFKA_CLIENT_CERT: Kafka connection certificate, optional; default value is undefined;
- KAFKA_ERROR_TOPIC: The kafka error topic.
- RETRY_TIMEOUT: The timeout to retry processing the same message
- BUSAPI_URL: Bus API URL

if not provided, then SSL connection is not used, direct insecure connection is used;

Expand Down
7 changes: 7 additions & 0 deletions config/default.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@ module.exports = {
// Kafka consumer config
KAFKA_URL: process.env.KAFKA_URL || 'localhost:9092',
KAFKA_GROUP_ID: process.env.KAFKA_GROUP_ID || 'legacy-resources-processor-group',
BUSAPI_URL: process.env.BUSAPI_URL || 'https://api.topcoder-dev.com/v5',
KAFKA_ERROR_TOPIC: process.env.KAFKA_ERROR_TOPIC || 'common.error.reporting',
RETRY_TIMEOUT: process.env.RETRY_TIMEOUT || 10 * 1000,
EVENT_ORIGINATOR: process.env.EVENT_ORIGINATOR || 'legacy-challenge-resource-processor',
EVENT_MIME_TYPE: process.env.EVENT_MIME_TYPE || 'application/json',


// below are used for secure Kafka connection, they are optional
// for the local Kafka, they are not needed
KAFKA_CLIENT_CERT: process.env.KAFKA_CLIENT_CERT,
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"superagent": "^4.1.0",
"tc-core-library-js": "git+https://github.com/appirio-tech/tc-core-library-js.git#v2.6.2",
"topcoder-healthcheck-dropin": "^1.0.2",
"topcoder-bus-api-wrapper": "topcoder-platform/tc-bus-api-wrapper.git",
"winston": "^2.2.0"
},
"engines": {
Expand Down
29 changes: 20 additions & 9 deletions src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,26 @@ const dataHandler = async (messageSet, topic, partition) => Promise.each(message
return
}
try {
switch (topic) {
case config.CREATE_CHALLENGE_RESOURCE_TOPIC :
await ProcessorService.createChallengeResource(messageJSON)
break
case config.DELETE_CHALLENGE_RESOURCE_TOPIC:
await ProcessorService.deleteChallengeResource(messageJSON)
break
default:
throw new Error(`Invalid topic: ${topic}`)
const challengeExistsOnLegacy = await ProcessorService.legacyChallengeExist(messageJSON)
if (challengeExistsOnLegacy) {
switch (topic) {
case config.CREATE_CHALLENGE_RESOURCE_TOPIC :
await ProcessorService.createChallengeResource(messageJSON)
break
case config.DELETE_CHALLENGE_RESOURCE_TOPIC:
await ProcessorService.deleteChallengeResource(messageJSON)
break
default:
throw new Error(`Invalid topic: ${topic}`)
}
} else {
logger.info('Challenge does not exist yet. Will post the same message back to the bus API')
await new Promise((resolve) => {
setTimeout(async () => {
await helper.postBusEvent(topic, messageJSON.payload)
resolve()
}, config.RETRY_TIMEOUT)
})
}
// only commit if no errors
await consumer.commitOffset({ topic, partition, offset: m.offset })
Expand Down
40 changes: 39 additions & 1 deletion src/common/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,14 @@
const _ = require('lodash')
const config = require('config')
const request = require('superagent')
const busApi = require('topcoder-bus-api-wrapper')
const m2mAuth = require('tc-core-library-js').auth.m2m
const m2m = m2mAuth(_.pick(config, ['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME', 'AUTH0_PROXY_SERVER_URL']))


// Bus API Client
let busApiClient

/**
* Get M2M token
* @return {String} m2m token
Expand Down Expand Up @@ -62,9 +67,42 @@ async function deleteRequest (url, body, m2mToken) {
.set('Accept', 'application/json')
}

/**
* Get Bus API Client
* @return {Object} Bus API Client Instance
*/
function getBusApiClient () {
// if there is no bus API client instance, then create a new instance
if (!busApiClient) {
busApiClient = busApi(_.pick(config,
['AUTH0_URL', 'AUTH0_AUDIENCE', 'TOKEN_CACHE_TIME',
'AUTH0_CLIENT_ID', 'AUTH0_CLIENT_SECRET', 'BUSAPI_URL',
'KAFKA_ERROR_TOPIC', 'AUTH0_PROXY_SERVER_URL']))
}

return busApiClient
}

/**
* Post bus event.
* @param {String} topic the event topic
* @param {Object} payload the event payload
*/
async function postBusEvent (topic, payload) {
const client = getBusApiClient()
await client.postEvent({
topic,
originator: config.EVENT_ORIGINATOR,
timestamp: new Date().toISOString(),
'mime-type': config.EVENT_MIME_TYPE,
payload
})
}

module.exports = {
getM2Mtoken,
getRequest,
postRequest,
deleteRequest
deleteRequest,
postBusEvent
}
23 changes: 22 additions & 1 deletion src/services/ProcessorService.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,26 @@ const logger = require('../common/logger')
const helper = require('../common/helper')
const {getRoleIdByUuid, isStudio} = require('../common/utils')

/**
* Check if a challenge exists on legacy (v4)
* @param {Object} message The message containing the challenge resource information
*/
async function legacyChallengeExist (message) {
let exists = false
try {
const m2mToken = await helper.getM2Mtoken()
const res = await helper.getRequest(`${config.CHALLENGE_API_V5_URL}/${_.get(message, 'payload.challengeId')}`, m2mToken)
const v5Challenge = res.body
if (!v5Challenge.legacyId) {
exists = false
}
await helper.getRequest(`${config.CHALLENGE_API_V4_URL}/${v5Challenge.legacyId}`, m2mToken)
} catch (e) {
exists = false
}
return exists
}

/**
* Updates (create or delete) a challenge resource based on the isDelete flag
*
Expand Down Expand Up @@ -84,7 +104,8 @@ deleteChallengeResource.schema = createChallengeResource.schema

module.exports = {
createChallengeResource,
deleteChallengeResource
deleteChallengeResource,
legacyChallengeExist
}

logger.buildService(module.exports)