From 26ce5dfb0270009fc10f003f5627046ddaf5ae4e Mon Sep 17 00:00:00 2001 From: Alex Hinson Date: Fri, 18 Sep 2020 13:58:57 -0700 Subject: [PATCH] fix(@aws-amplify/datastore): use runExclusive when enqueuing (#6828) --- packages/datastore/src/sync/outbox.ts | 86 ++++++++++--------- .../datastore/src/sync/processors/mutation.ts | 2 +- 2 files changed, 45 insertions(+), 43 deletions(-) diff --git a/packages/datastore/src/sync/outbox.ts b/packages/datastore/src/sync/outbox.ts index d51fdc01593..10dad1b53be 100644 --- a/packages/datastore/src/sync/outbox.ts +++ b/packages/datastore/src/sync/outbox.ts @@ -27,54 +27,56 @@ class MutationEventOutbox { storage: Storage, mutationEvent: MutationEvent ): Promise { - const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ - 'MutationEvent' - ]; - - const predicate = ModelPredicateCreator.createFromExisting( - mutationEventModelDefinition, - c => - c - .modelId('eq', mutationEvent.modelId) - .id('ne', this.inProgressMutationEventId) - ); + storage.runExclusive(async s => { + const mutationEventModelDefinition = this.schema.namespaces[SYNC].models[ + 'MutationEvent' + ]; - const [first] = await storage.query(this.MutationEvent, predicate); + const predicate = ModelPredicateCreator.createFromExisting( + mutationEventModelDefinition, + c => + c + .modelId('eq', mutationEvent.modelId) + .id('ne', this.inProgressMutationEventId) + ); - if (first === undefined) { - await storage.save(mutationEvent, undefined, this.ownSymbol); - return; - } + const [first] = await s.query(this.MutationEvent, predicate); - const { operation: incomingMutationType } = mutationEvent; + if (first === undefined) { + await s.save(mutationEvent, undefined, this.ownSymbol); + return; + } - if (first.operation === TransformerMutationType.CREATE) { - if (incomingMutationType === TransformerMutationType.DELETE) { - // delete all for model - await storage.delete(this.MutationEvent, predicate); + const { operation: incomingMutationType } = mutationEvent; + + if (first.operation === TransformerMutationType.CREATE) { + if (incomingMutationType === TransformerMutationType.DELETE) { + // delete all for model + await s.delete(this.MutationEvent, predicate); + } else { + // first gets updated with incoming's data, condition intentionally skiped + await s.save( + this.MutationEvent.copyOf(first, draft => { + draft.data = mutationEvent.data; + }), + undefined, + this.ownSymbol + ); + } } else { - // first gets updated with incoming's data, condition intentionally skiped - await storage.save( - this.MutationEvent.copyOf(first, draft => { - draft.data = mutationEvent.data; - }), - undefined, - this.ownSymbol - ); - } - } else { - const { condition: incomingConditionJSON } = mutationEvent; - const incomingCondition = JSON.parse(incomingConditionJSON); - - // If no condition - if (Object.keys(incomingCondition).length === 0) { - // delete all for model - await storage.delete(this.MutationEvent, predicate); - } + const { condition: incomingConditionJSON } = mutationEvent; + const incomingCondition = JSON.parse(incomingConditionJSON); + + // If no condition + if (Object.keys(incomingCondition).length === 0) { + // delete all for model + await s.delete(this.MutationEvent, predicate); + } - // Enqueue new one - await storage.save(mutationEvent, undefined, this.ownSymbol); - } + // Enqueue new one + await s.save(mutationEvent, undefined, this.ownSymbol); + } + }); } public async dequeue(storage: StorageFacade): Promise { diff --git a/packages/datastore/src/sync/processors/mutation.ts b/packages/datastore/src/sync/processors/mutation.ts index 0d329d89f8f..842eabff92c 100644 --- a/packages/datastore/src/sync/processors/mutation.ts +++ b/packages/datastore/src/sync/processors/mutation.ts @@ -310,7 +310,7 @@ class MutationProcessor { : null, }); } catch (err) { - logger.warn("failed to execute errorHandler", err); + logger.warn('failed to execute errorHandler', err); } finally { // Return empty tuple, dequeues the mutation return error.data