Skip to content

Commit

Permalink
posting events control per logStream instead of global (#112)
Browse files Browse the repository at this point in the history
  • Loading branch information
onhate authored and lazywithclass committed Sep 2, 2019
1 parent 39b5709 commit 1188bfe
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
18 changes: 10 additions & 8 deletions lib/cloudwatch-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,23 @@ var find = require('lodash.find'),
async = require('async'),
debug = require('./utils').debug;

var lib = {};
// _postingEvents maps a logStream name -> boolean "upload in flight" flag,
// so an in-progress putLogEvents for one stream no longer blocks uploads
// to other streams (previously a single global boolean).
var lib = {
_postingEvents: {}
};

lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, cb) {
debug('upload', logEvents);

// trying to send a batch before the last completed
// would cause InvalidSequenceTokenException.
if (lib._postingEvents || logEvents.length <= 0) {
if (lib._postingEvents[streamName] || logEvents.length <= 0) {
debug('nothing to do or already doing something');
return cb();
}

lib._postingEvents = true;
lib._postingEvents[streamName] = true;
safeUpload(function(err) {
lib._postingEvents = false;
lib._postingEvents[streamName] = false;
return cb(err);
});

Expand Down Expand Up @@ -69,7 +71,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, cb
};
if (token) payload.sequenceToken = token;

lib._postingEvents = true;
lib._postingEvents[streamName] = true;
debug('send to aws');
aws.putLogEvents(payload, function(err, data) {
debug('sent to aws, err: ', err, ' data: ', data)
Expand All @@ -89,7 +91,7 @@ lib.upload = function(aws, groupName, streamName, logEvents, retentionInDays, cb
lib._nextToken[previousKeyMapKey(groupName, streamName)] = data.nextSequenceToken;
}

lib._postingEvents = false;
lib._postingEvents[streamName] = false;
cb()
}
});
Expand All @@ -102,7 +104,7 @@ lib.submitWithAnotherToken = function(aws, groupName, streamName, payload, reten
lib.getToken(aws, groupName, streamName, retentionInDays, function(err, token) {
payload.sequenceToken = token;
aws.putLogEvents(payload, function(err) {
lib._postingEvents = false;
lib._postingEvents[streamName] = false;
cb(err)
});
})
Expand All @@ -114,7 +116,7 @@ function retrySubmit(aws, payload, times, cb) {
if (err && times > 0) {
retrySubmit(aws, payload, times - 1, cb)
} else {
lib._postingEvents = false;
lib._postingEvents[payload.logStreamName] = false;
cb(err)
}
})
Expand Down
17 changes: 15 additions & 2 deletions test/cloudwatch-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ describe('cloudwatch-integration', function() {
lib.upload(aws, 'group', 'stream', events, 0, function() {
// The second upload call should get ignored
aws.putLogEvents.calledOnce.should.equal(true);
lib._postingEvents = false; // reset
lib._postingEvents['stream'] = false; // reset
done()
});
});
Expand All @@ -46,11 +46,24 @@ describe('cloudwatch-integration', function() {
lib.upload(aws, 'group', 'stream', events, 0, function() {
// The second upload call should get ignored
lib.getToken.calledOnce.should.equal(true);
lib._postingEvents = false; // reset
lib._postingEvents['stream'] = false; // reset
done()
});
});

// Regression test for per-stream posting flags: an unresolved getToken for
// 'stream1' must not cause the upload for 'stream2' to be skipped.
it('not ignores upload calls if getToken already in progress for another stream', function(done) {
const events = [{ message : "test message", timestamp : new Date().toISOString()}];
lib.getToken.onFirstCall().returns(); // Never invoke the callback, to simulate an ongoing token request for stream1.
lib.getToken.onSecondCall().yields(null, 'token'); // stream2's token request succeeds immediately.
lib.upload(aws, 'group', 'stream1', events, 0, function(){ });
lib.upload(aws, 'group', 'stream2', events, 0, function(){ done() });

// Both streams must have requested a token — stream2 was not blocked by stream1.
lib.getToken.calledTwice.should.equal(true);

lib._postingEvents['stream1'] = false; // reset per-stream flag for subsequent tests
lib._postingEvents['stream2'] = false; // reset per-stream flag for subsequent tests
});

it('truncates very large messages and alerts the error handler', function(done) {
var BIG_MSG_LEN = 300000;
const events = [{ message : new Array(BIG_MSG_LEN).join('A'), timestamp : new Date().toISOString()}];
Expand Down

0 comments on commit 1188bfe

Please sign in to comment.