Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/stream/helper/leo-stream-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ export function determineReadHooks<T>(settings: ReadOptions<T>, partialHookSetti
opts: {
parser: settings.parser,
...settings.parserOpts,
...partialHookSettings?.parseTaskParser.opts
...partialHookSettings?.parseTaskParser?.opts
},
} : undefined;
let parallelParse = parseTaskParser != null;
Expand Down Expand Up @@ -767,6 +767,7 @@ export function determineReadHooks<T>(settings: ReadOptions<T>, partialHookSetti
hooks: createFastS3ReadHooks(hookSettings)
};
if (defaultsFromMem.parallelFetchMax > 0) {
readOpts.stream_query_limit = 1000;
readOpts.fast_s3_read = true;
readOpts.fast_s3_read_parallel_fetch_max_bytes = readOpts.fast_s3_read_parallel_fetch_max_bytes || defaultsFromMem.parallelFetchMax;
}
Expand Down
2 changes: 1 addition & 1 deletion lib/stream/helper/parser-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ export let parsers: Record<ParserName, (settings: any) => (input: string) => any

let fieldParsers: Record<FastParseType, { parse?: (value: string | Buffer) => any, set?: (field: string, value: any) => void }> = {
[FastParseType.String]: { parse: (value) => value },
[FastParseType.Number]: { parse: (value) => parseInt(value.toString(), 10) },
[FastParseType.Number]: { parse: (value) => Number(value) },
[FastParseType.Eid]: { parse: (value) => value.toString().startsWith("z/") ? value : parseInt(value.toString(), 10) },
[FastParseType.Raw]: { set: (field, value) => set(field, value, "_raw") }
};
Expand Down
9 changes: 5 additions & 4 deletions lib/stream/leo-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -1250,7 +1250,7 @@ module.exports = function(configure) {
return result;
},
fromLeo: (ID, queue, opts) => {
opts = opts || {};
opts = Object.assign({}, opts || {});
queue = refUtil.ref(queue).queue(opts.subqueue).id;
if (!opts.stopTime && configure.registry && configure.registry.context) {
opts.stopTime = moment.now() + (configure.registry.context.getRemainingTimeInMillis() * 0.8);
Expand Down Expand Up @@ -1287,8 +1287,6 @@ module.exports = function(configure) {
}
}
);
// Allow access to added stuff from outside this function
Object.assign(origOpts, opts);
}

logger.info(opts);
Expand Down Expand Up @@ -1332,7 +1330,10 @@ module.exports = function(configure) {

// tracks if we've passed destroy on the passthrough
let isPassDestroyed = false;


// Allow access to added stuff from outside this function
pass.getOpts = () => opts;

pass.destroy = function() {
hasTime = false;
// we have destroyed pass
Expand Down
3 changes: 2 additions & 1 deletion lib/types.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
export * from "./lib";
export * from "./streams";
import { Context } from "aws-lambda";
import { Callback, RStreamsSdk } from "../index";
import { Callback, RStreamsSdk, ReadOptions } from "../index";
/// <reference types="node" />
import stream from 'stream';
// export interface LeoStreamOptions extends stream.TransformOptions {
Expand All @@ -13,6 +13,7 @@ import stream from 'stream';

export interface ReadableQueueStream<T> extends ReadableStream<ReadEvent<T>> {
get(): Checkpoint;
getOpts(): ReadOptions<T>;
checkpoint(params: Checkpoint, done: Callback): void;
checkpoint(done: Callback): void;
}
Expand Down
5 changes: 4 additions & 1 deletion wrappers/cron.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ module.exports = function(configOverride, botHandler) {
//const kms = require("../lib/kms")(leosdk.configuration);
const refUtil = require("../lib/reference.js");

process.__config = config;
process.__config = process.__config || config;
process.__config.registry = process.__config.registry || {};
config.registry = Object.assign(process.__config.registry, config.registry || {});

const fill = require("../lib/build-config").fillWithTableReferences;
process.env.TZ = config.timezone;
// require('source-map-support').install({
Expand Down