diff --git a/packages/idempotency/src/IdempotencyHandler.ts b/packages/idempotency/src/IdempotencyHandler.ts index bc34c048d7..651301bbba 100644 --- a/packages/idempotency/src/IdempotencyHandler.ts +++ b/packages/idempotency/src/IdempotencyHandler.ts @@ -1,41 +1,84 @@ +import type { AnyFunctionWithRecord, IdempotencyOptions } from './types'; import { IdempotencyRecordStatus } from './types'; -import type { - AnyFunctionWithRecord, - IdempotencyOptions, -} from './types'; import { + IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, IdempotencyItemAlreadyExistsError, - IdempotencyAlreadyInProgressError, - IdempotencyPersistenceLayerError + IdempotencyPersistenceLayerError, } from './Exceptions'; -import { IdempotencyRecord } from './persistence/IdempotencyRecord'; +import { IdempotencyRecord } from './persistence'; export class IdempotencyHandler { public constructor( private functionToMakeIdempotent: AnyFunctionWithRecord, - private functionPayloadToBeHashed: Record, + private functionPayloadToBeHashed: Record, private idempotencyOptions: IdempotencyOptions, - private fullFunctionPayload: Record - ) {} + private fullFunctionPayload: Record, + ) { + } - public determineResultFromIdempotencyRecord(idempotencyRecord: IdempotencyRecord): Promise | U { + public determineResultFromIdempotencyRecord( + idempotencyRecord: IdempotencyRecord + ): Promise | U { if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.EXPIRED) { - throw new IdempotencyInconsistentStateError('Item has expired during processing and may not longer be valid.'); - } else if (idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS){ - throw new IdempotencyAlreadyInProgressError(`There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}`); + throw new IdempotencyInconsistentStateError( + 'Item has expired during processing and may not longer be valid.' + ); + } else if ( + idempotencyRecord.getStatus() === IdempotencyRecordStatus.INPROGRESS + ) { + if ( + idempotencyRecord.inProgressExpiryTimestamp && + idempotencyRecord.inProgressExpiryTimestamp < + new Date().getUTCMilliseconds() + ) { + throw new IdempotencyInconsistentStateError( + 'Item is in progress but the in progress expiry timestamp has expired.' + ); + } else { + throw new IdempotencyAlreadyInProgressError( + `There is already an execution in progress with idempotency key: ${idempotencyRecord.idempotencyKey}` + ); + } } else { // Currently recalling the method as this fulfills FR1. FR3 will address using the previously stored value https://github.com/awslabs/aws-lambda-powertools-typescript/issues/447 - return this.functionToMakeIdempotent(this.fullFunctionPayload); + return this.functionToMakeIdempotent(this.fullFunctionPayload); + } + } + + /** + * Main entry point for the handler + * IdempotencyInconsistentStateError can happen under rare but expected cases + * when persistent state changes in the small time between put & get requests. + * In most cases we can retry successfully on this exception. + */ + public async handle(): Promise { + + const MAX_RETRIES = 2; + for (let i = 1; i <= MAX_RETRIES; i++) { + try { + return await this.processIdempotency(); + } catch (e) { + if (!(e instanceof IdempotencyAlreadyInProgressError) || i === MAX_RETRIES) { + throw e; + } + } } + /* istanbul ignore next */ + throw new Error('This should never happen'); } public async processIdempotency(): Promise { try { - await this.idempotencyOptions.persistenceStore.saveInProgress(this.functionPayloadToBeHashed); + await this.idempotencyOptions.persistenceStore.saveInProgress( + this.functionPayloadToBeHashed, + ); } catch (e) { if (e instanceof IdempotencyItemAlreadyExistsError) { - const idempotencyRecord: IdempotencyRecord = await this.idempotencyOptions.persistenceStore.getRecord(this.functionPayloadToBeHashed); + const idempotencyRecord: IdempotencyRecord = + await this.idempotencyOptions.persistenceStore.getRecord( + this.functionPayloadToBeHashed + ); return this.determineResultFromIdempotencyRecord(idempotencyRecord); } else { @@ -45,4 +88,4 @@ export class IdempotencyHandler { return this.functionToMakeIdempotent(this.fullFunctionPayload); } -} \ No newline at end of file +} diff --git a/packages/idempotency/src/idempotentDecorator.ts b/packages/idempotency/src/idempotentDecorator.ts index 4db95a86b0..7e35cfd0de 100644 --- a/packages/idempotency/src/idempotentDecorator.ts +++ b/packages/idempotency/src/idempotentDecorator.ts @@ -11,12 +11,11 @@ const idempotent = function (options: IdempotencyOptions) { descriptor.value = function(record: GenericTempRecord){ const idempotencyHandler = new IdempotencyHandler(childFunction, record[options.dataKeywordArgument], options, record); - return idempotencyHandler.processIdempotency(); + return idempotencyHandler.handle(); }; - + return descriptor; - }; + }; }; - + export { idempotent }; - \ No newline at end of file diff --git a/packages/idempotency/src/makeFunctionIdempotent.ts b/packages/idempotency/src/makeFunctionIdempotent.ts index 75c44b47c3..77e0a9e994 100644 --- a/packages/idempotency/src/makeFunctionIdempotent.ts +++ b/packages/idempotency/src/makeFunctionIdempotent.ts @@ -13,7 +13,7 @@ const makeFunctionIdempotent = function ( const wrappedFn: AnyIdempotentFunction = function (record: GenericTempRecord): Promise { const idempotencyHandler: IdempotencyHandler = new IdempotencyHandler(fn, record[options.dataKeywordArgument], options, record); - return idempotencyHandler.processIdempotency(); + return idempotencyHandler.handle(); }; return wrappedFn; diff --git a/packages/idempotency/tests/unit/IdempotencyHandler.test.ts b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts new file mode 100644 index 0000000000..aeff922501 --- /dev/null +++ b/packages/idempotency/tests/unit/IdempotencyHandler.test.ts @@ -0,0 +1,178 @@ +/** + * Test Idempotency Handler + * + * @group unit/idempotency/IdempotencyHandler + */ + +import { + IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, + IdempotencyItemAlreadyExistsError, + IdempotencyPersistenceLayerError +} from '../../src/Exceptions'; +import { IdempotencyOptions, IdempotencyRecordStatus } from '../../src/types'; +import { BasePersistenceLayer, IdempotencyRecord } from '../../src/persistence'; +import { IdempotencyHandler } from '../../src/IdempotencyHandler'; + +class PersistenceLayerTestClass extends BasePersistenceLayer { + protected _deleteRecord = jest.fn(); + protected _getRecord = jest.fn(); + protected _putRecord = jest.fn(); + protected _updateRecord = jest.fn(); +} + +const mockFunctionToMakeIdempotent = jest.fn(); +const mockFunctionPayloadToBeHashed = {}; +const mockIdempotencyOptions: IdempotencyOptions = { + persistenceStore: new PersistenceLayerTestClass(), + dataKeywordArgument: 'testingKey' +}; +const mockFullFunctionPayload = {}; + +const idempotentHandler = new IdempotencyHandler( + mockFunctionToMakeIdempotent, + mockFunctionPayloadToBeHashed, + mockIdempotencyOptions, + mockFullFunctionPayload, +); + +describe('Class IdempotencyHandler', () => { + beforeEach(() => jest.resetAllMocks()); + + describe('Method: determineResultFromIdempotencyRecord', () => { + test('when record is in progress and within expiry window, it rejects with IdempotencyAlreadyInProgressError', async () => { + + const stubRecord = new IdempotencyRecord({ + idempotencyKey: 'idempotencyKey', + expiryTimestamp: Date.now() + 1000, // should be in the future + inProgressExpiryTimestamp: 0, // less than current time in milliseconds + responseData: { responseData: 'responseData' }, + payloadHash: 'payloadHash', + status: IdempotencyRecordStatus.INPROGRESS + }); + + expect(stubRecord.isExpired()).toBe(false); + expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS); + + try { + await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord); + } catch (e) { + expect(e).toBeInstanceOf(IdempotencyAlreadyInProgressError); + } + }); + + test('when record is in progress and outside expiry window, it rejects with IdempotencyInconsistentStateError', async () => { + + const stubRecord = new IdempotencyRecord({ + idempotencyKey: 'idempotencyKey', + expiryTimestamp: Date.now() + 1000, // should be in the future + inProgressExpiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past + responseData: { responseData: 'responseData' }, + payloadHash: 'payloadHash', + status: IdempotencyRecordStatus.INPROGRESS + }); + + expect(stubRecord.isExpired()).toBe(false); + expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.INPROGRESS); + + try { + await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord); + } catch (e) { + expect(e).toBeInstanceOf(IdempotencyInconsistentStateError); + } + }); + + test('when record is expired, it rejects with IdempotencyInconsistentStateError', async () => { + + const stubRecord = new IdempotencyRecord({ + idempotencyKey: 'idempotencyKey', + expiryTimestamp: new Date().getUTCMilliseconds() - 1000, // should be in the past + inProgressExpiryTimestamp: 0, // less than current time in milliseconds + responseData: { responseData: 'responseData' }, + payloadHash: 'payloadHash', + status: IdempotencyRecordStatus.EXPIRED + }); + + expect(stubRecord.isExpired()).toBe(true); + expect(stubRecord.getStatus()).toBe(IdempotencyRecordStatus.EXPIRED); + + try { + await idempotentHandler.determineResultFromIdempotencyRecord(stubRecord); + } catch (e) { + expect(e).toBeInstanceOf(IdempotencyInconsistentStateError); + } + }); + }); + + describe('Method: handle', () => { + + afterAll(() => jest.restoreAllMocks()); // restore processIdempotency for other tests + + test('when IdempotencyAlreadyInProgressError is thrown, it retries two times', async () => { + const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new IdempotencyAlreadyInProgressError('There is already an execution in progress')); + await expect( + idempotentHandler.handle() + ).rejects.toThrow(IdempotencyAlreadyInProgressError); + expect(mockProcessIdempotency).toHaveBeenCalledTimes(2); + }); + + test('when non IdempotencyAlreadyInProgressError is thrown, it rejects', async () => { + + const mockProcessIdempotency = jest.spyOn(IdempotencyHandler.prototype, 'processIdempotency').mockRejectedValue(new Error('Some other error')); + + await expect( + idempotentHandler.handle() + ).rejects.toThrow(Error); + expect(mockProcessIdempotency).toHaveBeenCalledTimes(1); + }); + + }); + + describe('Method: processIdempotency', () => { + + test('when persistenceStore saves successfuly, it resolves', async () => { + const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockResolvedValue(); + + mockFunctionToMakeIdempotent.mockImplementation(() => Promise.resolve('result')); + + await expect( + idempotentHandler.processIdempotency() + ).resolves.toBe('result'); + expect(mockSaveInProgress).toHaveBeenCalledTimes(1); + }); + + test('when persistences store throws any error, it wraps the error to IdempotencyPersistencesLayerError', async () => { + const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new Error('Some error')); + const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result'); + + await expect( + idempotentHandler.processIdempotency() + ).rejects.toThrow(IdempotencyPersistenceLayerError); + expect(mockSaveInProgress).toHaveBeenCalledTimes(1); + expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(0); + }); + + test('when idempotency item already exists, it returns the existing record', async () => { + const mockSaveInProgress = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'saveInProgress').mockRejectedValue(new IdempotencyItemAlreadyExistsError('There is already an execution in progress')); + + const stubRecord = new IdempotencyRecord({ + idempotencyKey: 'idempotencyKey', + expiryTimestamp: 0, + inProgressExpiryTimestamp: 0, + responseData: { responseData: 'responseData' }, + payloadHash: 'payloadHash', + status: IdempotencyRecordStatus.INPROGRESS + }); + const mockGetRecord = jest.spyOn(mockIdempotencyOptions.persistenceStore, 'getRecord').mockImplementation(() => Promise.resolve(stubRecord)); + const mockDetermineResultFromIdempotencyRecord = jest.spyOn(IdempotencyHandler.prototype, 'determineResultFromIdempotencyRecord').mockResolvedValue('result'); + + await expect( + idempotentHandler.processIdempotency() + ).resolves.toBe('result'); + expect(mockSaveInProgress).toHaveBeenCalledTimes(1); + expect(mockGetRecord).toHaveBeenCalledTimes(1); + expect(mockDetermineResultFromIdempotencyRecord).toHaveBeenCalledTimes(1); + }); + }); + +}); + diff --git a/packages/idempotency/tests/unit/makeFunctionIdempotent.test.ts b/packages/idempotency/tests/unit/makeFunctionIdempotent.test.ts index cb531bae23..c8d90b251b 100644 --- a/packages/idempotency/tests/unit/makeFunctionIdempotent.test.ts +++ b/packages/idempotency/tests/unit/makeFunctionIdempotent.test.ts @@ -4,17 +4,14 @@ * @group unit/idempotency/makeFunctionIdempotent */ import { IdempotencyOptions } from '../../src/types/IdempotencyOptions'; -import { IdempotencyRecord, BasePersistenceLayer } from '../../src/persistence'; +import { BasePersistenceLayer, IdempotencyRecord } from '../../src/persistence'; import { makeFunctionIdempotent } from '../../src/makeFunctionIdempotent'; +import type { AnyIdempotentFunction, IdempotencyRecordOptions } from '../../src/types'; import { IdempotencyRecordStatus } from '../../src/types'; -import type { - AnyIdempotentFunction, - IdempotencyRecordOptions -} from '../../src/types'; import { - IdempotencyItemAlreadyExistsError, IdempotencyAlreadyInProgressError, IdempotencyInconsistentStateError, + IdempotencyItemAlreadyExistsError, IdempotencyPersistenceLayerError } from '../../src/Exceptions'; @@ -29,8 +26,11 @@ class PersistenceLayerTestClass extends BasePersistenceLayer { } describe('Given a function to wrap', (functionToWrap = jest.fn()) => { - beforeEach(()=> jest.clearAllMocks()); - describe('Given options for idempotency', (options: IdempotencyOptions = { persistenceStore: new PersistenceLayerTestClass(), dataKeywordArgument: 'testingKey' }) => { + beforeEach(() => jest.clearAllMocks()); + describe('Given options for idempotency', (options: IdempotencyOptions = { + persistenceStore: new PersistenceLayerTestClass(), + dataKeywordArgument: 'testingKey' + }) => { const keyValueToBeSaved = 'thisWillBeSaved'; const inputRecord = { testingKey: keyValueToBeSaved, otherKey: 'thisWillNot' }; describe('When wrapping a function with no previous executions', () => { @@ -66,11 +66,11 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { resultingError = e as Error; } }); - + test('Then it will attempt to save the record to INPROGRESS', () => { expect(mockSaveInProgress).toBeCalledWith(keyValueToBeSaved); }); - + test('Then it will get the previous execution record', () => { expect(mockGetRecord).toBeCalledWith(keyValueToBeSaved); }); @@ -79,7 +79,7 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { expect(functionToWrap).not.toBeCalled(); }); - test('Then an IdempotencyAlreadyInProgressError is thrown', ()=> { + test('Then an IdempotencyAlreadyInProgressError is thrown', () => { expect(resultingError).toBeInstanceOf(IdempotencyAlreadyInProgressError); }); }); @@ -101,11 +101,11 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { resultingError = e as Error; } }); - + test('Then it will attempt to save the record to INPROGRESS', () => { expect(mockSaveInProgress).toBeCalledWith(keyValueToBeSaved); }); - + test('Then it will get the previous execution record', () => { expect(mockGetRecord).toBeCalledWith(keyValueToBeSaved); }); @@ -113,8 +113,8 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { test('Then the function that was wrapped is not called again', () => { expect(functionToWrap).not.toBeCalled(); }); - - test('Then an IdempotencyInconsistentStateError is thrown', ()=> { + + test('Then an IdempotencyInconsistentStateError is thrown', () => { expect(resultingError).toBeInstanceOf(IdempotencyInconsistentStateError); }); }); @@ -127,15 +127,15 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { idempotencyKey: 'key', status: IdempotencyRecordStatus.COMPLETED }; - mockGetRecord.mockResolvedValue(new IdempotencyRecord(idempotencyOptions)); + mockGetRecord.mockResolvedValue(new IdempotencyRecord(idempotencyOptions)); resultingFunction = makeFunctionIdempotent(functionToWrap, options); await resultingFunction(inputRecord); }); - + test('Then it will attempt to save the record to INPROGRESS', () => { expect(mockSaveInProgress).toBeCalledWith(keyValueToBeSaved); }); - + test('Then it will get the previous execution record', () => { expect(mockGetRecord).toBeCalledWith(keyValueToBeSaved); }); @@ -158,12 +158,12 @@ describe('Given a function to wrap', (functionToWrap = jest.fn()) => { resultingError = e as Error; } }); - + test('Then it will attempt to save the record to INPROGRESS', () => { expect(mockSaveInProgress).toBeCalledWith(keyValueToBeSaved); }); - - test('Then an IdempotencyPersistenceLayerError is thrown', ()=> { + + test('Then an IdempotencyPersistenceLayerError is thrown', () => { expect(resultingError).toBeInstanceOf(IdempotencyPersistenceLayerError); }); });