@@ -20,35 +20,75 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
20
20
* @param {* } request
21
21
*/
22
22
handle ( request ) {
23
- let queueUrl = this . _getQueueUrl ( request ) ;
23
+ let queueName = this . _getQueueNameFromRequest ( request ) ;
24
+ let queueUrl = this . _getQueueUrlFromConfig ( queueName ) ;
24
25
let sqs = this . _sqs ( queueUrl ) ;
25
26
let dynamo = this . _dynamo ;
26
27
27
- this . _poolQueueUntilEmpty ( queueUrl , sqs , dynamo ) ;
28
+ this . _poolQueueUntilEmpty ( queueUrl , sqs , dynamo ) . then ( ( ) => {
29
+ let cloudWatch = this . _cloudWatch ( queueUrl ) ;
30
+
31
+ // @see deep-package-manager (queueName === alarmName)
32
+ this . _resetAlarmState ( cloudWatch , queueName )
33
+ . then ( this . createResponse ( { } ) . send ) ;
34
+ } ) ;
35
+ }
36
+
37
+ /**
38
+ * Avoid keeping alarm state infinitely in case there are
39
+ * messages pushed before the alarm switching to OK state
40
+ *
41
+ * @param {AWS.CloudWatch|* } cloudWatch
42
+ * @param {String } alarmName
43
+ * @returns {Promise|* }
44
+ */
45
+ _resetAlarmState ( cloudWatch , alarmName ) {
46
+ return new Promise ( resolve => {
47
+ let payload = {
48
+ AlarmName : alarmName ,
49
+ StateReason : 'Resetting alarm state to avoid keeping it on ALARM infinitely' ,
50
+ StateValue : 'OK'
51
+ } ;
52
+
53
+ cloudWatch . setAlarmState ( payload , error => {
54
+ if ( error ) {
55
+ console . error ( error ) ;
56
+ }
57
+
58
+ resolve ( ) ;
59
+ } ) ;
60
+ } ) ;
28
61
}
29
62
30
63
/**
31
64
* @param {String } queueUrl
32
65
* @param {AWS.SQS|* } sqs
33
66
* @param {AWS.DynamoDB|* } dynamo
67
+ * @returns {Promise|* }
34
68
* @private
35
69
*/
36
70
_poolQueueUntilEmpty ( queueUrl , sqs , dynamo ) {
37
- this . _poolQueueItems ( sqs , queueUrl )
38
- . catch ( error => this . createError ( error ) )
39
- . then ( queueMessages => {
40
- if ( queueMessages . length <= 0 ) {
41
- return this . createResponse ( { } ) . send ( ) ;
42
- }
43
-
44
- Promise
45
- . all ( queueMessages . map (
46
- queueMsg => this . _manageQueueMsg ( sqs , queueUrl , dynamo , queueMsg )
47
- ) )
48
- . then ( ( ) => {
49
- this . _poolQueueUntilEmpty ( queueUrl , sqs , dynamo ) ;
50
- } ) ;
51
- } ) ;
71
+ return new Promise ( resolve => {
72
+ this . _poolQueueItems ( sqs , queueUrl )
73
+ . catch ( error => {
74
+ console . error ( error ) ;
75
+
76
+ resolve ( ) ;
77
+ } )
78
+ . then ( queueMessages => {
79
+ if ( queueMessages . length <= 0 ) {
80
+ return resolve ( ) ;
81
+ }
82
+
83
+ Promise
84
+ . all ( queueMessages . map (
85
+ queueMsg => this . _manageQueueMsg ( sqs , queueUrl , dynamo , queueMsg )
86
+ ) )
87
+ . then ( ( ) => {
88
+ this . _poolQueueUntilEmpty ( queueUrl , sqs , dynamo ) . then ( resolve ) ;
89
+ } ) ;
90
+ } ) ;
91
+ } ) ;
52
92
}
53
93
54
94
/**
@@ -155,6 +195,16 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
155
195
return new AWS . SQS ( { region, } ) ;
156
196
}
157
197
198
+ /**
199
+ * @param {String } queueUrl
200
+ * @returns {AWS.CloudWatch|* }
201
+ */
202
+ _cloudWatch ( queueUrl ) {
203
+ let region = this . _getRegionFromSqsQueueUrl ( queueUrl ) ;
204
+
205
+ return new AWS . CloudWatch ( { region, } ) ;
206
+ }
207
+
158
208
/**
159
209
* @param {String } queueUrl
160
210
* @returns {String }
@@ -176,10 +226,8 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
176
226
* @private
177
227
* @todo : pre-validate event data
178
228
*/
179
- _getQueueUrl ( request ) {
180
- let queueName = JSON . parse ( request . getParam ( 'Records' ) [ 0 ] . Sns . Message ) . Trigger . Dimensions [ 0 ] . value ;
181
-
182
- return this . _getQueueUrlFromConfig ( queueName ) ;
229
+ _getQueueNameFromRequest ( request ) {
230
+ return JSON . parse ( request . getParam ( 'Records' ) [ 0 ] . Sns . Message ) . Trigger . Dimensions [ 0 ] . value ;
183
231
}
184
232
185
233
/**
0 commit comments