Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track processed events in Azure provider #1527

Merged
merged 13 commits into from
Mar 20, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@boostercloud/framework-core",
"comment": "Track processed events in Azure provider to avoid duplication",
"type": "minor"
}
],
"packageName": "@boostercloud/framework-core"
}
25 changes: 23 additions & 2 deletions packages/framework-core/src/booster-event-processor.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {
TraceActionTypes,
BoosterConfig,
EventEnvelope,
EventHandlerGlobalError,
EventHandlerInterface,
EventInterface,
Register,
TraceActionTypes,
UUID,
} from '@boostercloud/framework-types'
import { EventStore } from './services/event-store'
Expand All @@ -24,8 +24,10 @@ export class BoosterEventProcessor {
*/
public static eventProcessor(eventStore: EventStore, readModelStore: ReadModelStore): EventsStreamingCallback {
return async (entityName, entityID, eventEnvelopes, config) => {
// Filter events that have already been dispatched
const eventsNotDispatched = await BoosterEventProcessor.filterDispatched(config, eventEnvelopes, eventStore)
const eventEnvelopesProcessors = [
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(eventEnvelopes, config),
BoosterEventProcessor.dispatchEntityEventsToEventHandlers(eventsNotDispatched, config),
]

// Read models are not updated for notification events (events that are not related to an entity but a topic)
Expand All @@ -39,6 +41,25 @@ export class BoosterEventProcessor {
}
alvaroloes marked this conversation as resolved.
Show resolved Hide resolved
}

private static async filterDispatched(
config: BoosterConfig,
eventEnvelopes: Array<EventEnvelope>,
eventStore: EventStore
): Promise<Array<EventEnvelope>> {
const logger = getLogger(config, 'BoosterEventDispatcher#filterDispatched')
const filteredResults = await Promise.all(
eventEnvelopes.map(async (eventEnvelope) => {
const result = await eventStore.storeDispatchedEvent(eventEnvelope)
if (!result) {
logger.warn('Event has already been dispatched. Skipping.', eventEnvelope)
}
return result
})
)

return eventEnvelopes.filter((_, index) => filteredResults[index])
}

private static async snapshotAndUpdateReadModels(
config: BoosterConfig,
entityName: string,
Expand Down
12 changes: 12 additions & 0 deletions packages/framework-core/src/services/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ export class EventStore {
}
}

@Trace(TraceActionTypes.CUSTOM)
public async storeDispatchedEvent(eventEnvelope: EventEnvelope): Promise<boolean | undefined> {
const logger = getLogger(this.config, 'EventStore#storeDispatchedEvent')
try {
logger.debug('Storing event in the dispatched event store:', eventEnvelope)
return await this.config.provider.events.storeDispatched(eventEnvelope, this.config)
} catch (e) {
logger.debug('Could not store dispatched event. Continue its processing.', eventEnvelope)
MarcAstr0 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
}

@Trace(TraceActionTypes.STORE_SNAPSHOT)
private async storeSnapshot(
snapshot: NonPersistedEntitySnapshotEnvelope
Expand Down
38 changes: 33 additions & 5 deletions packages/framework-core/test/booster-event-processor.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
/* eslint-disable @typescript-eslint/explicit-function-return-type */
import { fake, replace, restore, createStubInstance } from 'sinon'
import { createStubInstance, fake, match, replace, restore } from 'sinon'
import {
BoosterConfig,
EntitySnapshotEnvelope,
UUID,
EntityInterface,
ProviderLibrary,
Register,
EntitySnapshotEnvelope,
EventInterface,
NonPersistedEventEnvelope,
ProviderLibrary,
Register,
UUID,
} from '@boostercloud/framework-types'
import { expect } from './expect'
import { ReadModelStore } from '../src/services/read-model-store'
Expand All @@ -24,6 +24,7 @@ class SomeEvent {
public entityID(): UUID {
return this.id
}

public getPrefixedId(prefix: string): string {
return `${prefix}-${this.id}`
}
Expand Down Expand Up @@ -129,6 +130,9 @@ describe('BoosterEventProcessor', () => {
const stubReadModelStore = createStubInstance(ReadModelStore)

const boosterEventProcessor = BoosterEventProcessor as any
const fakeFilterDispatched = fake.returns([someEvent])

replace(boosterEventProcessor, 'filterDispatched', fakeFilterDispatched)
replace(boosterEventProcessor, 'snapshotAndUpdateReadModels', fake())
replace(boosterEventProcessor, 'dispatchEntityEventsToEventHandlers', fake())

Expand Down Expand Up @@ -318,6 +322,30 @@ describe('BoosterEventProcessor', () => {
})
})

describe('the `filterDispatched` method', () => {
it("removes events if they've been already dispatched", async () => {
const boosterEventProcessor = BoosterEventProcessor as any
const eventStore = createStubInstance(EventStore)
const someEventEnvelope = { ...someEvent, id: 'event-id' }
eventStore.storeDispatchedEvent = fake.returns(false) as any

const eventsNotDispatched = await boosterEventProcessor.filterDispatched(
config,
[someEventEnvelope],
eventStore
)

expect(eventStore.storeDispatchedEvent).to.have.been.called
expect(eventStore.storeDispatchedEvent).to.have.been.calledOnceWith(someEventEnvelope)
expect(eventsNotDispatched).to.deep.equal([])
expect(config.logger?.warn).to.have.been.calledWith(
'[Booster]|BoosterEventDispatcher#filterDispatched: ',
'Event has already been dispatched. Skipping.',
match.any
)
})
})

it('calls an instance method in the event and it is executed without failing', async () => {
config.eventHandlers[SomeEvent.name] = [{ handle: AnEventHandler.handle }]
const boosterEventProcessor = BoosterEventProcessor as any
Expand Down
7 changes: 7 additions & 0 deletions packages/framework-provider-aws/src/library/events-adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,3 +188,10 @@ async function persistEvent(
throw e
}
}

/**
* Dummy method that'll always return true, since local provider won't be tracking dispatched events
*/
export async function storeDispatchedEvent() {
return true
}
2 changes: 2 additions & 0 deletions packages/framework-provider-aws/src/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
rawEventsToEnvelopes,
readEntityEventsSince,
readEntityLatestSnapshot,
storeDispatchedEvent,
storeEvents,
storeSnapshot,
} from './library/events-adapter'
Expand Down Expand Up @@ -75,6 +76,7 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => {
searchEntitiesIDs: searchEntitiesIds.bind(null, dynamoDB),
store: storeEvents.bind(null, dynamoDB),
storeSnapshot: storeSnapshot.bind(null, dynamoDB),
storeDispatched: storeDispatchedEvent,
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const dispatchedEventsContainer = this.createDispatchedEventsContainer(
azureProvider,
appPrefix,
terraformStack,
config,
cosmosdbDatabase,
cosmosdbSqlDatabase
)
const readModels = Object.keys(config.readModels).map((readModel) =>
this.createReadModel(azureProvider, terraformStack, config, readModel, cosmosdbDatabase, cosmosdbSqlDatabase)
)
Expand All @@ -53,7 +61,12 @@ export class TerraformContainers {
cosmosdbDatabase,
cosmosdbSqlDatabase
)
return [cosmosdbSqlEventContainer, subscriptionsContainer, connectionsContainer].concat(readModels)
return [
cosmosdbSqlEventContainer,
dispatchedEventsContainer,
subscriptionsContainer,
connectionsContainer,
].concat(readModels)
}
return [cosmosdbSqlEventContainer].concat(readModels)
}
Expand Down Expand Up @@ -81,6 +94,31 @@ export class TerraformContainers {
})
}

private static createDispatchedEventsContainer(
providerResource: AzurermProvider,
appPrefix: string,
terraformStackResource: TerraformStack,
config: BoosterConfig,
cosmosdbDatabaseResource: cosmosdbAccount.CosmosdbAccount,
cosmosdbSqlDatabaseResource: cosmosdbSqlDatabase.CosmosdbSqlDatabase
): cosmosdbSqlContainer.CosmosdbSqlContainer {
const idEvent = toTerraformName(appPrefix, 'dispatched-events')
return new cosmosdbSqlContainer.CosmosdbSqlContainer(terraformStackResource, idEvent, {
name: config.resourceNames.dispatchedEventsStore,
resourceGroupName: cosmosdbDatabaseResource.resourceGroupName,
accountName: cosmosdbDatabaseResource.name,
databaseName: cosmosdbSqlDatabaseResource.name,
partitionKeyPath: '/eventId',
partitionKeyVersion: 2,
uniqueKey: [{ paths: ['/eventId'] }],
autoscaleSettings: {
maxThroughput: MAX_CONTAINER_THROUGHPUT,
},
defaultTtl: config.dispatchedEventsTtl,
provider: providerResource,
})
}

private static createReadModel(
providerResource: AzurermProvider,
terraformStackResource: TerraformStack,
Expand Down
6 changes: 4 additions & 2 deletions packages/framework-provider-azure/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {
rawEventsToEnvelopes,
readEntityEventsSince,
readEntityLatestSnapshot,
storeDispatchedEvent,
storeEvents,
storeSnapshot,
} from './library/events-adapter'
Expand Down Expand Up @@ -38,13 +39,13 @@ import { EventHubProducerClient, RetryMode } from '@azure/event-hubs'
import { dedupEventStream, rawEventsStreamToEnvelopes } from './library/events-stream-consumer-adapter'
import {
areDatabaseReadModelsUp,
databaseUrl,
databaseEventsHealthDetails,
databaseReadModelsHealthDetails,
databaseUrl,
graphqlFunctionUrl,
isDatabaseEventUp,
isGraphQLFunctionUp,
rawRequestToSensorHealth,
databaseReadModelsHealthDetails,
} from './library/health-adapter'

let cosmosClient: CosmosClient
Expand Down Expand Up @@ -104,6 +105,7 @@ export const Provider = (rockets?: RocketDescriptor[]): ProviderLibrary => ({
latestEntitySnapshot: readEntityLatestSnapshot.bind(null, cosmosClient),
search: searchEvents.bind(null, cosmosClient),
searchEntitiesIDs: searchEntitiesIds.bind(null, cosmosClient),
storeDispatched: storeDispatchedEvent.bind(null, cosmosClient),
},
// ProviderReadModelsLibrary
readModels: {
Expand Down
34 changes: 31 additions & 3 deletions packages/framework-provider-azure/src/library/events-adapter.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { CosmosClient, SqlQuerySpec } from '@azure/cosmos'
import {
EventEnvelope,
BoosterConfig,
UUID,
EntitySnapshotEnvelope,
NonPersistedEventEnvelope,
EventEnvelope,
NonPersistedEntitySnapshotEnvelope,
NonPersistedEventEnvelope,
UUID,
} from '@boostercloud/framework-types'
import { getLogger } from '@boostercloud/framework-common-helpers'
import { eventsStoreAttributes } from '../constants'
Expand Down Expand Up @@ -180,3 +180,31 @@ export async function storeSnapshot(
logger.debug('Snapshot stored', snapshotEnvelope)
return persistableEntitySnapshot
}

export async function storeDispatchedEvent(
cosmosDb: CosmosClient,
eventEnvelope: EventEnvelope,
config: BoosterConfig
): Promise<boolean> {
const logger = getLogger(config, 'events-adapter#storeDispatchedEvent')
logger.debug('[EventsAdapter#storeDispatchedEvent] Storing EventEnvelope for event with ID: ', eventEnvelope.id)
try {
await cosmosDb
.database(config.resourceNames.applicationStack)
.container(config.resourceNames.dispatchedEventsStore)
.items.create({
eventId: eventEnvelope.id,
})
return true
} catch (e) {
if (e.code === 409) {
// If an item with the same ID already exists in the container, it will return a 409 status code.
// See https://learn.microsoft.com/en-us/rest/api/cosmos-db/http-status-codes-for-cosmosdb
logger.debug('[EventsAdapter#storeDispatchedEvent] Event has already been dispatched', eventEnvelope.id)
return false
} else {
logger.error('[EventsAdapter#storeDispatchedEvent] Error storing dispatched event', e)
return true
MarcAstr0 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable @typescript-eslint/no-explicit-any */
import { EventEnvelope } from '@boostercloud/framework-types'
import { random, date } from 'faker'
import { date, random } from 'faker'
import { Context, ExecutionContext, Logger as AzureLogger, TraceContext } from '@azure/functions'

export function createMockEventEnvelopes(numOfEvents = 1): Array<EventEnvelope> {
Expand All @@ -18,9 +18,10 @@ export function createMockEventEnvelopes(numOfEvents = 1): Array<EventEnvelope>
entityTypeName: random.word(),
requestID: random.uuid(),
createdAt: date.past().toISOString(),
id: random.uuid(),
},
0,
numOfEvents
numOfEvents,
)
}

Expand All @@ -46,7 +47,8 @@ export function wrapEventEnvelopesForCosmosDB(eventEnvelopes: Array<EventEnvelop
invocationId: '',
log: {} as AzureLogger,
traceContext: {} as TraceContext,
done(err?: Error | string | null, result?: any): void {},
done(err?: Error | string | null, result?: any): void {
},
bindings: { rawEvent: eventEnvelopes },
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { expect } from '../expect'
import * as EventsAdapter from '../../src/library/events-adapter'
import { createStubInstance, fake, restore, match, stub, SinonStubbedInstance } from 'sinon'
import { createStubInstance, fake, match, restore, SinonStubbedInstance, stub } from 'sinon'
import { BoosterConfig, EventEnvelope, UUID } from '@boostercloud/framework-types'
import { CosmosClient } from '@azure/cosmos'
import { eventsStoreAttributes } from '../../src/constants'
Expand Down Expand Up @@ -141,4 +141,20 @@ describe('Events adapter', () => {
)
})
})

describe('The "storeDispatchedEvent" method', () => {
it('Persists the IDs of the eventEnvelopes passed via parameters', async () => {
await EventsAdapter.storeDispatchedEvent(mockCosmosDbClient as any, mockEvents[0], mockConfig)

expect(mockCosmosDbClient.database).to.have.been.calledWithExactly(mockConfig.resourceNames.applicationStack)
expect(
mockCosmosDbClient.database(mockConfig.resourceNames.applicationStack).container
).to.have.been.calledWithExactly(mockConfig.resourceNames.dispatchedEventsStore)
expect(
mockCosmosDbClient
.database(mockConfig.resourceNames.applicationStack)
.container(mockConfig.resourceNames.dispatchedEventsStore).items.create
).to.have.been.calledWithExactly(match({ eventId: mockEvents[0].id }))
})
})
})
Loading
Loading