21 changes: 3 additions & 18 deletions index.d.ts
@@ -1,13 +1,14 @@

import Pumpify from "pumpify";
import stream from "stream";
import stream, { Stream } from "stream";
import { Callback, EnrichOptions, EnrichBatchOptions, OffloadOptions, OffloadBatchOptions, ReadOptions, StreamUtil, ToCheckpointOptions, WriteOptions } from "./lib/lib";
import { LeoCron } from "./lib/cron";
import { LeoDynamodb } from "./lib/dynamodb";
import AWS, { Credentials } from "aws-sdk";
import { Event } from "./lib/types";
import ConfigurationProvider from "./lib/rstreams-configuration";
import { ReadableStream } from "./lib/types";
import { StreamUtilV2 } from "./lib/stream/leo-stream-v2";
export * from "./lib/types";

/**
@@ -104,7 +105,7 @@ export declare class RStreamsSdk {
* @return The RStreams stream utility, used for more advanced processing of streams.
* @todo question do we still need this? can/should we put all useful things in this interface?
*/
streams: typeof StreamUtil;
streams: typeof StreamUtil & StreamUtilV2;

/** @method */
load: typeof StreamUtil.load;
@@ -246,22 +247,6 @@ export declare class RStreamsSdk {
createSource: <T, R = any>(fn: CreateSourceFunction<T, R>, opt?: CreateSourceOptions, state?: R) => ReadableStream<T>;
}
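For orientation, here is a minimal sketch of how `createSource` is consumed. The `leo-sdk` import path and the `fetchPage` helper are assumptions for illustration only; the SDK requires just a source function that resolves to an array of records, or `undefined` to end the stream.

```typescript
import { RStreamsSdk } from "leo-sdk";

declare function fetchPage(page: number): Promise<string[]>; // hypothetical upstream fetch

const sdk = new RStreamsSdk();

// `state` persists across polls; here it tracks a page cursor.
interface PollState { page: number; }

const source = sdk.createSource<string, PollState>(
	async (state) => {
		const batch = await fetchPage(state.page);
		state.page += 1;
		return batch.length > 0 ? batch : undefined; // undefined (or empty) ends the stream
	},
	{ records: 1000 }, // optional: stop after 1,000 records even if more pages remain
	{ page: 0 }        // initial state handed to the first poll
);

source.on("data", (record) => console.log(record));
```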

/**
* Async function that you write that takes the current state R and returns an array of data type T
*/
export declare type CreateSourceFunction<T, R> = (state: R) => Promise<T[] | undefined>;

/**
* Options for the function [[`RStreamsSdk.createSource`]]
*/
export interface CreateSourceOptions {
/** max number of records to emit before ending the stream */
records?: number;

/** max number of milliseconds to wait before closing the stream */
milliseconds?: number;
}

export interface AwsResourceConfig {
s3Config?: AWS.S3.ClientConfiguration,
dynamodbConfig?: AWS.DynamoDB.DocumentClient.DocumentClientOptions & AWS.ConfigurationOptions & AWS.DynamoDB.ClientApiVersions,
105 changes: 1 addition & 104 deletions index.js
@@ -193,110 +193,7 @@ function SDK(id, data, awsResourceConfig) {
credentials: configuration.credentials
})
},
createSource: function(fn, opts = {}, state = {}) {
let log = leologger.sub("CreateSource");
// Set default option values
opts = Object.assign({
records: Number.POSITIVE_INFINITY,
milliseconds: undefined
}, opts);

// Counter/Timers
let startTime = Date.now();
let lastStart = startTime;
let totalRecords = 0;

// Stream pass through - This is the returned object
let pass = this.streams.passThrough({ objectMode: true });


// Set up a timeout if requested
let timeout;
if (opts.milliseconds != null && opts.milliseconds > 0) {
timeout = setTimeout(() => {
if (!pass.isEnding) {
log.debug('Requested timeout ms hit. Ending');
pass.end();
}
}, opts.milliseconds);
}

// Override stream end to clean up timers
// and protect against duplicate calls
pass.isEnding = false;
pass.orig_end = pass.end;
pass.end = function() {
log.debug('Pass.end Called');
if (!pass.isEnding) {
pass.isEnding = true;
timeout && clearTimeout(timeout);
pass.orig_end();
}
};


// Convenience method for async writing with backpressure
pass.throttleWrite = function(data) {
return new Promise((resolve) => {
if (!pass.write(data)) {
pass.once('drain', () => {
resolve();
});
} else {
resolve();
}
});
};

// Generator to poll for more data
async function* poller() {

// Get the initial set of data to stream
let records = await fn(state);

// Loop yielding and fetching records until
// 1) There are no more records
// 2) Time runs out
// 3) We have yielded the requested number of records
outerLoop:
while ((records != null && records.length > 0) && opts.records > totalRecords && !pass.isEnding) {
for (const hit of records) {
totalRecords++;

// send the results back to the caller and wait to be resumed
// that's why this is a generator function (function*)
yield hit;

// Break out of the current batch because we hit
// an end condition
if (opts.records <= totalRecords || pass.isEnding) {
break outerLoop;
}
}

log.debug(`Batch Records: ${records.length}, Percent: ${totalRecords}/${opts.records}, Total Duration: ${Date.now() - startTime}, Batch Duration ${Date.now() - lastStart}`);
lastStart = Date.now();

// Get the next set of records
records = await fn(state);
}
}

// Async function to query and write data to the stream
let run = (async function() {
for await (const data of poller()) {
await pass.throttleWrite(data);
}
});

// Start running the async function with hooks to pass along errors
// and end the pass through
run()
.then(() => pass.end())
.catch(err => pass.emit('error', err));

return pass;
}
createSource: leoStream.createSource
});
}
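Though the implementation now lives in `leoStream`, the removed `throttleWrite` helper is a generally useful pattern for hand-writing to object-mode streams. A standalone sketch of the same idea:

```typescript
import { PassThrough } from "stream";

// write() returns false when the internal buffer is full; in that case
// wait for 'drain' before resolving so the async producer backs off.
function throttleWrite(pass: PassThrough, data: unknown): Promise<void> {
	return new Promise((resolve) => {
		if (!pass.write(data)) {
			pass.once("drain", () => resolve());
		} else {
			resolve();
		}
	});
}

// An async producer that can never outrun its consumer.
async function produce(pass: PassThrough, items: unknown[]): Promise<void> {
	for (const item of items) {
		await throttleWrite(pass, item);
	}
	pass.end();
}
```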

58 changes: 58 additions & 0 deletions lib/lib.d.ts
@@ -9,6 +9,8 @@ import { Event, ReadEvent, ReadableStream, WritableStream, TransformStream, Corr
import * as es from "event-stream";
import zlib from "zlib";
import { Options as BackoffOptions } from "backoff";
import { Context } from "aws-lambda";
import { ConfigurationResources, Cron } from "../index";

/**
* A standard callback function. If the operation failed, return the first argument only,
@@ -279,6 +281,32 @@ export interface ReadOptions {
*/
fast_s3_read_parallel_fetch_max_bytes?: number;


/**
* When using the [[`ReadOptions.fast_s3_read`]] feature, this specifies whether downloaded files should be saved after they are consumed.
*
* @default false
* @todo inconsistent fast_s3_read_save_files
*/
fast_s3_read_save_files?: boolean;

/**
* When using the [[`ReadOptions.fast_s3_read`]] feature, this specifies the max size in bytes at which a file should be downloaded locally rather than streamed from S3.
*
* @default 5 MB, in bytes
* @todo inconsistent fast_s3_read_max_download_file_size_bytes
*/
fast_s3_read_max_download_file_size_bytes?: number;

/**
* When using the [[`ReadOptions.fast_s3_read`]] feature, this specifies whether downloaded files should be saved uncompressed (jsonl vs jsonl.gz).
*
* @default false
* @todo inconsistent fast_s3_read_download_as_uncompressed
*/
fast_s3_read_download_as_uncompressed?: boolean;
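
An illustrative options object exercising the three new flags alongside `fast_s3_read` itself (the values and the `leo-sdk/lib/lib` import path are assumptions; adjust to taste):

```typescript
import { ReadOptions } from "leo-sdk/lib/lib";

const readOpts: ReadOptions = {
	fast_s3_read: true,
	fast_s3_read_save_files: true,                               // keep downloaded files after they are consumed
	fast_s3_read_max_download_file_size_bytes: 5 * 1024 * 1024,  // download files up to 5 MB; stream anything larger
	fast_s3_read_download_as_uncompressed: true                  // save as .jsonl rather than .jsonl.gz
};
```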


/**
* The max number of records, events, the SDK should retrieve each time it retrieves events from the
* RStreams Bus' Dynamo DB events table.
@@ -616,6 +644,16 @@ export interface CreateCorrelationOptions {
partial: boolean;
}


export interface ConfigurationObject {
registry?: {
context?: Context,
__cron?: Cron
},
resources: ConfigurationResources
}
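
The `StreamUtil` namespace below exposes this shape as its `configuration` constant. A sketch of reading it through the SDK (runtime contents depend on the environment; `registry` is only populated inside a running bot):

```typescript
import { RStreamsSdk } from "leo-sdk";

const sdk = new RStreamsSdk();

const { resources, registry } = sdk.streams.configuration;
console.log(resources);                       // resolved bus resources
console.log(registry?.context?.functionName); // aws-lambda Context, when present
```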


/**
* This namespace encompasses the majority of the functionality of the SDK.
* It might be helpful to start at [[RStreamsSdk]] which exposes functionality from this namespace
@@ -625,6 +663,9 @@ export interface CreateCorrelationOptions {
*/
export declare namespace StreamUtil {

const configuration: ConfigurationObject;


/**
* Helper function to turn a timestamp into an RStreams event ID.
*
@@ -1160,3 +1201,20 @@ export declare namespace StreamUtil {
function createCorrelation<T>(events: ReadEvent<T>[], opts?: CreateCorrelationOptions): CorrelationId;

}


/**
* Async function that you write that takes the current state R and returns an array of data type T
*/
export declare type CreateSourceFunction<T, R> = (state: R) => Promise<T[] | undefined>;

/**
* Options for the function [[`RStreamsSdk.createSource`]]
*/
export interface CreateSourceOptions<R = any> {
/** max number of records to emit before ending the stream */
records?: number;

/** max number of milliseconds to wait before closing the stream */
milliseconds?: number;
}
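
The two end conditions compose; whichever fires first closes the stream. A small sketch, assuming `CreateSourceOptions` is importable from the package root and reusing the `sdk` pattern from the earlier example:

```typescript
import { RStreamsSdk, CreateSourceOptions } from "leo-sdk";

declare function pollUpstream(): Promise<number[] | undefined>; // hypothetical poll

const sdk = new RStreamsSdk();

const opts: CreateSourceOptions = {
	records: 10_000,      // end after 10,000 records...
	milliseconds: 30_000  // ...or after 30 seconds, whichever comes first
};

const timeBoxed = sdk.createSource(() => pollUpstream(), opts);
```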