Skip to content

Commit

Permalink
feat: added send-to-sns command
Browse files Browse the repository at this point in the history
Closes #38
  • Loading branch information
theburningmonk committed Nov 3, 2019
1 parent 41f4684 commit a2b7716
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 24 deletions.
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ USAGE
* [`lumigo-cli list-lambda`](#lumigo-cli-list-lambda)
* [`lumigo-cli powertune-lambda`](#lumigo-cli-powertune-lambda)
* [`lumigo-cli replay-sqs-dlq`](#lumigo-cli-replay-sqs-dlq)
* [`lumigo-cli send-to-sns`](#lumigo-cli-send-to-sns)
* [`lumigo-cli send-to-sqs`](#lumigo-cli-send-to-sqs)
* [`lumigo-cli sls-remove`](#lumigo-cli-sls-remove)
* [`lumigo-cli switch-profile`](#lumigo-cli-switch-profile)
Expand Down Expand Up @@ -188,6 +189,24 @@ OPTIONS

_See code: [src/commands/replay-sqs-dlq.js](https://github.com/lumigo-io/lumigo-cli/blob/v0.24.0/src/commands/replay-sqs-dlq.js)_

## `lumigo-cli send-to-sns`

Sends each line in the specified file as a message to a SNS topic

```
USAGE
$ lumigo-cli send-to-sns
OPTIONS
-c, --concurrency=concurrency [default: 10] how many concurrent pollers to run
-f, --filePath=filePath (required) path to the file
-n, --topicName=topicName (required) name of the SNS topic, e.g. my-topic-dev
-p, --profile=profile AWS CLI profile name
-r, --region=region (required) AWS region, e.g. us-east-1
```

_See code: [src/commands/send-to-sns.js](https://github.com/lumigo-io/lumigo-cli/blob/v0.24.0/src/commands/send-to-sns.js)_

## `lumigo-cli send-to-sqs`

Sends each line in the specified file as a message to a SQS queue
Expand All @@ -197,7 +216,7 @@ USAGE
$ lumigo-cli send-to-sqs
OPTIONS
-f, --filePath=filePath (required) relative to the file with the messages
-f, --filePath=filePath (required) path to the file
-n, --queueName=queueName (required) name of the SQS queue, e.g. task-queue-dev
-p, --profile=profile AWS CLI profile name
-r, --region=region (required) AWS region, e.g. us-east-1
Expand Down
22 changes: 22 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"line-reader": "^0.4.0",
"lodash": "^4.17.15",
"moment": "^2.24.0",
"p-queue": "^6.2.0",
"restify": "^8.4.0",
"semver": "^6.3.0",
"uuid": "^3.3.3"
Expand Down
125 changes: 125 additions & 0 deletions src/commands/send-to-sns.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
const Promise = require("bluebird");
const _ = require("lodash");
const {default: PQueue} = require("p-queue");
const lineReader = require("line-reader");
const { getAWSSDK } = require("../lib/aws");
const { getTopicArn } = require("../lib/sns");
const { Command, flags } = require("@oclif/command");
const { checkVersion } = require("../lib/version-check");
require("colors");

class SendToSnsCommand extends Command {
async run() {
const { flags } = this.parse(SendToSnsCommand);
const { topicName, region, profile, filePath, concurrency } = flags;

global.region = region;
global.profile = profile;

checkVersion();

this.log(`finding the topic [${topicName}] in [${region}]`);
const topicArn = await getTopicArn(topicName);

this.log("sending messages...");
console.time("execution time");
await sendMessages(filePath, topicArn, concurrency);

this.log("all done!");
console.timeEnd("execution time");
}
}

SendToSnsCommand.description =
"Sends each line in the specified file as a message to a SNS topic";
SendToSnsCommand.flags = {
topicName: flags.string({
char: "n",
description: "name of the SNS topic, e.g. my-topic-dev",
required: true
}),
region: flags.string({
char: "r",
description: "AWS region, e.g. us-east-1",
required: true
}),
profile: flags.string({
char: "p",
description: "AWS CLI profile name",
required: false
}),
filePath: flags.string({
char: "f",
description: "path to the file",
required: true
}),
concurrency: flags.integer({
char: "c",
description: "how many concurrent pollers to run",
required: false,
default: 10
})
};

const sendMessages = (filePath, topicArn, concurrency) => {
const AWS = getAWSSDK();
const SNS = new AWS.SNS();
const queue = new PQueue({ concurrency });

let processedCount = 0;

const printProgress = (count, last = false) => {
process.stdout.clearLine();
process.stdout.cursorTo(0);
process.stdout.write(`sent ${count} messages`);

if (last) {
process.stdout.write("\n");
}
};

const publish = async (line) => {
try {
await SNS.publish({
Message: line,
TopicArn: topicArn
}).promise();
} catch (err) {
console.log(`\n${err.message.bold.bgWhite.red}`);
console.log(line);
}
};

const add = (line, last = false) => {
queue.add(() => publish(line));
processedCount += 1;
printProgress(processedCount, last);
};

return new Promise((resolve) => {
lineReader.eachLine(filePath, function(line, last, cb) {
if (_.isEmpty(line)) {
cb();
} else if (last) {
add(line, true);
queue.onEmpty().then(() => {
cb();
resolve();
});
} else if (processedCount % 100 === 0) {
// to avoid overloading the queue and run of memory,
// also, to avoid throttling as well,
// wait for the queue to empty every after 100 messages
queue.onEmpty().then(() => {
add(line);
cb();
});
} else {
add(line);
cb();
}
});
});
};

module.exports = SendToSnsCommand;
24 changes: 1 addition & 23 deletions src/commands/tail-sns.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
const _ = require("lodash");
const { getAWSSDK } = require("../lib/aws");
const { getTopicArn } = require("../lib/sns");
const { Command, flags } = require("@oclif/command");
const { checkVersion } = require("../lib/version-check");
const uuid = require("uuid/v4");
Expand Down Expand Up @@ -40,29 +41,6 @@ TailSnsCommand.flags = {
})
};

const getTopicArn = async topicName => {
const AWS = getAWSSDK();
const SNS = new AWS.SNS();
const loop = async nextToken => {
const resp = await SNS.listTopics({
NextToken: nextToken
}).promise();

const matchingTopic = resp.Topics.find(x => x.TopicArn.endsWith(":" + topicName));
if (matchingTopic) {
return matchingTopic.TopicArn;
}

if (resp.NextToken) {
return await loop(resp.NextToken);
} else {
throw new Error(`cannot find the SNS topic [${topicName}]!`);
}
};

return loop();
};

const createQueue = async topicArn => {
const AWS = getAWSSDK();
const SQS = new AWS.SQS();
Expand Down
28 changes: 28 additions & 0 deletions src/lib/sns.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
const { getAWSSDK } = require("../lib/aws");

const getTopicArn = async topicName => {
const AWS = getAWSSDK();
const SNS = new AWS.SNS();
const loop = async nextToken => {
const resp = await SNS.listTopics({
NextToken: nextToken
}).promise();

const matchingTopic = resp.Topics.find(x => x.TopicArn.endsWith(":" + topicName));
if (matchingTopic) {
return matchingTopic.TopicArn;
}

if (resp.NextToken) {
return await loop(resp.NextToken);
} else {
throw new Error(`cannot find the SNS topic [${topicName}]!`);
}
};

return loop();
};

module.exports = {
getTopicArn
};
86 changes: 86 additions & 0 deletions test/commands/send-to-sns.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
const _ = require("lodash");
const {expect, test} = require("@oclif/test");
const AWS = require("aws-sdk");

const mockListTopics = jest.fn();
AWS.SNS.prototype.listTopics = mockListTopics;
const mockPublish = jest.fn();
AWS.SNS.prototype.publish = mockPublish;

const consoleLog = jest.fn();
console.log = consoleLog;
process.stdout.clearLine = jest.fn();
process.stdout.cursorTo = jest.fn();

beforeEach(() => {
mockListTopics.mockReset();
mockPublish.mockReset();
consoleLog.mockReset();

mockListTopics.mockReturnValue({
promise: () => Promise.resolve({
Topics: [{ TopicArn: "arn:aws:sns:us-east-1:12345:my-topic" }]
})
});
});

describe("send-to-sns", () => {
describe("when there are no failures", () => {
beforeEach(() => {
givenPublishAlwaysReturns();
});

test
.stdout()
.command(["send-to-sns", "-n", "my-topic", "-r", "us-east-1", "-f", "test/test_sns_input.txt"])
.it("sends all the file's content to sns", ctx => {
expect(ctx.stdout).to.contain("all done!");

// there's a total of 5 messages
expect(mockPublish.mock.calls).to.have.lengthOf(5);
const messages = _
.flatMap(mockPublish.mock.calls, calls => calls)
.map(x => x.Message);
expect(messages).to.have.lengthOf(5);
_.range(1, 6).forEach(n => {
expect(messages).to.contain(`message ${n}`);
});
});
});

describe("when there are failures", () => {
beforeEach(() => {
givenPublishFails(new Error("boom!"));
givenPublishAlwaysReturns();
});

test
.stdout()
.command(["send-to-sns", "-n", "my-topic", "-r", "us-east-1", "-f", "test/test_sns_input.txt"])
.it("reports the failed messages", ctx => {
expect(ctx.stdout).to.contain("all done!");

// there's a total of 5 messages
expect(mockPublish.mock.calls).to.have.lengthOf(5);
const messages = _
.flatMap(mockPublish.mock.calls, calls => calls)
.map(x => x.Message);
expect(messages).to.have.lengthOf(5);

const logMessages = _.flatMap(consoleLog.mock.calls, call => call).join("\n");
expect(logMessages).to.contain("boom!");
});
});
});

function givenPublishAlwaysReturns() {
mockPublish.mockReturnValue({
promise: () => Promise.resolve({})
});
};

function givenPublishFails(error) {
mockPublish.mockReturnValueOnce({
promise: () => Promise.reject(error)
});
};
6 changes: 6 additions & 0 deletions test/test_sns_input.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
message 1
message 2
message 3
message 4

message 5

0 comments on commit a2b7716

Please sign in to comment.