Skip to content

Commit acc2ccd

Browse files
committed
Add support for tagging to SQS queues and SNS topics
1 parent e6c5bf9 commit acc2ccd

File tree

5 files changed

+124
-33
lines changed

5 files changed

+124
-33
lines changed

.releases/4.15.0.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
**New Features**
2+
3+
* Added support for tagging topics to the `SnsProvider` class.
4+
* Added support for tagging queues to the `SqsProvider` class.
5+
* Added support for tags to the `AwsPublisher` class (when creating SNS topics and SQS queues).
6+
* Added support for tags to the `AwsRouter` class (when creating SQS queues).

aws/SnsProvider.js

Lines changed: 42 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const aws = require('aws-sdk'),
33

44
const assert = require('@barchart/common-js/lang/assert'),
55
Disposable = require('@barchart/common-js/lang/Disposable'),
6+
is = require('@barchart/common-js/lang/is'),
67
object = require('@barchart/common-js/lang/object'),
78
promise = require('@barchart/common-js/lang/promise');
89

@@ -27,7 +28,7 @@ module.exports = (() => {
2728
constructor(configuration) {
2829
super();
2930

30-
assert.argumentIsRequired(configuration, 'configuration');
31+
assert.argumentIsRequired(configuration, 'configuration', Object);
3132
assert.argumentIsRequired(configuration.region, 'configuration.region', String);
3233
assert.argumentIsRequired(configuration.prefix, 'configuration.prefix', String);
3334
assert.argumentIsOptional(configuration.apiVersion, 'configuration.apiVersion', String);
@@ -94,13 +95,14 @@ module.exports = (() => {
9495

9596
/**
9697
* Given a topic's name, return Amazon's unique identifier for the topic
97-
* (i.e. the ARN). If no topc with the given name exists, it will be created.
98+
* (i.e. the ARN). If no topic with the given name exists, it will be created.
9899
*
99100
* @public
100-
* @param {string} topicName - The name of the topic to find.
101+
* @param {string} topicName - The name of the topic to find (or create).
102+
* @param {Object=} createOptions - Options to use when topic does not exist and must be created.
101103
* @returns {Promise<String>}
102104
*/
103-
getTopicArn(topicName) {
105+
getTopicArn(topicName, createOptions) {
104106
return Promise.resolve()
105107
.then(() => {
106108
assert.argumentIsRequired(topicName, 'topicName', String);
@@ -112,7 +114,13 @@ module.exports = (() => {
112114
if (!this._topicPromises.hasOwnProperty(qualifiedTopicName)) {
113115
logger.debug('The SnsProvider has not cached the topic ARN. Issuing request to create topic');
114116

115-
this._topicPromises[qualifiedTopicName] = this.createTopic(topicName);
117+
let tags = null;
118+
119+
if (createOptions && createOptions.tags) {
120+
tags = createOptions.tags;
121+
}
122+
123+
this._topicPromises[qualifiedTopicName] = this.createTopic(topicName, tags);
116124
}
117125

118126
return this._topicPromises[qualifiedTopicName];
@@ -125,12 +133,14 @@ module.exports = (() => {
125133
*
126134
* @public
127135
* @param {string} topicName - The name of the topic to create.
136+
* @param {Object=} tags - Tags to assign to the topic.
128137
* @returns {Promise<String>}
129138
*/
130-
createTopic(topicName) {
139+
createTopic(topicName, tags) {
131140
return Promise.resolve()
132141
.then(() => {
133142
assert.argumentIsRequired(topicName, 'topicName', String);
143+
assert.argumentIsOptional(tags, 'tags', Object);
134144

135145
checkReady.call(this);
136146

@@ -140,9 +150,30 @@ module.exports = (() => {
140150

141151
logger.debug('Creating SNS topic [', qualifiedTopicName, ']');
142152

143-
this._sns.createTopic({
153+
const payload = {
144154
Name: qualifiedTopicName
145-
}, (error, data) => {
155+
};
156+
157+
if (is.object(tags)) {
158+
const keys = object.keys(tags);
159+
160+
const t = keys.reduce((accumulator, key) => {
161+
const tag = { };
162+
163+
tag.Key = key;
164+
tag.Value = tags[key];
165+
166+
accumulator.push(tag);
167+
168+
return accumulator;
169+
}, [ ]);
170+
171+
if (t.length > 0) {
172+
payload.Tags = t;
173+
}
174+
}
175+
176+
this._sns.createTopic(payload, (error, data) => {
146177
if (error === null) {
147178
logger.info('SNS topic created [', qualifiedTopicName, ']');
148179

@@ -227,17 +258,18 @@ module.exports = (() => {
227258
* @public
228259
* @param {string} topicName - The name of the topic to publish to.
229260
* @param {Object} payload - The message to publish (which will be serialized as JSON).
261+
* @param {Object=} createOptions - Options to use when topic does not exist and must be created.
230262
* @returns {Promise}
231263
*/
232-
publish(topicName, payload) {
264+
publish(topicName, payload, createOptions) {
233265
return Promise.resolve()
234266
.then(() => {
235267
assert.argumentIsRequired(topicName, 'topicName', String);
236268
assert.argumentIsRequired(payload, 'payload', Object);
237269

238270
checkReady.call(this);
239271

240-
return this.getTopicArn(topicName)
272+
return this.getTopicArn(topicName, createOptions)
241273
.then((topicArn) => {
242274
const qualifiedTopicName = getQualifiedTopicName(this._configuration.prefix, topicName);
243275

aws/SqsProvider.js

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ module.exports = (() => {
3030
constructor(configuration) {
3131
super();
3232

33-
assert.argumentIsRequired(configuration, 'configuration');
33+
assert.argumentIsRequired(configuration, 'configuration', Object);
3434
assert.argumentIsRequired(configuration.region, 'configuration.region', String);
3535
assert.argumentIsRequired(configuration.prefix, 'configuration.prefix', String);
3636
assert.argumentIsOptional(configuration.apiVersion, 'configuration.apiVersion', String);
@@ -148,9 +148,10 @@ module.exports = (() => {
148148
*
149149
* @public
150150
* @param {string} queueName - The name of the queue to find.
151+
* @param {Object=} createOptions - Options to use when queue does not exist and must be created.
151152
* @returns {Promise<String>}
152153
*/
153-
getQueueUrl(queueName) {
154+
getQueueUrl(queueName, createOptions) {
154155
return Promise.resolve()
155156
.then(() => {
156157
assert.argumentIsRequired(queueName, 'queueName', String);
@@ -168,7 +169,19 @@ module.exports = (() => {
168169
if (!this._queueUrlPromises.hasOwnProperty(qualifiedQueueName)) {
169170
logger.debug('The SqsProvider has not cached the queue URL. Issuing request to create queue.');
170171

171-
this._queueUrlPromises[qualifiedQueueName] = this.createQueue(queueName);
172+
let retentionTime = null;
173+
174+
if (createOptions && is.number(createOptions.retentionTime)) {
175+
retentionTime = createOptions.retentionTime;
176+
}
177+
178+
let tags = null;
179+
180+
if (createOptions && is.object(createOptions.tags)) {
181+
tags = createOptions.tags;
182+
}
183+
184+
this._queueUrlPromises[qualifiedQueueName] = this.createQueue(queueName, retentionTime, tags);
172185
}
173186

174187
return this._queueUrlPromises[qualifiedQueueName];
@@ -230,9 +243,10 @@ module.exports = (() => {
230243
*
231244
* @public
232245
* @param {string} queueName - The name of the queue to find.
246+
* @param {Object=} createOptions - Options to use when queue does not exist and must be created.
233247
* @returns {Promise<String>}
234248
*/
235-
getQueueArn(queueName) {
249+
getQueueArn(queueName, createOptions) {
236250
return Promise.resolve()
237251
.then(() => {
238252
assert.argumentIsRequired(queueName, 'queueName', String);
@@ -248,7 +262,7 @@ module.exports = (() => {
248262
const qualifiedQueueName = getQualifiedQueueName(this._configuration.prefix, queueName);
249263

250264
if (!this._queueArnPromises.hasOwnProperty(qualifiedQueueName)) {
251-
this._queueArnPromises[qualifiedQueueName] = this.getQueueUrl(queueName)
265+
this._queueArnPromises[qualifiedQueueName] = this.getQueueUrl(queueName, createOptions)
252266
.then((queueUrl) => {
253267
return promise.build(
254268
(resolveCallback, rejectCallback) => {
@@ -286,13 +300,15 @@ module.exports = (() => {
286300
* @public
287301
* @param {string} queueName - The name of the queue to create.
288302
* @param {Number=} retentionTime - The length of time a queue will retain a message in seconds.
303+
* @param {Object=} tags - Tags to assign to the queue.
289304
* @returns {Promise<String>}
290305
*/
291-
createQueue(queueName, retentionTime) {
306+
createQueue(queueName, retentionTime, tags) {
292307
return Promise.resolve()
293308
.then(() => {
294309
assert.argumentIsRequired(queueName, 'queueName', String);
295310
assert.argumentIsOptional(retentionTime, 'retentionTime', Number);
311+
assert.argumentIsOptional(tags, 'tags', Object);
296312

297313
if (this.getIsDisposed()) {
298314
throw new Error('The SqsProvider has been disposed.');
@@ -318,6 +334,25 @@ module.exports = (() => {
318334
};
319335
}
320336

337+
if (is.object(tags)) {
338+
const keys = object.keys(tags);
339+
340+
const t = keys.reduce((accumulator, key) => {
341+
const tag = { };
342+
343+
tag.Key = key;
344+
tag.Value = tags[key];
345+
346+
accumulator.push(tag);
347+
348+
return accumulator;
349+
}, [ ]);
350+
351+
if (t.length > 0) {
352+
payload.Tags = t;
353+
}
354+
}
355+
321356
this._sqs.createQueue(payload, (error, data) => {
322357
if (error === null) {
323358
logger.info('Queue created [', qualifiedQueueName, ']');
@@ -400,9 +435,10 @@ module.exports = (() => {
400435
* @param {string} queueName - The name of the queue to add the message to.
401436
* @param {Object} payload - The message to enqueue (will be serialized to JSON).
402437
* @param {Number=} delaySeconds - The number of seconds to prevent message from being retrieved from the queue.
438+
* @param {Object=} createOptions - Options to use when queue does not exist and must be created.
403439
* @returns {Promise}
404440
*/
405-
send(queueName, payload, delaySeconds) {
441+
send(queueName, payload, delaySeconds, createOptions) {
406442
return Promise.resolve()
407443
.then(() => {
408444
assert.argumentIsRequired(queueName, 'queueName', String);
@@ -417,7 +453,7 @@ module.exports = (() => {
417453
throw new Error('The SqsProvider has not been started.');
418454
}
419455

420-
return this.getQueueUrl(queueName)
456+
return this.getQueueUrl(queueName, createOptions)
421457
.then((queueUrl) => {
422458
return promise.build(
423459
(resolveCallback, rejectCallback) => {
@@ -462,9 +498,10 @@ module.exports = (() => {
462498
* @public
463499
* @param {string} queueName - The name of the queue to add the message to.
464500
* @param {Object[]} batch - The messages to enqueue (each will be serialized to JSON).
501+
* @param {Object=} createOptions - Options to use when queue does not exist and must be created.
465502
* @returns {Promise}
466503
*/
467-
sendBatch(queueName, batch) {
504+
sendBatch(queueName, batch, createOptions) {
468505
return Promise.resolve()
469506
.then(() => {
470507
assert.argumentIsRequired(queueName, 'queueName', String);
@@ -486,7 +523,7 @@ module.exports = (() => {
486523
throw new Error('The SqsProvider has not been started.');
487524
}
488525

489-
return this.getQueueUrl(queueName)
526+
return this.getQueueUrl(queueName, createOptions)
490527
.then((queueUrl) => {
491528
return promise.build(
492529
(resolveCallback, rejectCallback) => {
@@ -674,9 +711,10 @@ module.exports = (() => {
674711
* @param {Number=} pollInterval - The milliseconds to wait between polling the queue.
675712
* @param {Number=} pollDuration - The maximum amount of time the server-side long-poll will wait for messages to become available.
676713
* @param {Number=} maximumMessages - The maximum number of messages to read per request (cannot be more than 10).
714+
* @param {Object=} createOptions - Options to use when queue does not exist and must be created.
677715
* @returns {Disposable}
678716
*/
679-
observe(queueName, callback, pollInterval, pollDuration, batchSize) {
717+
observe(queueName, callback, pollInterval, pollDuration, batchSize, createOptions) {
680718
assert.argumentIsRequired(queueName, 'queueName', String);
681719
assert.argumentIsRequired(callback, 'callback', Function);
682720
assert.argumentIsOptional(pollInterval, 'pollInterval', Number);
@@ -716,7 +754,7 @@ module.exports = (() => {
716754

717755
let delay;
718756

719-
receiveMessages.call(this, queueName, pollDuration, batchSize, false)
757+
receiveMessages.call(this, queueName, pollDuration, batchSize, false, createOptions)
720758
.then((messages) => {
721759
return Promise.all(messages.map((message) => {
722760
if (disposed) {
@@ -849,7 +887,7 @@ module.exports = (() => {
849887
}
850888
}
851889

852-
function receiveMessages(queueName, waitTime, maximumMessages, synchronousDelete) {
890+
function receiveMessages(queueName, waitTime, maximumMessages, synchronousDelete, createOptions) {
853891
if (this.getIsDisposed()) {
854892
throw new Error('The SqsProvider has been disposed.');
855893
}
@@ -878,7 +916,7 @@ module.exports = (() => {
878916
maximumMessagesToUse = 1;
879917
}
880918

881-
return this.getQueueUrl(queueName)
919+
return this.getQueueUrl(queueName, createOptions)
882920
.then((queueUrl) => {
883921
return promise.build(
884922
(resolveCallback, rejectCallback) => {

messaging/publishers/AwsPublisher.js

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@ module.exports = (() => {
1818
const logger = log4js.getLogger('common-node/messaging/publishers/AwsPublisher');
1919

2020
class AwsPublisher extends Publisher {
21-
constructor(snsProvider, sqsProvider, suppressEcho, suppressExpressions) {
21+
constructor(snsProvider, sqsProvider, suppressEcho, suppressExpressions, tags) {
2222
super(suppressExpressions);
2323

2424
assert.argumentIsRequired(snsProvider, 'snsProvider', SnsProvider, 'SnsProvider');
2525
assert.argumentIsRequired(sqsProvider, 'sqsProvider', SqsProvider, 'SqsProvider');
2626
assert.argumentIsOptional(suppressEcho, 'suppressEcho', Boolean);
27+
assert.argumentIsOptional(tags, 'tags', Object);
2728

2829
this._snsProvider = snsProvider;
2930
this._sqsProvider = sqsProvider;
@@ -33,6 +34,13 @@ module.exports = (() => {
3334
this._publisherId = uuid.v4();
3435

3536
this._subscriptionPromises = {};
37+
38+
this._createOptions = null;
39+
40+
if (tags) {
41+
this._createOptions = { };
42+
this._createOptions.tags = tags;
43+
}
3644
}
3745

3846
_start() {
@@ -60,7 +68,7 @@ module.exports = (() => {
6068
logger.debug('Publishing message to AWS:', topic);
6169
logger.trace(payload);
6270

63-
return this._snsProvider.publish(topic, envelope);
71+
return this._snsProvider.publish(topic, envelope, this._createOptions);
6472
}
6573

6674
_subscribe(messageType, handler) {
@@ -80,8 +88,8 @@ module.exports = (() => {
8088
subscriptionStack.push(subscriptionEvent);
8189

8290
this._subscriptionPromises[topic] = Promise.all([
83-
this._snsProvider.getTopicArn(topic),
84-
this._sqsProvider.getQueueArn(subscriptionQueueName)
91+
this._snsProvider.getTopicArn(topic, this._createOptions),
92+
this._sqsProvider.getQueueArn(subscriptionQueueName, this._createOptions)
8593
]).then((resultGroup) => {
8694
const topicArn = resultGroup[0];
8795
const queueArn = resultGroup[1];

0 commit comments

Comments
 (0)