Skip to content

Commit 9fd927e

Browse files
authored
Merge pull request #77 from MitocGroup/sync_vanilla
🍔 Sync backend with vanilla ms
2 parents 1e3653c + 8bdadf0 commit 9fd927e

File tree

10 files changed

+205
-12
lines changed

10 files changed

+205
-12
lines changed

src/deep-root-angular/backend/resources.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,15 @@
2121
}
2222
},
2323
"ddb-eventual-consistency": {
24+
"listen-queues": {
25+
"description": "Listen for SQS offload queues messages",
26+
"type": "lambda",
27+
"forceUserIdentity": false,
28+
"scope": "private",
29+
"cron": "0/1 * * * ? *",
30+
"methods": ["GET"],
31+
"source": "src/ddb-eventual-consistency/listen-queues"
32+
},
2433
"pool-queue": {
2534
"description": "Pool the SQS queue for DynamoDB offloaded operations on alarm received from CloudWatchAlarm through an SNS topic",
2635
"type": "lambda",

src/deep-root-angular/backend/src/async-config/dump/Handler.es6

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
9898
*/
9999
_selfDisable(cb) {
100100
let resource = this.kernel.get('resource');
101-
let lambda = resource.get('@deep-root-angular:scheduler:rule');
101+
let lambda = resource.get(this._selfDisableResourceId);
102102
let payload = {
103103
effect: 'disable',
104104
lambdaName: this.context.functionName
@@ -107,6 +107,13 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
107107
lambda.request(payload).useDirectCall().send(cb);
108108
}
109109

110+
/**
111+
* @returns {String}
112+
*/
113+
get _selfDisableResourceId() {
114+
return `@${this.kernel.microservice().identifier}:scheduler:rule`;
115+
}
116+
110117
/**
111118
* @param {Function} callback
112119
* @private

src/deep-root-angular/backend/src/async-config/dump/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
}
1111
],
1212
"bugs": {
13-
"url": "https://github.com/MitocGroup/deep-microservices-root-angularjs/issues"
13+
"url": "https://github.com/MitocGroup/deep-microservices-root-angular/issues"
1414
},
1515
"keywords": [
1616
"DEEP",
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* Created by acucer on 6/14/16.
3+
*/
4+
5+
'use strict';
6+
7+
import AWS from 'aws-sdk';
8+
import DeepFramework from 'deep-framework';
9+
10+
export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
11+
/**
12+
* @param {*} args
13+
*/
14+
constructor(...args) {
15+
super(...args);
16+
}
17+
18+
/**
19+
* @param {*} request
20+
*/
21+
handle(request) {
22+
Promise
23+
.all(this._queues.map(queueConfig => this._checkQueue(queueConfig)))
24+
.then(this.createResponse({}).send);
25+
}
26+
27+
/**
28+
* @param {Object} queueConfig
29+
* @returns {Promise|*}
30+
*/
31+
_checkQueue(queueConfig) {
32+
return new Promise(resolve => {
33+
let payload = {
34+
QueueUrl: queueConfig.url,
35+
AttributeNames: ['ApproximateNumberOfMessages',],
36+
};
37+
38+
queueConfig.sqs.getQueueAttributes(payload, (error, data) => {
39+
if (error) {
40+
console.error(error);
41+
return resolve();
42+
}
43+
44+
let attrs = data.Attributes || {
45+
ApproximateNumberOfMessages: '0', // o_O AWS sends a string
46+
};
47+
48+
if (parseInt(attrs.ApproximateNumberOfMessages) > 0) {
49+
return this._invokePullQueue(queueConfig.name)
50+
.then(resolve)
51+
.catch(error => {
52+
console.error(error);
53+
resolve();
54+
});
55+
}
56+
57+
resolve();
58+
});
59+
});
60+
}
61+
62+
/**
63+
* @param {String} queueName
64+
* @returns {Promise|*}
65+
*/
66+
_invokePullQueue(queueName) {
67+
return new Promise((resolve, reject) => {
68+
let resource = this.kernel.get('resource');
69+
70+
resource
71+
.get(this._pullQueueResourceId)
72+
.request({queueName,})
73+
.invokeAsync()
74+
.send((response) => {
75+
response.isError ? reject(response.error) : resolve();
76+
});
77+
});
78+
}
79+
80+
/**
81+
* @returns {String}
82+
*/
83+
get _pullQueueResourceId() {
84+
return `@${this.kernel.microservice().identifier}:ddb-eventual-consistency:pool-queue`;
85+
}
86+
87+
/**
88+
* @param {String} queueUrl
89+
* @returns {String}
90+
* @private
91+
*/
92+
_getRegionFromSqsQueueUrl(queueUrl) {
93+
let regionParts = queueUrl.match(/\.([^\.]+)\.amazonaws\.com\/.*/i);
94+
95+
if (!regionParts || regionParts.length === 0) {
96+
throw new Error(queueUrl, 'Unable to extract AWS region.');
97+
}
98+
99+
return regionParts[1];
100+
}
101+
102+
/**
103+
* @returns {Object[]}
104+
*/
105+
get _queues() {
106+
let result = [];
107+
let queues = this.kernel.config.dbOffloadQueues;
108+
109+
for (let modelName in queues) {
110+
if (!queues.hasOwnProperty(modelName)) {
111+
continue;
112+
}
113+
114+
let queueConfig = queues[modelName];
115+
let name = queueConfig.name;
116+
let url = queueConfig.url;
117+
let region = this._getRegionFromSqsQueueUrl(url);
118+
let sqs = new AWS.SQS({region,});
119+
120+
result.push({name, url, region, sqs,});
121+
}
122+
123+
return result;
124+
}
125+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
/**
2+
* Created by acucer on 3/17/16.
3+
*/
4+
5+
'use strict';
6+
7+
import DeepFramework from 'deep-framework';
8+
import Handler from './Handler';
9+
10+
export default DeepFramework.LambdaHandler(Handler);
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
{
2+
"name": "deep-system-ddb-eventual-consistency-listen-queues",
3+
"version": "0.0.1",
4+
"description": "Listen for SQS offload queues messages",
5+
"author": "Mitoc Group <hello@mitocgroup.com>",
6+
"contributors": [
7+
{
8+
"name": "Alexander Cucer",
9+
"email": "acucer@mitocgroup.com"
10+
}
11+
],
12+
"bugs": {
13+
"url": "https://github.com/MitocGroup/deep-microservices-root-angular/issues"
14+
},
15+
"keywords": [
16+
"DEEP",
17+
"Config",
18+
"SQS",
19+
"EventualConsistency",
20+
"DynamoDB"
21+
],
22+
"scripts": {
23+
"postinstall": "npm run compile",
24+
"compile": "deepify compile es6 `pwd`"
25+
},
26+
"dependencies": {
27+
"deep-framework": "^1.x.x"
28+
},
29+
"private": true
30+
}

src/deep-root-angular/backend/src/ddb-eventual-consistency/pool-queue/Handler.es6

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
1818

1919
/**
2020
* @param {*} request
21+
* @todo refactor for direct call only (avoid SQS->SNS invokations)
2122
*/
2223
handle(request) {
2324
let queueName = this._getQueueNameFromRequest(request);
@@ -28,6 +29,10 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
2829
this._poolQueueUntilEmpty(queueUrl, sqs, dynamo).then(() => {
2930
let cloudWatch = this._cloudWatch(queueUrl);
3031

32+
if (!this._isInvokedBySns(request)) {
33+
return this.createResponse({}).send();
34+
}
35+
3136
// @see deep-package-manager (queueName === alarmName)
3237
this._resetAlarmState(cloudWatch, queueName)
3338
.then(this.createResponse({}).send);
@@ -41,6 +46,7 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
4146
* @param {AWS.CloudWatch|*} cloudWatch
4247
* @param {String} alarmName
4348
* @returns {Promise|*}
49+
* @deprecated
4450
*/
4551
_resetAlarmState(cloudWatch, alarmName) {
4652
return new Promise(resolve => {
@@ -220,13 +226,27 @@ export default class extends DeepFramework.Core.AWS.Lambda.Runtime {
220226
return regionParts[1];
221227
}
222228

229+
/**
230+
* @param {*} request
231+
* @returns {Boolean}
232+
*/
233+
_isInvokedBySns(request) {
234+
return !!request.getParam('Records');
235+
}
236+
223237
/**
224238
* @param {*} request
225239
* @returns {String}
226240
* @private
227241
* @todo: pre-validate event data
228242
*/
229243
_getQueueNameFromRequest(request) {
244+
if (!this._isInvokedBySns(request)) {
245+
return request.getParam('queueName');
246+
}
247+
248+
// @deprecated left for back compatibility only
249+
// SQS event delivered through SNS topic
230250
return JSON.parse(request.getParam('Records')[0].Sns.Message).Trigger.Dimensions[0].value;
231251
}
232252

src/deep-root-angular/backend/src/ddb-eventual-consistency/pool-queue/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
}
1111
],
1212
"bugs": {
13-
"url": "https://github.com/MitocGroup/deep-microservices-root-angularjs/issues"
13+
"url": "https://github.com/MitocGroup/deep-microservices-root-angular/issues"
1414
},
1515
"keywords": [
1616
"DEEP",

src/deep-root-angular/backend/src/scheduler/rule/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
}
1111
],
1212
"bugs": {
13-
"url": "https://github.com/MitocGroup/deep-microservices-root-angularjs/issues"
13+
"url": "https://github.com/MitocGroup/deep-microservices-root-angular/issues"
1414
},
1515
"keywords": [
1616
"DEEP",

src/deep-root-angular/parameters.json

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,6 @@
1616
"default": "@deep-root-angular:img/favicon.ico",
1717
"example": "@deep-root-angular:img/favicon.ico"
1818
},
19-
"storage|eventualConsistency|offloaderEndpoint": {
20-
"displayName": "Eventual consistency offloading backend endpoint",
21-
"type": "string",
22-
"required": false,
23-
"pattern": "^@[^:]+:[^:]+:[^:]+$",
24-
"default": "@deep-root-angular:ddb-eventual-consistency:pool-queue",
25-
"example": "@deep-root-angular:ddb-eventual-consistency:pool-queue"
26-
},
2719
"pageLoader|src": {
2820
"displayName": "Page loader source",
2921
"type": "string",

0 commit comments

Comments
 (0)