Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package-lock.json
.scannerwork
.idea
.idea/
*.js
*.js.map
*.utest.js
docs/build/
Expand Down
56 changes: 44 additions & 12 deletions lib/cron.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AWSError } from "aws-sdk";
import moment, { Moment } from "moment";
import { Checkpoints, Cron } from "./types";

/**
* Defines a callback function that takes an error object.
Expand All @@ -14,28 +15,59 @@ export declare type Callback<E> = (err: E) => void;
*/
declare type DataCallback<E, T> = (err: E, data: T) => void;

type InstanceStatus = "error" | "complete";

/**
* @todo question What is this here for?
*/
interface CronData {
export interface InstanceData {

invokeTime?: number;
startTime?: number;
completedTime?: number;

maxDuration?: number;

status?: InstanceStatus;
requestId?: string;
log?: Buffer;
result?: Buffer;
}

/**
* @todo question What is this here for?
*/
interface InstanceData { }
export interface ReadFilterGroup {
eid: string;
icount: number;
ts: number;
}

type ExecutionType = "lambda" | "fargate";

/**
* @todo question What is this here for?
*/
interface BotData { }
export interface BotData<Settings = unknown> {
id: string;
checkpoints: Checkpoints,

lambdaName: string,
name?: string,
description?: string,
lambda: {
settings: Settings[]
}
time?: string,
triggers?: string[],
executionType: ExecutionType,
instances: Record<string, InstanceData>,
requested_kinesis: Record<string, string>
read_filter_groups?: ReadFilterGroup[]
}

/**
* Options for reporting a bot is done.
* @todo review
*/
interface ReportCompleteOptions {
export interface ReportCompleteOptions {
/** If true, force the bot to be marked completed. */
forceComplete?: boolean;
}
Expand Down Expand Up @@ -154,7 +186,7 @@ export interface LeoCron {
* @param callback A callback that will be called if something goes wrong
* @todo question cron is the first arg but it's an empty type definition, what is it for?
*/
checkLock: (cron: CronData, runid: string, remainingTime: Milliseconds, callback: Callback<AWSError>) => void;
checkLock: (cron: Cron & { time?: number }, runid: string, remainingTime: Milliseconds, callback: Callback<AWSError>) => void;

/**
* Mark a bot done running.
Expand All @@ -168,7 +200,7 @@ export interface LeoCron {
*
* @todo question what does forcing a bot to complete mean, remove the lock?
*/
reportComplete: (cron: CronData, runid: string, status: string, log: any, opts: ReportCompleteOptions, callback: Callback<AWSError>) => void;
reportComplete: (cron: Cron & { result?: any, message?: any }, runid: string, status: string, log: any, opts: ReportCompleteOptions, callback: Callback<AWSError>) => void;

/**
* Lock the given bot, marking it as currently running. Only one instance of a bot is meant to be running
Expand Down Expand Up @@ -221,7 +253,7 @@ export interface LeoCron {
* @param bot The bot data to save on the bot
* @param callback A callback that will be called if something goes wrong
*/
update: (bot: BotData, callback: Callback<AWSError>) => void;
update: (bot: Partial<BotData> & { id: string }, callback?: Callback<AWSError>) => Promise<BotData>;

/**
* @todo unclear
Expand Down Expand Up @@ -267,7 +299,7 @@ export interface LeoCron {
* @internal Don't use.
* @todo docbug preg/opts missing types
*/
buildPayloads: (cron: CronData, prev, opts) => void;
buildPayloads: (cron: Cron, prev, opts) => void;

/**
* @internal Don't use.
Expand Down Expand Up @@ -305,7 +337,7 @@ export interface LeoCron {
*
* @todo docbug types missing?
*/
createBot: (id: string, bot, opts) => void;
createBot: (id: string, bot, opts?) => Promise<BotData>;
}

export default function (config: any): LeoCron;
79 changes: 54 additions & 25 deletions lib/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ var refUtil = require("./reference.js");
const logger = require('leo-logger')('leo-cron');


function getCheckpointLocationAndType(registry, type = "read") {

let container = (registry && registry.rstreamsLeoReadFilterContainer) || (registry && registry.__cron) || {};
let checkpointLocation = container.cploc;
if (checkpointLocation) {
type = container.iid.toString();
} else {
checkpointLocation = "checkpoints";
}

return {
checkpointLocation,
type
};
}

module.exports = function(configure) {
configure = configure || {};
var dynamodb = new dynamo(configure);
Expand Down Expand Up @@ -205,7 +221,7 @@ module.exports = function(configure) {
":value": {
completedTime: Date.now(),
status: status,
log: zlib.gzipSync(JSON.stringify(log)),
log: zlib.gzipSync(JSON.stringify(log, log instanceof Error ? Object.getOwnPropertyNames(log) : undefined)),
result: (cron.result != undefined && cron.result != null) ? zlib.gzipSync(JSON.stringify(cron.result)) : null,
message: cron.message || configure.registry.cron_message || null
}
Expand Down Expand Up @@ -240,7 +256,7 @@ module.exports = function(configure) {
//":requestId": runid,
":token": cron.ts,
":status": status,
":log": zlib.gzipSync(JSON.stringify(log)),
":log": zlib.gzipSync(JSON.stringify(log, log instanceof Error ? Object.getOwnPropertyNames(log) : undefined)),
":result": (cron.result != undefined && cron.result != null) ? zlib.gzipSync(JSON.stringify(cron.result)) : null
},
"ReturnConsumedCapacity": 'TOTAL'
Expand Down Expand Up @@ -341,9 +357,12 @@ module.exports = function(configure) {
},
checkpoint: function(id, event, params, callback) {
event = refUtil.refId(event);
var type = params.type == "write" ? "write" : "read";
var opts = {};

// Check for override location for checkpointing
let { checkpointLocation, type } = getCheckpointLocationAndType(configure.registry, params.type == "write" ? "write" : "read");


var checkpointData = {
checkpoint: params.eid || params.kinesis_number,
source_timestamp: params.source_timestamp,
Expand All @@ -360,7 +379,7 @@ module.exports = function(configure) {
},
UpdateExpression: 'set #checkpoints.#type.#event = :value',
ExpressionAttributeNames: {
"#checkpoints": "checkpoints",
"#checkpoints": checkpointLocation,
"#type": type,
"#event": event
},
Expand All @@ -374,10 +393,10 @@ module.exports = function(configure) {
!configure.registry.__cron || !configure.registry.__cron.force) && (params.expected || (configure.registry && configure.registry.__cron))) {
var expected = params.expected || (configure.registry &&
configure.registry.__cron &&
configure.registry.__cron.checkpoints &&
configure.registry.__cron.checkpoints[type] &&
configure.registry.__cron.checkpoints[type][event] &&
configure.registry.__cron.checkpoints[type][event].checkpoint) || undefined;
configure.registry.__cron[checkpointLocation] &&
configure.registry.__cron[checkpointLocation][type] &&
configure.registry.__cron[checkpointLocation][type][event] &&
configure.registry.__cron[checkpointLocation][type][event].checkpoint) || undefined;

if (expected) {
cronCheckpointCommand.ConditionExpression = '#checkpoints.#type.#event.#checkpoint = :expected';
Expand All @@ -391,20 +410,20 @@ module.exports = function(configure) {

var updateInMemoryCheckpoint = () => {
// this path is not guaranteed to be in config so do a safe set
var c = ["registry", "__cron", "checkpoints", type].reduce((o, f) => o[f] = o[f] || {}, configure);
var c = ["registry", "__cron", checkpointLocation, type].reduce((o, f) => o[f] = o[f] || {}, configure);
c[event] = checkpointData;
};

let checkpointLocation = (configure.registry && configure.registry.__cron && configure.registry.__cron.cploc);
if (checkpointLocation) {
cronCheckpointCommand.ExpressionAttributeNames[`#checkpoints`] = checkpointLocation;
cronCheckpointCommand.ExpressionAttributeNames["#type"] = configure.registry.__cron.iid.toString();

delete cronCheckpointCommand.ExpressionAttributeNames["#checkpoint"];
delete cronCheckpointCommand.ExpressionAttributeValues[":expected"];
delete cronCheckpointCommand.ConditionExpression;
// if (checkpointLocation) {
// cronCheckpointCommand.ExpressionAttributeNames[`#checkpoints`] = checkpointLocation;
// cronCheckpointCommand.ExpressionAttributeNames["#type"] = configure.registry.__cron.iid.toString();

}
// delete cronCheckpointCommand.ExpressionAttributeNames["#checkpoint"];
// delete cronCheckpointCommand.ExpressionAttributeValues[":expected"];
// delete cronCheckpointCommand.ConditionExpression;

// }

// Safey check to prevent setting checkpoint to undefined
if (checkpointData.checkpoint === undefined) {
Expand Down Expand Up @@ -440,14 +459,17 @@ module.exports = function(configure) {
name: id,
description: null
};
if (checkpointLocation) {

// The structure for `checkpoints` is required so it is handled a little different
// Then when it is a different location
if (checkpointLocation != "checkpoints") {
entry[checkpointLocation] = {
[configure.registry.__cron.iid]: {
[type]: {
[event]: checkpointData
}
};
} else {
entry.checkpoints[type][event] = checkpointData;
entry[checkpointLocation][type][event] = checkpointData;
}
dynamodb.put(CRON_TABLE, id, entry, function(err, data) {
if (err) {
Expand Down Expand Up @@ -476,7 +498,11 @@ module.exports = function(configure) {
botId = (configure.registry.__cron && configure.registry.__cron.id);
}
let id = refUtil.refId(queue);
if (!configure.registry.__cron || !configure.registry.__cron.checkpoints) {

// Check for override location for checkpointing
let { checkpointLocation, type } = getCheckpointLocationAndType(configure.registry);

if (!configure.registry.__cron || !configure.registry.__cron[checkpointLocation]) {
let data = await new Promise((resolve, reject) => {
this.get(botId || configure.registry.__cron.id, {}, (err, data) => {
if (err) {
Expand All @@ -487,9 +513,9 @@ module.exports = function(configure) {
});
});
configure.registry.__cron = configure.registry.__cron || data.__cron;
configure.registry.__cron.checkpoints = data.__cron.checkpoints;
configure.registry.__cron[checkpointLocation] = data.__cron[checkpointLocation];
}
var checkpointData = ["registry", "__cron", "checkpoints", "read"].reduce((o, f) => o[f] = o[f] || {}, configure);
var checkpointData = ["registry", "__cron", checkpointLocation, type].reduce((o, f) => o[f] = o[f] || {}, configure);
return checkpointData[id] && checkpointData[id].checkpoint;

},
Expand Down Expand Up @@ -577,11 +603,14 @@ module.exports = function(configure) {
}
});
if (opts.register) {
// Check for override location for checkpointing
let { checkpointLocation, type } = getCheckpointLocationAndType(configure.registry);

process.__config = process.__config || configure;
process.__config.registry = process.__config.registry || {};
configure.registry = extend(true, process.__config.registry, configure.registry || {});
var checkpointData = ["registry", "__cron", "checkpoints", "read"].reduce((o, f) => o[f] = o[f] || {}, configure);
Object.assign(checkpointData, data.checkpoints && data.checkpoints.read || {});
var checkpointData = ["registry", "__cron", checkpointLocation, type].reduce((o, f) => o[f] = o[f] || {}, configure);
Object.assign(checkpointData, data[checkpointLocation] && data[checkpointLocation][type] || {});
}
let single = !opts.instances;
this.buildPayloads(data, {}, {
Expand Down
8 changes: 4 additions & 4 deletions lib/mock-wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { StreamUtil } from "./lib";
import { ReadEvent, Event, ReadOptions, ReadableStream, WritableStream, TransformStream, WriteOptions, BaseWriteOptions } from "./types";
import { ReadEvent, Event, ReadOptions, ReadableStream, WritableStream, TransformStream, WriteOptions, BaseWriteOptions, Cron } from "./types";
import fs from "fs";
import path from "path";
import util from "./aws-util";
import stream from "stream";
import { Callback, CronData, Milliseconds, ReportCompleteOptions } from "./cron";
import { Callback, Milliseconds, ReportCompleteOptions } from "./cron";
import { AWSError } from "aws-sdk";
import uuid from "uuid";

Expand Down Expand Up @@ -128,8 +128,8 @@ export default function (leoStream: LeoStream) {
};


leoStream.cron.checkLock = (cron: CronData, runid: string, remainingTime: number, callback: Callback<AWSError>) => callback(null);
leoStream.cron.reportComplete = (cron: CronData, runid: string, status: string, log: any, opts: ReportCompleteOptions, callback: Callback<AWSError>) => callback(null);
leoStream.cron.checkLock = (cron: Cron, runid: string, remainingTime: number, callback: Callback<AWSError>) => callback(null);
leoStream.cron.reportComplete = (cron: Cron, runid: string, status: string, log: any, opts: ReportCompleteOptions, callback: Callback<AWSError>) => callback(null);
leoStream.cron.createLock = (id: string, runid: string, maxDuration: Milliseconds, callback: Callback<AWSError>) => callback(null);
leoStream.cron.removeLock = (id: string, runid: string, callback: Callback<AWSError>) => callback(null);

Expand Down
9 changes: 4 additions & 5 deletions lib/stream/helper/chunkEventStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ module.exports = function(ls, event, opts) {
};
}
let correlation = item.correlations[source];
correlation.end = c.end || c.start;
correlation.end = c.cp_eid || c.end || c.start;
correlation.records += c.units || 1;
if (!correlation.start) {
correlation.start = c.start;
Expand All @@ -127,6 +127,7 @@ module.exports = function(ls, event, opts) {
c.end = c.partial_end;
delete c.partial_end;
}
delete c.cp_eid;
}
}

Expand Down Expand Up @@ -197,16 +198,14 @@ module.exports = function(ls, event, opts) {
});
}
}
},
function emit(done, data) {
}, function emit(done, data) {
emitChunk(data.isLast, (value) => {
if (value && (value.size || (value.correlations && Object.keys(value.correlations).length))) {
this.push(value);
}
done();
});
},
function end(done) {
}, function end(done) {
logger.debug("done chunking");
logger.debug("total", totalWrites, totalRecords, totalWrites / totalRecords);
done();
Expand Down
Loading