Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const overrideLeoFunctions = (data = {}, leo=leosdk) => {
toDynamoDB: true,
testId: process.pid
}, data);
readQueueObjectArray = data.queues;
let readQueueObjectArray = data.queues;
leo.configuration.validate = () => true;

let testId = data.testId;
Expand Down
38 changes: 29 additions & 9 deletions lib/stream/leo-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1232,29 +1232,45 @@ module.exports = function(configure) {
}

let getEvents;

if (leoEvent.v >= 2 || !leoEvent.max_eid) {
var max_eid = (configure.registry && configure.registry.__cron && configure.registry.__cron.maxeid) || opts.maxOverride || leoEvent.max_eid || '';
var max_eid_query = max_eid;
var max_end_params = {};

// If the end is not on a record boundry
// make sure the containing record is included
if (leoEvent.max_eid !== max_eid){
max_eid_query = max_eid.replace(/-\d+/, "~");
max_end_params = {
FilterExpression: "#start <= :max_eid ",
ExpressionAttributeNames: {
"#start": "start"
},
ExpressionAttributeValues: {
":max_eid": max_eid
}
}
}
var table_name = configure.resources.LeoStream;
var eid = "eid";
var range_key = "end";

getEvents = function(callback) {
var params = {
var params = extend(true, {
TableName: table_name,
KeyConditionExpression: "#event = :event and #key between :start and :maxkey",
ExpressionAttributeNames: {
"#event": "event",
"#key": range_key,
"#key": range_key
},
Limit: 50,
ExpressionAttributeValues: {
":event": queue,
":maxkey": usingSnapshot ? snapshotEnd.replace("_snapshot/", "") + 9 : max_eid,
":maxkey": usingSnapshot ? snapshotEnd.replace("_snapshot/", "") + 9 : max_eid_query,
":start": usingSnapshot ? start.replace("_snapshot/", "") : start
},
"ReturnConsumedCapacity": 'TOTAL'
};
}, max_end_params);
logger.debug(params);
dynamodb.docClient.query(params, function(err, data) {
logger.debug("Consumed Capacity", data && data.ConsumedCapacity);
Expand All @@ -1263,6 +1279,7 @@ module.exports = function(configure) {
callback(err);
return;
}

callback(null, data.Items);
});
};
Expand Down Expand Up @@ -1400,14 +1417,15 @@ module.exports = function(configure) {
}
}), ls.write((obj, done) => {
var e = obj.obj;
totalSize += obj.length;
if (!item.archive) {
e.eid = createEId(eid++);
}
let isGreaterThanStart = e.eid.localeCompare(start) > 0;
let isLessThanEnd = e.eid.localeCompare(max_eid) <= 0;

if (!isPassDestroyed && isGreaterThanStart && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
if (!isPassDestroyed && isGreaterThanStart && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
totalCount++;
totalSize += obj.length;
pass.throttledWrite(e, done);
} else { //otherwise it means we had a number in the middle of a file
if (isPassDestroyed || totalCount >= opts.limit || totalSize >= opts.size) {
Expand Down Expand Up @@ -1442,7 +1460,8 @@ module.exports = function(configure) {
item.payload = JSON.parse(item.payload);
}

if (!isPassDestroyed && item.eid.localeCompare(start) > 0 && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
let isLessThanEnd = item.eid.localeCompare(max_eid) <= 0;
if (!isPassDestroyed && item.eid.localeCompare(start) > 0 && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
totalCount++;
pass.throttledWrite(item, done);
} else { //otherwise it means we had a number in the middle of a file
Expand All @@ -1457,7 +1476,8 @@ module.exports = function(configure) {
});
pump(gzip, split(JSON.parse), ls.write((e, done) => {
e.eid = createEId(e.eid);
if (!isPassDestroyed && e.eid.localeCompare(start) > 0 && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
let isLessThanEnd = e.eid.localeCompare(max_eid) <= 0;
if (!isPassDestroyed && e.eid.localeCompare(start) > 0 && isLessThanEnd && totalCount < opts.limit && totalSize < opts.size) { //Then it is greater than the event they were looking at
totalCount++;
pass.throttledWrite(e, done);
} else { //otherwise it means we had a number in the middle of a file
Expand Down