Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
chore: renamed wrapper function, fixed arguments handling, fixed types
  • Loading branch information
dreamorosi committed Jul 4, 2023
commit 02740b81a3338c8cf8e67e17b69dfa10f072d28a
103 changes: 66 additions & 37 deletions packages/idempotency/src/IdempotencyHandler.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AnyFunctionWithRecord, IdempotencyHandlerOptions } from './types';
import type { JSONValue } from '@aws-lambda-powertools/commons';
import type { AnyFunction, IdempotencyHandlerOptions } from './types';
import { IdempotencyRecordStatus } from './types';
import {
IdempotencyAlreadyInProgressError,
Expand All @@ -13,31 +14,57 @@ import { search } from 'jmespath';

/**
* @internal
*
* Class that handles the idempotency lifecycle.
*
* This class is used under the hood by the Idempotency utility
* and provides several methods that are called at different stages
* to orchestrate the idempotency logic.
*/
export class IdempotencyHandler<U> {
private readonly fullFunctionPayload: Record<string, unknown>;
private readonly functionPayloadToBeHashed: Record<string, unknown>;
private readonly functionToMakeIdempotent: AnyFunctionWithRecord<U>;
private readonly idempotencyConfig: IdempotencyConfig;
private readonly persistenceStore: BasePersistenceLayer;
export class IdempotencyHandler<Func extends AnyFunction> {
/**
* The arguments passed to the function.
*
* For example, if the function is `foo(a, b)`, then `functionArguments` will be `[a, b]`.
* We need to keep track of the arguments so that we can pass them to the function when we call it.
*/
readonly #functionArguments: unknown[];
/**
* The payload to be hashed.
*
* This is the argument that is used for the idempotency.
*/
readonly #functionPayloadToBeHashed: JSONValue;
/**
* Reference to the function to be made idempotent.
*/
readonly #functionToMakeIdempotent: AnyFunction;
/**
* Idempotency configuration options.
*/
readonly #idempotencyConfig: IdempotencyConfig;
/**
* Persistence layer used to store the idempotency records.
*/
readonly #persistenceStore: BasePersistenceLayer;

public constructor(options: IdempotencyHandlerOptions<U>) {
public constructor(options: IdempotencyHandlerOptions) {
const {
functionToMakeIdempotent,
functionPayloadToBeHashed,
idempotencyConfig,
fullFunctionPayload,
functionArguments,
persistenceStore,
} = options;
this.functionToMakeIdempotent = functionToMakeIdempotent;
this.functionPayloadToBeHashed = functionPayloadToBeHashed;
this.idempotencyConfig = idempotencyConfig;
this.fullFunctionPayload = fullFunctionPayload;
this.#functionToMakeIdempotent = functionToMakeIdempotent;
this.#functionPayloadToBeHashed = functionPayloadToBeHashed;
this.#idempotencyConfig = idempotencyConfig;
this.#functionArguments = functionArguments;

this.persistenceStore = persistenceStore;
this.#persistenceStore = persistenceStore;

this.persistenceStore.configure({
config: this.idempotencyConfig,
this.#persistenceStore.configure({
config: this.#idempotencyConfig,
});
}

Expand Down Expand Up @@ -69,14 +96,14 @@ export class IdempotencyHandler<U> {
return idempotencyRecord.getResponse();
}

public async getFunctionResult(): Promise<U> {
let result: U;
public async getFunctionResult(): Promise<ReturnType<Func>> {
let result;
try {
result = await this.functionToMakeIdempotent(this.fullFunctionPayload);
result = await this.#functionToMakeIdempotent(...this.#functionArguments);
} catch (e) {
try {
await this.persistenceStore.deleteRecord(
this.functionPayloadToBeHashed
await this.#persistenceStore.deleteRecord(
this.#functionPayloadToBeHashed
);
} catch (e) {
throw new IdempotencyPersistenceLayerError(
Expand All @@ -87,9 +114,9 @@ export class IdempotencyHandler<U> {
throw e;
}
try {
await this.persistenceStore.saveSuccess(
this.functionPayloadToBeHashed,
result as Record<string, unknown>
await this.#persistenceStore.saveSuccess(
this.#functionPayloadToBeHashed,
result
);
} catch (e) {
throw new IdempotencyPersistenceLayerError(
Expand All @@ -108,7 +135,7 @@ export class IdempotencyHandler<U> {
* window, we might get an `IdempotencyInconsistentStateError`. In such
* cases we can safely retry the handling a few times.
*/
public async handle(): Promise<U> {
public async handle(): Promise<ReturnType<Func>> {
let e;
for (let retryNo = 0; retryNo <= MAX_RETRIES; retryNo++) {
try {
Expand All @@ -129,34 +156,36 @@ export class IdempotencyHandler<U> {
throw e;
}

public async processIdempotency(): Promise<U> {
public async processIdempotency(): Promise<ReturnType<Func>> {
// early return if we should skip idempotency completely
if (
IdempotencyHandler.shouldSkipIdempotency(
this.idempotencyConfig.eventKeyJmesPath,
this.idempotencyConfig.throwOnNoIdempotencyKey,
this.fullFunctionPayload
this.#idempotencyConfig.eventKeyJmesPath,
this.#idempotencyConfig.throwOnNoIdempotencyKey,
this.#functionPayloadToBeHashed
)
) {
return await this.functionToMakeIdempotent(this.fullFunctionPayload);
return await this.#functionToMakeIdempotent(...this.#functionArguments);
}

try {
await this.persistenceStore.saveInProgress(
this.functionPayloadToBeHashed,
this.idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
await this.#persistenceStore.saveInProgress(
this.#functionPayloadToBeHashed,
this.#idempotencyConfig.lambdaContext?.getRemainingTimeInMillis()
);
} catch (e) {
if (e instanceof IdempotencyItemAlreadyExistsError) {
const idempotencyRecord: IdempotencyRecord =
await this.persistenceStore.getRecord(this.functionPayloadToBeHashed);
await this.#persistenceStore.getRecord(
this.#functionPayloadToBeHashed
);

return IdempotencyHandler.determineResultFromIdempotencyRecord(
idempotencyRecord
) as U;
) as ReturnType<Func>;
} else {
throw new IdempotencyPersistenceLayerError(
'Failed to save record in progress',
'Failed to save in progress record to idempotency store',
e as Error
);
}
Expand All @@ -177,7 +206,7 @@ export class IdempotencyHandler<U> {
public static shouldSkipIdempotency(
eventKeyJmesPath: string,
throwOnNoIdempotencyKey: boolean,
fullFunctionPayload: Record<string, unknown>
fullFunctionPayload: JSONValue
): boolean {
return (eventKeyJmesPath &&
!throwOnNoIdempotencyKey &&
Expand Down
2 changes: 1 addition & 1 deletion packages/idempotency/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export * from './errors';
export * from './IdempotencyConfig';
export * from './makeFunctionIdempotent';
export * from './makeIdempotent';
87 changes: 0 additions & 87 deletions packages/idempotency/src/makeFunctionIdempotent.ts

This file was deleted.

108 changes: 108 additions & 0 deletions packages/idempotency/src/makeIdempotent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import type { Context, Handler } from 'aws-lambda';
import type {
AnyFunction,
ItempotentFunctionOptions,
IdempotencyLambdaHandlerOptions,
} from './types';
import { IdempotencyHandler } from './IdempotencyHandler';
import { IdempotencyConfig } from './IdempotencyConfig';

const isContext = (arg: unknown): arg is Context => {
return (
arg !== undefined &&
arg !== null &&
typeof arg === 'object' &&
'getRemainingTimeInMillis' in arg
);
};

const isFnHandler = (
fn: AnyFunction,
args: Parameters<AnyFunction>
): fn is Handler => {
// get arguments of function
return (
fn !== undefined &&
fn !== null &&
typeof fn === 'function' &&
isContext(args[1])
);
};

const isOptionsWithDataIndexArgument = (
options: unknown
): options is IdempotencyLambdaHandlerOptions & {
dataIndexArgument: number;
} => {
return (
options !== undefined &&
options !== null &&
typeof options === 'object' &&
'dataIndexArgument' in options
);
};

/**
* Use function wrapper to make your function idempotent.
* @example
* ```ts
* // this is your processing function with an example record { transactionId: '123', foo: 'bar' }
* const processRecord = (record: Record<string, unknown>): any => {
* // you custom processing logic
* return result;
* };
*
* // we use wrapper to make processing function idempotent with DynamoDBPersistenceLayer
* const processIdempotently = makeFunctionIdempotent(processRecord, {
* persistenceStore: new DynamoDBPersistenceLayer()
* dataKeywordArgument: 'transactionId', // keyword argument to hash the payload and the result
* });
*
* export const handler = async (
* _event: EventRecords,
* _context: Context
* ): Promise<void> => {
* for (const record of _event.records) {
* const result = await processIdempotently(record);
* // do something with the result
* }
*
* return Promise.resolve();
* };
*
* ```
*/
const makeIdempotent = <Func extends AnyFunction>(
fn: Func,
options: ItempotentFunctionOptions<Parameters<Func>>
): ((...args: Parameters<Func>) => ReturnType<Func>) => {
const { persistenceStore, config } = options;
const idempotencyConfig = config ? config : new IdempotencyConfig({});

if (!idempotencyConfig.isEnabled()) return fn;

return (...args: Parameters<Func>): ReturnType<Func> => {
let functionPayloadToBeHashed;

if (isFnHandler(fn, args)) {
idempotencyConfig.registerLambdaContext(args[1]);
functionPayloadToBeHashed = args[0];
} else {
if (isOptionsWithDataIndexArgument(options)) {
functionPayloadToBeHashed = args[options.dataIndexArgument];
} else {
functionPayloadToBeHashed = args[0];
}
}

return new IdempotencyHandler({
functionToMakeIdempotent: fn,
idempotencyConfig: idempotencyConfig,
persistenceStore: persistenceStore,
functionArguments: args,
functionPayloadToBeHashed,
}).handle() as ReturnType<Func>;
};
};

export { makeIdempotent };
Loading