Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "node",
"request": "launch",
"name": "Unit Tests",
"env": {
"LEO_LOGGER": "/.*/a"
},
"program": "${workspaceFolder}/node_modules/mocha/bin/_mocha",
"args": [
"-u",
"bdd",
"--timeout",
"999999",
"--colors",
"${workspaceFolder}/**/*.utest.js"
],
"internalConsoleOptions": "openOnSessionStart"
}
]
}
55 changes: 54 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
{
"name": "leo-sdk",
"version": "5.0.12",
"version": "5.0.13",
"description": "Load data onto the LEO Platform",
"homepage": "https://leoplatform.io",
"main": "index.js",
"directories": {
"test": "test"
},
"scripts": {
"test": "mocha --recursive -w"
"test": "mocha --recursive -w",
"utest": "mocha \"**/*.utest.js\""
},
"dependencies": {
"async": "2.6.2",
Expand All @@ -32,6 +33,7 @@
"uuid": "3.3.2"
},
"devDependencies": {
"chai": "^4.2.0",
"mocha": "6.1.4"
},
"peerDependencies": {
Expand Down
79 changes: 51 additions & 28 deletions wrappers/fanout.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const logger = require("leo-logger")("======= [ leo-fanout ] =======");
* @param {function(BotEvent, LambdaContext, Callback)} handler function to the code handler
* @param {function(QueueEvent): any} eventPartition function to return the value representing what partition for the event
*/
module.exports = (handler, eventPartition, opts = {}) => {
const fanoutFactory = (handler, eventPartition, opts = {}) => {
if (typeof eventPartition !== "function") {
opts = eventPartition || {};
eventPartition = opts.eventPartition;
Expand Down Expand Up @@ -193,39 +193,57 @@ module.exports = (handler, eventPartition, opts = {}) => {

// Wait for all workers to return and figure out what checkpoint to persist
logger.debug(`Waiting on all Fanout workers: count ${workers.length}`);
Promise.all(workers).then(responses => {
logger.log("Return from all workers, reducing checkpoints");
let checkpoints = reduceCheckpoints(responses).map((data) => {
logger.log("[data]", data);
return Object.keys(data).map((botId) => {
logger.log("[botId]", botId);
return Object.keys(data[botId].read || {}).map((queue) => {
logger.log("[queue]", queue);
let params = data[botId].read[queue];
logger.log("[params]", params);
return (done) => {
logger.log("---------------------- Executing checkpoint ---------------", params);
leoBotCheckpoint(botId, queue, params, done);
};
});
});
});
logger.log("[promise all checkpoints]", checkpoints);
if(checkpoints && checkpoints[0] && checkpoints[0][0] && checkpoints[0][0].length) {
logger.log("---- calling checkpoints ----");
async.parallelLimit(checkpoints[0][0], 5, callback);
} else {
logger.log("---- no events processed ----");
callback(null, true);
}
}).catch((err) => {
Promise.all(workers).then(callCheckpointOnResponses(leoBotCheckpoint, callback)).catch((err) => {
logger.error("[err]", err);
return callback(err);
});
}
};
};

function callCheckpointOnResponses(leoBotCheckpoint, callback) {
return function (responses) {
logger.log("Return from all workers, reducing checkpoints");
let checkpoints = reduceCheckpoints(responses).map((data) => {
logger.log("[data]", data);
return Object.keys(data).map((botId) => {
logger.log("[botId]", botId);
const checkpointTasks = [];
Object.keys(data[botId].read || {}).map((queue) => {
logger.log("[queue]", queue);
let params = data[botId].read[queue];
params.type = "read";
logger.log("[params]", params);
checkpointTasks.push((done) => {
logger.log(`---------------------- Executing read checkpoint against ${queue} ---------------`, params);
leoBotCheckpoint(botId, queue, params, done);
});
});
Object.keys(data[botId].write || {}).map((queue) => {
logger.log("[queue]", queue);
let params = data[botId].write[queue];
params.type = "write";
logger.log("[params]", params);
checkpointTasks.push((done) => {
logger.log(`---------------------- Executing write checkpoint against ${queue} ---------------`, params);
leoBotCheckpoint(botId, queue, params, done);
});
});
return checkpointTasks;
});
});
logger.log("[promise all checkpoints]", checkpoints);
if(checkpoints && checkpoints[0] && checkpoints[0][0] && checkpoints[0][0].length) {
logger.log("---- calling checkpoints ----");
async.parallelLimit(checkpoints[0][0], 5, callback);
} else {
logger.log("---- no events processed ----");
callback(null, true);
}
};
}


/**
* @param {*} event The base event to send to the worker
* @param {number} iid The instance id of this worker
Expand Down Expand Up @@ -322,6 +340,7 @@ function invokeSelf(event, iid, count, context) {
* @returns {[checkpoint]} Consolidated checkpoint
*/
function reduceCheckpoints(responses) {
logger.log("[responses]", JSON.stringify(responses, null, 2));
let checkpoints = responses.reduce((agg, curr) => {
if (curr && curr.error) {
agg.errors.push(curr.error);
Expand Down Expand Up @@ -357,7 +376,6 @@ function reduceCheckpoints(responses) {
errors: [],
checkpoints: {}
});
logger.log("[responses]", JSON.stringify(responses, null, 2));
logger.log("[checkpoints]", JSON.stringify(checkpoints, null, 2));
if(checkpoints.errors && checkpoints.errors.length) {
throw new Error("errors from sub lambdas");
Expand Down Expand Up @@ -408,3 +426,8 @@ function min(...args) {
}
return current;
}

fanoutFactory.reduceCheckpoints = reduceCheckpoints;
fanoutFactory.callCheckpointOnResponses = callCheckpointOnResponses;

module.exports = fanoutFactory;
Loading