require('./bootstrap')

const AWSXRay = require('aws-xray-sdk')
-const ns = AWSXRay.getNamespace();

const _ = require('lodash')
const config = require('config')
@@ -28,91 +27,95 @@ const consumer = new Kafka.GroupConsumer(helper.getKafkaOptions())
 * this function will be invoked
 */
const dataHandler = (messageSet, topic, partition) => Promise.each(messageSet, async (m) => {
-  const segment = new AWSXRay.Segment('legacy-challenge-processor');
-  AWSXRay.setSegment(segment);
-
-  const message = m.message.value.toString('utf8')
-  logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
-
-  let messageJSON
-  try {
-    messageJSON = JSON.parse(message)
-  } catch (e) {
-    logger.error('Invalid message JSON.')
-    logger.logFullError(e)
-
-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+  const ns = AWSXRay.getNamespace();
+
+  ns.run(async () => {
+    const segment = new AWSXRay.Segment('legacy-challenge-processor');
+    AWSXRay.setSegment(segment);
+
+    const message = m.message.value.toString('utf8')
+    logger.info(`Handle Kafka event message; Topic: ${topic}; Partition: ${partition}; Offset: ${m.offset}; Message: ${message}.`)
+
+    let messageJSON
+    try {
+      messageJSON = JSON.parse(message)
+    } catch (e) {
+      logger.error('Invalid message JSON.')
+      logger.logFullError(e)
+
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }

-  if (messageJSON.topic !== topic) {
-    logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`)
+    if (messageJSON.topic !== topic) {
+      logger.error(`The message topic ${messageJSON.topic} doesn't match the Kafka topic ${topic}. Message: ${JSON.stringify(messageJSON)}`)

-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }

-  if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) {
-    logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`)
+    if (_.includes(config.IGNORED_ORIGINATORS, messageJSON.originator)) {
+      logger.error(`The message originator is in the ignored list. Originator: ${messageJSON.originator}`)

-    // commit the message and ignore it
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-    return
-  }
+      // commit the message and ignore it
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+      return
+    }

-  const { traceInformation: {
-    traceId,
-    parentSegmentId,
-  } = {
-    traceId: null,
-    parentSegmentId: null
-  } } = messageJSON.payload;
+    const { traceInformation: {
+      traceId,
+      parentSegmentId,
+    } = {
+      traceId: null,
+      parentSegmentId: null
+    } } = messageJSON.payload;

-  console.log('tracing information', traceId, parentSegmentId);
+    console.log('tracing information', traceId, parentSegmentId);

-  if (traceId) {
-    segment.trace_id = traceId;
-    segment.id = parentSegmentId;
-  }
+    if (traceId) {
+      segment.trace_id = traceId;
+      segment.id = parentSegmentId;
+    }


-  // do not trust the message payload
-  // the message.payload will be replaced with the data from the API
-  try {
-    console.log('Fetch challenge details');
-    const challengeUuid = _.get(messageJSON, 'payload.id')
-    if (_.isEmpty(challengeUuid)) {
-      segment.close();
+    // do not trust the message payload
+    // the message.payload will be replaced with the data from the API
+    try {
+      console.log('Fetch challenge details');
+      const challengeUuid = _.get(messageJSON, 'payload.id')
+      if (_.isEmpty(challengeUuid)) {
+        segment.close();
+        segment.addError(new Error(err));
+        throw new Error('Invalid payload')
+      }
+      const m2mToken = await helper.getM2MToken()
+      const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken)
+      // TODO: Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
+      messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body }
+    } catch (err) {
      segment.addError(new Error(err));
-      throw new Error('Invalid payload')
+      logger.debug('Failed to fetch challenge information')
+      logger.logFullError(err)
    }
-    const m2mToken = await helper.getM2MToken()
-    const v5Challenge = await helper.getRequest(`${config.V5_CHALLENGE_API_URL}/${challengeUuid}`, m2mToken)
-    // TODO: Cleanup. Pulling the billingAccountId from the payload, it's not part of the challenge object
-    messageJSON.payload = { billingAccountId: messageJSON.payload.billingAccountId, ...v5Challenge.body }
-  } catch (err) {
-    segment.addError(new Error(err));
-    logger.debug('Failed to fetch challenge information')
-    logger.logFullError(err)
-  }

-  try {
-    console.log('Process challenge')
-    await ProcessorService.processMessage(messageJSON)
-
-    // logger.debug('Successfully processed message')
-  } catch (err) {
-    segment.addError(new Error(err));
-    logger.error(`Error processing message ${JSON.stringify(messageJSON)}`)
-    logger.logFullError(err)
-  } finally {
-    // Commit offset regardless of error
-    await consumer.commitOffset({ topic, partition, offset: m.offset })
-  }
+    try {
+      console.log('Process challenge')
+      await ProcessorService.processMessage(messageJSON)

-  segment.close();
+      // logger.debug('Successfully processed message')
+    } catch (err) {
+      segment.addError(new Error(err));
+      logger.error(`Error processing message ${JSON.stringify(messageJSON)}`)
+      logger.logFullError(err)
+    } finally {
+      // Commit offset regardless of error
+      await consumer.commitOffset({ topic, partition, offset: m.offset })
+    }
+
+    segment.close();
+  });
})

// check if there is kafka connection alive
@@ -128,14 +131,12 @@ const check = () => {
  return connected
}

-const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC]
+const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC];

-(() => {
-  ns.run(() => {
-    consumer
-      .init([{
+consumer
+  .init([{
    subscriptions: topics,
-      handler: dataHandler
+    handler: dataHandler
  }])
  // consume configured topics
  .then(() => {
@@ -146,8 +147,6 @@ const topics = [config.CREATE_CHALLENGE_TOPIC, config.UPDATE_CHALLENGE_TOPIC]
    logger.info('Kick Start.......')
  })
  .catch((err) => logger.error(err))
-  })
-})();

if (process.env.NODE_ENV === 'test') {
  module.exports = consumer
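For reference, the per-message tracing pattern that the dataHandler hunk introduces boils down to the sketch below. This is an illustration, not code from the commit: handleMessage is a made-up name, and it uses runPromise (available when aws-xray-sdk's getNamespace() returns a cls-hooked namespace, the default in recent SDK versions) in place of the commit's ns.run so the caller can await the handler; with plain run, the async callback's promise is not awaited and rejections must be handled inside it.

const AWSXRay = require('aws-xray-sdk')

// Hypothetical handler shown only to illustrate the per-message segment pattern.
const handleMessage = (rawMessage) => {
  const ns = AWSXRay.getNamespace()

  // Run inside the X-Ray CLS namespace so the segment is bound to this
  // message's async context instead of being shared process-wide.
  return ns.runPromise(async () => {
    const segment = new AWSXRay.Segment('legacy-challenge-processor')
    AWSXRay.setSegment(segment)
    try {
      const payload = JSON.parse(rawMessage)
      // ... fetch challenge details and process the payload here ...
      return payload
    } catch (err) {
      // Record the failure on the segment before it is closed.
      segment.addError(err)
      throw err
    } finally {
      segment.close()
    }
  })
}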