Skip to content

Commit

Permalink
Track processed events in Azure provider (#1527)
Browse files Browse the repository at this point in the history
* Add infra for processed events table

* Add logic for skipping duplicate events

* Fix unit tests in framework-core

* Add unit tests to core and Azure provider

* Add dummy functions to local provider

* Add dummy functions to deprecated AWS provider

* Add rush change file

* Fix small linter issues

* Rename "processed" to "dispatched" events

* Refactor events processor to use atomic operation

* Remove unused adapter methods

* Add several minor changes

* Change error handling

---------

Co-authored-by: Castro, Mario <mariocs@optum.com>
  • Loading branch information
MarcAstr0 and Castro, Mario authored Mar 20, 2024
1 parent 92f9aa5 commit 7cf0d74
Show file tree
Hide file tree
Showing 16 changed files with 224 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@boostercloud/framework-core",
"comment": "Track processed events in Azure provider to avoid duplication",
"type": "minor"
}
],
"packageName": "@boostercloud/framework-core"
}
25 changes: 23 additions & 2 deletions packages/framework-core/src/booster-event-processor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {
TraceActionTypes,
BoosterConfig,
EventEnvelope,
EventHandlerGlobalError,
EventHandlerInterface,
EventInterface,
Register,
TraceActionTypes,
UUID,
} from '@boostercloud/framework-types'
import { EventStore } from './services/event-store'
Expand All @@ -24,8 +24,10 @@ export class BoosterEventProcessor {
*/
public static eventProcessor(eventStore: EventStore, readModelStore: ReadModelStore): EventsStreamingCallback {
return async (entityName, entityID, eventEnvelopes, config) => {
// Filter events that have already been dispatched
const eventsNotDispatched = await BoosterEventProcessor.filterDispatched(config, eventEnvelopes, eventStore)
const eventEnvelopesProcessors = [
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(eventEnvelopes, config),
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(eventsNotDispatched, config),
]

// Read models are not updated for notification events (events that are not related to an entity but a topic)
Expand All @@ -39,6 +41,25 @@ export class BoosterEventProcessor {
}
}

private static async filterDispatched(
config: BoosterConfig,
eventEnvelopes: Array<EventEnvelope>,
eventStore: EventStore
): Promise<Array<EventEnvelope>> {
const logger = getLogger(config, 'BoosterEventDispatcher#filterDispatched')
const filteredResults = await Promise.all(
eventEnvelopes.map(async (eventEnvelope) => {
const result = await eventStore.storeDispatchedEvent(eventEnvelope)
if (!result) {
logger.warn('Event has already been dispatched. Skipping.', eventEnvelope)
}
return result
})
)

return eventEnvelopes.filter((_, index) => filteredResults[index])
}

private static async snapshotAndUpdateReadModels(
config: BoosterConfig,
entityName: string,
Expand Down
12 changes: 12 additions & 0 deletions packages/framework-core/src/services/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ export class EventStore {
}
}

@Trace(TraceActionTypes.CUSTOM)
public async storeDispatchedEvent(eventEnvelope: EventEnvelope): Promise<boolean | undefined> {
const logger = getLogger(this.config, 'EventStore#storeDispatchedEvent')
try {
logger.debug('Storing event in the dispatched event store:', eventEnvelope)
return await this.config.provider.events.storeDispatched(eventEnvelope, this.config)
} catch (e) {
logger.debug('Could not store dispatched event. Continue its processing.', {error: e, eventEnvelope })
return true
}
}

@Trace(TraceActionTypes.STORE_SNAPSHOT)
private async storeSnapshot(
snapshot: NonPersistedEntitySnapshotEnvelope
Expand Down
38 changes: 33 additions & 5 deletions packages/framework-core/test/booster-event-processor.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import { fake, replace, restore, createStubInstance } from 'sinon'
import { createStubInstance, fake, match, replace, restore } from 'sinon'
import {
BoosterConfig,
EntitySnapshotEnvelope,
UUID,
EntityInterface,
ProviderLibrary,
Register,
EntitySnapshotEnvelope,
EventInterface,
NonPersistedEventEnvelope,
ProviderLibrary,
Register,
UUID,
} from '@boostercloud/framework-types'
import { expect } from './expect'
import { ReadModelStore } from '../src/services/read-model-store'
Expand All @@ -24,6 +24,7 @@ class SomeEvent {
public entityID(): UUID {
return this.id
}

public getPrefixedId(prefix: string): string {
return `${prefix}-${this.id}`
}
Expand Down Expand Up @@ -129,6 +130,9 @@ describe('BoosterEventProcessor', () => {
const stubReadModelStore = createStubInstance(ReadModelStore)

const boosterEventProcessor = BoosterEventProcessor as any
const fakeFilterDispatched = fake.returns([someEvent])

replace(boosterEventProcessor, 'filterDispatched', fakeFilterDispatched)
replace(boosterEventProcessor, 'snapshotAndUpdateReadModels', fake())
replace(boosterEventProcessor, 'dispatchEntityEventsToEventHandlers', fake())

Expand Down Expand Up @@ -318,6 +322,30 @@ describe('BoosterEventProcessor', () => {
})
})

describe('the `filterDispatched` method', () => {
it("removes events if they've been already dispatched", async () => {
const boosterEventProcessor = BoosterEventProcessor as any
const eventStore = createStubInstance(EventStore)
const someEventEnvelope = { ...someEvent, id: 'event-id' }
eventStore.storeDispatchedEvent = fake.returns(false) as any

const eventsNotDispatched = await boosterEventProcessor.filterDispatched(
config,
[someEventEnvelope],
eventStore
)

expect(eventStore.storeDispatchedEvent).to.have.been.called
expect(eventStore.storeDispatchedEvent).to.have.been.calledOnceWith(someEventEnvelope)
expect(eventsNotDispatched).to.deep.equal([])
expect(config.logger?.warn).to.have.been.calledWith(
'[Booster]|BoosterEventDispatcher#filterDispatched: ',
'Event has already been dispatched. Skipping.',
match.any
)
})
})

it('calls an instance method in the event and it is executed without failing', async () => {
config.eventHandlers[SomeEvent.name] = [{ handle: AnEventHandler.handle }]
const boosterEventProcessor = BoosterEventProcessor as any
Expand Down
7 changes: 7 additions & 0 deletions packages/framework-provider-aws/src/library/events-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,10 @@ async function persistEvent(
throw e
}
}

/**
* Dummy method that'll always return true, since local provider won't be tracking dispatched events
*/
export async function storeDispatchedEvent() {
return true
}
2 changes: 2 additions & 0 deletions packages/framework-provider-aws/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
rawEventsToEnvelopes,
readEntityEventsSince,
readEntityLatestSnapshot,
storeDispatchedEvent,
storeEvents,
storeSnapshot,
} from './library/events-adapter'
Expand Down Expand Up @@ -75,6 +76,7 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => {
searchEntitiesIDs: searchEntitiesIds.bind(null, dynamoDB),
store: storeEvents.bind(null, dynamoDB),
storeSnapshot: storeSnapshot.bind(null, dynamoDB),
storeDispatched: storeDispatchedEvent,
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const dispatchedEventsContainer = this.createDispatchedEventsContainer(
azureProvider,
appPrefix,
terraformStack,
config,
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const readModels = Object.keys(config.readModels).map((readModel) =>
this.createReadModel(azureProvider, terraformStack, config, readModel, cosmosdbDatabase, cosmosdbSqlDatabase)
)
Expand All @@ -53,7 +61,12 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
return [cosmosdbSqlEventContainer, subscriptionsContainer, connectionsContainer].concat(readModels)
return [
cosmosdbSqlEventContainer,
dispatchedEventsContainer,
subscriptionsContainer,
connectionsContainer,
].concat(readModels)
}
return [cosmosdbSqlEventContainer].concat(readModels)
}
Expand Down Expand Up @@ -81,6 +94,31 @@ export class TerraformContainers {
})
}

private static createDispatchedEventsContainer(
providerResource: AzurermProvider,
appPrefix: string,
terraformStackResource: TerraformStack,
config: BoosterConfig,
cosmosdbDatabaseResource: cosmosdbAccount.CosmosdbAccount,
cosmosdbSqlDatabaseResource: cosmosdbSqlDatabase.CosmosdbSqlDatabase
): cosmosdbSqlContainer.CosmosdbSqlContainer {
const idEvent = toTerraformName(appPrefix, 'dispatched-events')
return new cosmosdbSqlContainer.CosmosdbSqlContainer(terraformStackResource, idEvent, {
name: config.resourceNames.dispatchedEventsStore,
resourceGroupName: cosmosdbDatabaseResource.resourceGroupName,
accountName: cosmosdbDatabaseResource.name,
databaseName: cosmosdbSqlDatabaseResource.name,
partitionKeyPath: '/eventId',
partitionKeyVersion: 2,
uniqueKey: [{ paths: ['/eventId'] }],
autoscaleSettings: {
maxThroughput: MAX_CONTAINER_THROUGHPUT,
},
defaultTtl: config.dispatchedEventsTtl,
provider: providerResource,
})
}

private static createReadModel(
providerResource: AzurermProvider,
terraformStackResource: TerraformStack,
Expand Down
6 changes: 4 additions & 2 deletions packages/framework-provider-azure/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
rawEventsToEnvelopes,
readEntityEventsSince,
readEntityLatestSnapshot,
storeDispatchedEvent,
storeEvents,
storeSnapshot,
} from './library/events-adapter'
Expand Down Expand Up @@ -38,13 +39,13 @@ import { EventHubProducerClient, RetryMode } from '@azure/event-hubs'
import { dedupEventStream, rawEventsStreamToEnvelopes } from './library/events-stream-consumer-adapter'
import {
areDatabaseReadModelsUp,
databaseUrl,
databaseEventsHealthDetails,
databaseReadModelsHealthDetails,
databaseUrl,
graphqlFunctionUrl,
isDatabaseEventUp,
isGraphQLFunctionUp,
rawRequestToSensorHealth,
databaseReadModelsHealthDetails,
} from './library/health-adapter'

let cosmosClient: CosmosClient
Expand Down Expand Up @@ -104,6 +105,7 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => ({
latestEntitySnapshot: readEntityLatestSnapshot.bind(null, cosmosClient),
search: searchEvents.bind(null, cosmosClient),
searchEntitiesIDs: searchEntitiesIds.bind(null, cosmosClient),
storeDispatched: storeDispatchedEvent.bind(null, cosmosClient),
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
34 changes: 31 additions & 3 deletions packages/framework-provider-azure/src/library/events-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { CosmosClient, SqlQuerySpec } from '@azure/cosmos'
import {
EventEnvelope,
BoosterConfig,
UUID,
EntitySnapshotEnvelope,
NonPersistedEventEnvelope,
EventEnvelope,
NonPersistedEntitySnapshotEnvelope,
NonPersistedEventEnvelope,
UUID,
} from '@boostercloud/framework-types'
import { getLogger } from '@boostercloud/framework-common-helpers'
import { eventsStoreAttributes } from '../constants'
Expand Down Expand Up @@ -180,3 +180,31 @@ export async function storeSnapshot(
logger.debug('Snapshot stored', snapshotEnvelope)
return persistableEntitySnapshot
}

export async function storeDispatchedEvent(
cosmosDb: CosmosClient,
eventEnvelope: EventEnvelope,
config: BoosterConfig
): Promise<boolean> {
const logger = getLogger(config, 'events-adapter#storeDispatchedEvent')
logger.debug('[EventsAdapter#storeDispatchedEvent] Storing EventEnvelope for event with ID: ', eventEnvelope.id)
try {
await cosmosDb
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.dispatchedEventsStore)
.items.create({
eventId: eventEnvelope.id,
})
return true
} catch (e) {
if (e.code === 409) {
// If an item with the same ID already exists in the container, it will return a 409 status code.
// See https://learn.microsoft.com/en-us/rest/api/cosmos-db/http-status-codes-for-cosmosdb
logger.debug('[EventsAdapter#storeDispatchedEvent] Event has already been dispatched', eventEnvelope.id)
return false
} else {
logger.error('[EventsAdapter#storeDispatchedEvent] Error storing dispatched event', e)
throw e
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { EventEnvelope } from '@boostercloud/framework-types'
import { random, date } from 'faker'
import { date, random } from 'faker'
import { Context, ExecutionContext, Logger as AzureLogger, TraceContext } from '@azure/functions'

export function createMockEventEnvelopes(numOfEvents = 1): Array<EventEnvelope> {
Expand All @@ -18,9 +18,10 @@ export function createMockEventEnvelopes(numOfEvents = 1): Array<EventEnvelope>
entityTypeName: random.word(),
requestID: random.uuid(),
createdAt: date.past().toISOString(),
id: random.uuid(),
},
0,
numOfEvents
numOfEvents,
)
}

Expand All @@ -46,7 +47,8 @@ export function wrapEventEnvelopesForCosmosDB(eventEnvelopes: Array<EventEnvelop
invocationId: '',
log: {} as AzureLogger,
traceContext: {} as TraceContext,
done(err?: Error | string | null, result?: any): void {},
done(err?: Error | string | null, result?: any): void {
},
bindings: { rawEvent: eventEnvelopes },
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { expect } from '../expect'
import * as EventsAdapter from '../../src/library/events-adapter'
import { createStubInstance, fake, restore, match, stub, SinonStubbedInstance } from 'sinon'
import { createStubInstance, fake, match, restore, SinonStubbedInstance, stub } from 'sinon'
import { BoosterConfig, EventEnvelope, UUID } from '@boostercloud/framework-types'
import { CosmosClient } from '@azure/cosmos'
import { eventsStoreAttributes } from '../../src/constants'
Expand Down Expand Up @@ -141,4 +141,20 @@ describe('Events adapter', () => {
)
})
})

describe('The "storeDispatchedEvent" method', () => {
it('Persists the IDs of the eventEnvelopes passed via parameters', async () => {
await EventsAdapter.storeDispatchedEvent(mockCosmosDbClient as any, mockEvents[0], mockConfig)

expect(mockCosmosDbClient.database).to.have.been.calledWithExactly(mockConfig.resourceNames.applicationStack)
expect(
mockCosmosDbClient.database(mockConfig.resourceNames.applicationStack).container
).to.have.been.calledWithExactly(mockConfig.resourceNames.dispatchedEventsStore)
expect(
mockCosmosDbClient
.database(mockConfig.resourceNames.applicationStack)
.container(mockConfig.resourceNames.dispatchedEventsStore).items.create
).to.have.been.calledWithExactly(match({ eventId: mockEvents[0].id }))
})
})
})
Loading

0 comments on commit 7cf0d74

Please sign in to comment.