From a2b77167d1e5fb62d9fea025e1dd151d86e41124 Mon Sep 17 00:00:00 2001 From: theburningmonk Date: Sun, 3 Nov 2019 11:03:19 +0000 Subject: [PATCH] feat: added send-to-sns command Closes #38 --- README.md | 21 ++++- package-lock.json | 22 ++++++ package.json | 1 + src/commands/send-to-sns.js | 125 ++++++++++++++++++++++++++++++ src/commands/tail-sns.js | 24 +----- src/lib/sns.js | 28 +++++++ test/commands/send-to-sns.test.js | 86 ++++++++++++++++++++ test/test_sns_input.txt | 6 ++ 8 files changed, 289 insertions(+), 24 deletions(-) create mode 100644 src/commands/send-to-sns.js create mode 100644 src/lib/sns.js create mode 100644 test/commands/send-to-sns.test.js create mode 100644 test/test_sns_input.txt diff --git a/README.md b/README.md index 91ffc5d..a1c7d64 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ USAGE * [`lumigo-cli list-lambda`](#lumigo-cli-list-lambda) * [`lumigo-cli powertune-lambda`](#lumigo-cli-powertune-lambda) * [`lumigo-cli replay-sqs-dlq`](#lumigo-cli-replay-sqs-dlq) +* [`lumigo-cli send-to-sns`](#lumigo-cli-send-to-sns) * [`lumigo-cli send-to-sqs`](#lumigo-cli-send-to-sqs) * [`lumigo-cli sls-remove`](#lumigo-cli-sls-remove) * [`lumigo-cli switch-profile`](#lumigo-cli-switch-profile) @@ -188,6 +189,24 @@ OPTIONS _See code: [src/commands/replay-sqs-dlq.js](https://github.com/lumigo-io/lumigo-cli/blob/v0.24.0/src/commands/replay-sqs-dlq.js)_ +## `lumigo-cli send-to-sns` + +Sends each line in the specified file as a message to a SNS topic + +``` +USAGE + $ lumigo-cli send-to-sns + +OPTIONS + -c, --concurrency=concurrency [default: 10] how many concurrent pollers to run + -f, --filePath=filePath (required) path to the file + -n, --topicName=topicName (required) name of the SNS topic, e.g. my-topic-dev + -p, --profile=profile AWS CLI profile name + -r, --region=region (required) AWS region, e.g. us-east-1 +``` + +_See code: [src/commands/send-to-sns.js](https://github.com/lumigo-io/lumigo-cli/blob/v0.24.0/src/commands/send-to-sns.js)_ + ## `lumigo-cli send-to-sqs` Sends each line in the specified file as a message to a SQS queue @@ -197,7 +216,7 @@ USAGE $ lumigo-cli send-to-sqs OPTIONS - -f, --filePath=filePath (required) relative to the file with the messages + -f, --filePath=filePath (required) path to the file -n, --queueName=queueName (required) name of the SQS queue, e.g. task-queue-dev -p, --profile=profile AWS CLI profile name -r, --region=region (required) AWS region, e.g. us-east-1 diff --git a/package-lock.json b/package-lock.json index 731381f..e2258d3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -4475,6 +4475,11 @@ "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "integrity": "sha1-Qa4u62XvpiJorr/qg6x9eSmbCIc=" }, + "eventemitter3": { + "version": "4.0.0", + "resolved": "https://registry.npmjs.org/eventemitter3/-/eventemitter3-4.0.0.tgz", + "integrity": "sha512-qerSRB0p+UDEssxTtm6EDKcE7W4OaoisfIMl4CngyEhjpYglocpNg6UEqCvemdGhosAsg4sO2dXJOdyBifPGCg==" + }, "events": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/events/-/events-1.1.1.tgz", @@ -12521,6 +12526,15 @@ "aggregate-error": "^3.0.0" } }, + "p-queue": { + "version": "6.2.0", + "resolved": "https://registry.npmjs.org/p-queue/-/p-queue-6.2.0.tgz", + "integrity": "sha512-B2LXNONcyn/G6uz2UBFsGjmSa0e/br3jznlzhEyCXg56c7VhEpiT2pZxGOfv32Q3FSyugAdys9KGpsv3kV+Sbg==", + "requires": { + "eventemitter3": "^4.0.0", + "p-timeout": "^3.1.0" + } + }, "p-reduce": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/p-reduce/-/p-reduce-1.0.0.tgz", @@ -12537,6 +12551,14 @@ "retry": "^0.12.0" } }, + "p-timeout": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", + "integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==", + "requires": { + "p-finally": "^1.0.0" + } + }, "p-try": { "version": "2.2.0", "resolved": "https://registry.npmjs.org/p-try/-/p-try-2.2.0.tgz", diff --git a/package.json b/package.json index 3da4df3..8d7c520 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "line-reader": "^0.4.0", "lodash": "^4.17.15", "moment": "^2.24.0", + "p-queue": "^6.2.0", "restify": "^8.4.0", "semver": "^6.3.0", "uuid": "^3.3.3" diff --git a/src/commands/send-to-sns.js b/src/commands/send-to-sns.js new file mode 100644 index 0000000..4111e69 --- /dev/null +++ b/src/commands/send-to-sns.js @@ -0,0 +1,125 @@ +const Promise = require("bluebird"); +const _ = require("lodash"); +const {default: PQueue} = require("p-queue"); +const lineReader = require("line-reader"); +const { getAWSSDK } = require("../lib/aws"); +const { getTopicArn } = require("../lib/sns"); +const { Command, flags } = require("@oclif/command"); +const { checkVersion } = require("../lib/version-check"); +require("colors"); + +class SendToSnsCommand extends Command { + async run() { + const { flags } = this.parse(SendToSnsCommand); + const { topicName, region, profile, filePath, concurrency } = flags; + + global.region = region; + global.profile = profile; + + checkVersion(); + + this.log(`finding the topic [${topicName}] in [${region}]`); + const topicArn = await getTopicArn(topicName); + + this.log("sending messages..."); + console.time("execution time"); + await sendMessages(filePath, topicArn, concurrency); + + this.log("all done!"); + console.timeEnd("execution time"); + } +} + +SendToSnsCommand.description = + "Sends each line in the specified file as a message to a SNS topic"; +SendToSnsCommand.flags = { + topicName: flags.string({ + char: "n", + description: "name of the SNS topic, e.g. my-topic-dev", + required: true + }), + region: flags.string({ + char: "r", + description: "AWS region, e.g. us-east-1", + required: true + }), + profile: flags.string({ + char: "p", + description: "AWS CLI profile name", + required: false + }), + filePath: flags.string({ + char: "f", + description: "path to the file", + required: true + }), + concurrency: flags.integer({ + char: "c", + description: "how many concurrent pollers to run", + required: false, + default: 10 + }) +}; + +const sendMessages = (filePath, topicArn, concurrency) => { + const AWS = getAWSSDK(); + const SNS = new AWS.SNS(); + const queue = new PQueue({ concurrency }); + + let processedCount = 0; + + const printProgress = (count, last = false) => { + process.stdout.clearLine(); + process.stdout.cursorTo(0); + process.stdout.write(`sent ${count} messages`); + + if (last) { + process.stdout.write("\n"); + } + }; + + const publish = async (line) => { + try { + await SNS.publish({ + Message: line, + TopicArn: topicArn + }).promise(); + } catch (err) { + console.log(`\n${err.message.bold.bgWhite.red}`); + console.log(line); + } + }; + + const add = (line, last = false) => { + queue.add(() => publish(line)); + processedCount += 1; + printProgress(processedCount, last); + }; + + return new Promise((resolve) => { + lineReader.eachLine(filePath, function(line, last, cb) { + if (_.isEmpty(line)) { + cb(); + } else if (last) { + add(line, true); + queue.onEmpty().then(() => { + cb(); + resolve(); + }); + } else if (processedCount % 100 === 0) { + // to avoid overloading the queue and run of memory, + // also, to avoid throttling as well, + // wait for the queue to empty every after 100 messages + queue.onEmpty().then(() => { + add(line); + cb(); + }); + } else { + add(line); + cb(); + } + }); + }); +}; + +module.exports = SendToSnsCommand; diff --git a/src/commands/tail-sns.js b/src/commands/tail-sns.js index 268c53e..cf4c3e6 100644 --- a/src/commands/tail-sns.js +++ b/src/commands/tail-sns.js @@ -1,5 +1,6 @@ const _ = require("lodash"); const { getAWSSDK } = require("../lib/aws"); +const { getTopicArn } = require("../lib/sns"); const { Command, flags } = require("@oclif/command"); const { checkVersion } = require("../lib/version-check"); const uuid = require("uuid/v4"); @@ -40,29 +41,6 @@ TailSnsCommand.flags = { }) }; -const getTopicArn = async topicName => { - const AWS = getAWSSDK(); - const SNS = new AWS.SNS(); - const loop = async nextToken => { - const resp = await SNS.listTopics({ - NextToken: nextToken - }).promise(); - - const matchingTopic = resp.Topics.find(x => x.TopicArn.endsWith(":" + topicName)); - if (matchingTopic) { - return matchingTopic.TopicArn; - } - - if (resp.NextToken) { - return await loop(resp.NextToken); - } else { - throw new Error(`cannot find the SNS topic [${topicName}]!`); - } - }; - - return loop(); -}; - const createQueue = async topicArn => { const AWS = getAWSSDK(); const SQS = new AWS.SQS(); diff --git a/src/lib/sns.js b/src/lib/sns.js new file mode 100644 index 0000000..3e7db8a --- /dev/null +++ b/src/lib/sns.js @@ -0,0 +1,28 @@ +const { getAWSSDK } = require("../lib/aws"); + +const getTopicArn = async topicName => { + const AWS = getAWSSDK(); + const SNS = new AWS.SNS(); + const loop = async nextToken => { + const resp = await SNS.listTopics({ + NextToken: nextToken + }).promise(); + + const matchingTopic = resp.Topics.find(x => x.TopicArn.endsWith(":" + topicName)); + if (matchingTopic) { + return matchingTopic.TopicArn; + } + + if (resp.NextToken) { + return await loop(resp.NextToken); + } else { + throw new Error(`cannot find the SNS topic [${topicName}]!`); + } + }; + + return loop(); +}; + +module.exports = { + getTopicArn +}; diff --git a/test/commands/send-to-sns.test.js b/test/commands/send-to-sns.test.js new file mode 100644 index 0000000..b36377a --- /dev/null +++ b/test/commands/send-to-sns.test.js @@ -0,0 +1,86 @@ +const _ = require("lodash"); +const {expect, test} = require("@oclif/test"); +const AWS = require("aws-sdk"); + +const mockListTopics = jest.fn(); +AWS.SNS.prototype.listTopics = mockListTopics; +const mockPublish = jest.fn(); +AWS.SNS.prototype.publish = mockPublish; + +const consoleLog = jest.fn(); +console.log = consoleLog; +process.stdout.clearLine = jest.fn(); +process.stdout.cursorTo = jest.fn(); + +beforeEach(() => { + mockListTopics.mockReset(); + mockPublish.mockReset(); + consoleLog.mockReset(); + + mockListTopics.mockReturnValue({ + promise: () => Promise.resolve({ + Topics: [{ TopicArn: "arn:aws:sns:us-east-1:12345:my-topic" }] + }) + }); +}); + +describe("send-to-sns", () => { + describe("when there are no failures", () => { + beforeEach(() => { + givenPublishAlwaysReturns(); + }); + + test + .stdout() + .command(["send-to-sns", "-n", "my-topic", "-r", "us-east-1", "-f", "test/test_sns_input.txt"]) + .it("sends all the file's content to sns", ctx => { + expect(ctx.stdout).to.contain("all done!"); + + // there's a total of 5 messages + expect(mockPublish.mock.calls).to.have.lengthOf(5); + const messages = _ + .flatMap(mockPublish.mock.calls, calls => calls) + .map(x => x.Message); + expect(messages).to.have.lengthOf(5); + _.range(1, 6).forEach(n => { + expect(messages).to.contain(`message ${n}`); + }); + }); + }); + + describe("when there are failures", () => { + beforeEach(() => { + givenPublishFails(new Error("boom!")); + givenPublishAlwaysReturns(); + }); + + test + .stdout() + .command(["send-to-sns", "-n", "my-topic", "-r", "us-east-1", "-f", "test/test_sns_input.txt"]) + .it("reports the failed messages", ctx => { + expect(ctx.stdout).to.contain("all done!"); + + // there's a total of 5 messages + expect(mockPublish.mock.calls).to.have.lengthOf(5); + const messages = _ + .flatMap(mockPublish.mock.calls, calls => calls) + .map(x => x.Message); + expect(messages).to.have.lengthOf(5); + + const logMessages = _.flatMap(consoleLog.mock.calls, call => call).join("\n"); + expect(logMessages).to.contain("boom!"); + }); + }); +}); + +function givenPublishAlwaysReturns() { + mockPublish.mockReturnValue({ + promise: () => Promise.resolve({}) + }); +}; + +function givenPublishFails(error) { + mockPublish.mockReturnValueOnce({ + promise: () => Promise.reject(error) + }); +}; diff --git a/test/test_sns_input.txt b/test/test_sns_input.txt new file mode 100644 index 0000000..4a0d4b2 --- /dev/null +++ b/test/test_sns_input.txt @@ -0,0 +1,6 @@ +message 1 +message 2 +message 3 +message 4 + +message 5