Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"comment": "Add ReducerAction enum with Skip value for entity reducers and rename ReadModelAction to ProjectionAction",
"type": "minor",
"packageName": "@magek/common"
}
],
"packageName": "@magek/common",
"email": "copilot@github.com"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"changes": [
{
"comment": "Handle ReducerAction.Skip in event-store and rename ReadModelAction to ProjectionAction in read-model-store",
"type": "minor",
"packageName": "@magek/core"
}
],
"packageName": "@magek/core",
"email": "copilot@github.com"
}
44 changes: 44 additions & 0 deletions docs/content/architecture/entity.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,50 @@ export class Cart {

> **Tip:** It's highly recommended to **keep your reducer functions pure**, which means that you should be able to produce the new entity version by just looking at the event and the current entity state. You should avoid calling third party services, reading or writing to a database, or changing any external state.

### Skipping Events with ReducerAction.Skip

Sometimes a reducer may need to skip an event instead of updating the entity state. This can happen when:
- Receiving an update event for an entity that doesn't exist
- Processing an event that is no longer relevant
- Handling events that should be ignored under certain conditions

To skip an event, return `ReducerAction.Skip` from your reducer. This tells Magek to keep the current entity state unchanged:

```typescript title="src/entities/product.ts"
import { ReducerAction, ReducerResult } from '@magek/common'

@Entity
export class Product {
@Field(type => UUID)
public id!: UUID

@Field()
readonly name!: string

@Field()
readonly price!: number

@Reduces(ProductUpdated)
public static reduceProductUpdated(
event: ProductUpdated,
currentProduct?: Product
): ReducerResult<Product> {
if (!currentProduct) {
// Can't update a non-existent product - skip this event
return ReducerAction.Skip
}
return new Product(currentProduct.id, event.newName, event.newPrice)
}
}
```

When a reducer returns `ReducerAction.Skip`, the framework will:
- Keep the previous entity snapshot unchanged
- Continue processing subsequent events in the event stream
- Not store a new snapshot for this event

> **Note:** `ReducerAction.Skip` should be used sparingly and only for events that genuinely should not affect entity state. In most cases, properly designed events and reducers won't need to skip events.

There could be a lot of events being reduced concurrently among many entities, but, **for a specific entity instance, the events order is preserved**. This means that while one event is being reduced, all other events of any kind _that belong to the same entity instance_ will be waiting in a queue until the previous reducer has finished. This is how Magek guarantees that the entity state is consistent.

![reducer process gif](/img/reducer.gif)
Expand Down
10 changes: 5 additions & 5 deletions docs/content/architecture/read-model.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ export class CarPurchasesReadModel {
oldCarPurchaseReadModel?: CarPurchasesReadModel
): ProjectionResult<CarPurchasesReadModel> {
if (!readModelId) {
return ReadModelAction.Nothing
return ProjectionAction.Skip
}
return evolve(oldCarPurchaseReadModel, {
id: readModelId,
Expand All @@ -217,7 +217,7 @@ Projections usually return a new instance of the read model. However, there are

#### Deleting read models

One of the most common cases is when you want to delete a read model. For example, if you have a `UserReadModel` that projects the `User` entity, you may want to delete the read model when the user is deleted. In this case you can return the `ReadModelAction.Delete` value:
One of the most common cases is when you want to delete a read model. For example, if you have a `UserReadModel` that projects the `User` entity, you may want to delete the read model when the user is deleted. In this case you can return the `ProjectionAction.Delete` value:

```typescript
@ReadModel({
Expand All @@ -241,9 +241,9 @@ export class UserReadModel {

> **Info:** Deleting a read model is a very expensive operation. It will trigger a write operation in the read model store. If you can, try to avoid deleting read models.

#### Keeping read models untouched
#### Skipping read model updates

Another common case is when you want to keep the read model untouched. For example, if you have a `UserReadModel` that projects the `User` entity, you may want to keep the read model untouched there are no releveant changes to your read model. In this case you can return the `ReadModelAction.Nothing` value:
Another common case is when you want to keep the read model untouched. For example, if you have a `UserReadModel` that projects the `User` entity, you may want to skip updating the read model when there are no relevant changes. In this case you can return the `ProjectionAction.Skip` value:

```typescript
@ReadModel({
Expand All @@ -265,7 +265,7 @@ export class UserReadModel {
}
```

> **Info:** Keeping the read model untouched higly recommended in favour of returning a new instance of the read model with the same data. This will not only prevent a new write operation in the database, making your application more efficient. It will also prevent an unnecessary update to be dispatched to any GrahpQL clients subscribed to that read model.
> **Info:** Skipping the read model update is highly recommended in favour of returning a new instance of the read model with the same data. This will not only prevent a new write operation in the database, making your application more efficient. It will also prevent an unnecessary update to be dispatched to any GraphQL clients subscribed to that read model.

## Nested queries and calculated values using getters

Expand Down
6 changes: 3 additions & 3 deletions packages/common/src/concepts/projection-metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ export interface ProjectionMetadata<TEntity extends EntityInterface, TReadModel
joinKey: keyof TEntity | ReadModelJoinKeyFunction<TEntity, TReadModel>
}

export type ProjectionResult<TReadModel> = TReadModel | ReadModelAction
export type ProjectionResult<TReadModel> = TReadModel | ProjectionAction

export enum ReadModelAction {
export enum ProjectionAction {
Skip,
Delete,
Nothing,
}
7 changes: 7 additions & 0 deletions packages/common/src/concepts/reducer-metadata.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import { AnyClass } from '..'
import { EntityInterface } from './entity'

export interface ReducerMetadata {
class: AnyClass
methodName: string
}

export enum ReducerAction {
Skip,
}

export type ReducerResult<TEntity extends EntityInterface> = TEntity | ReducerAction
28 changes: 19 additions & 9 deletions packages/core/src/services/event-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
InvalidParameterError,
InvalidReducerError,
NonPersistedEntitySnapshotEnvelope,
ReducerAction,
ReducerGlobalError,
TraceActionTypes,
UUID,
Expand Down Expand Up @@ -59,7 +60,11 @@ export class EventStore {
// We double-check that what we are reducing is an event
if (pendingEvent.kind === 'event') {
try {
newEntitySnapshot = await this.entityReducer(pendingEvent, newEntitySnapshot)
const reducerResult = await this.entityReducer(pendingEvent, newEntitySnapshot)
// If reducer returns ReducerAction.Skip, keep the current snapshot unchanged
if (reducerResult !== ReducerAction.Skip) {
newEntitySnapshot = reducerResult
}
} catch (e) {
if (e instanceof InvalidEventError) {
const globalErrorDispatcher = new MagekGlobalErrorDispatcher(this.config)
Expand Down Expand Up @@ -158,7 +163,7 @@ export class EventStore {
private async entityReducer(
eventEnvelope: EventEnvelope,
latestSnapshot?: NonPersistedEntitySnapshotEnvelope
): Promise<NonPersistedEntitySnapshotEnvelope | undefined> {
): Promise<NonPersistedEntitySnapshotEnvelope | ReducerAction> {
const logger = getLogger(this.config, 'entityReducer')
logger.debug('Calling reducer with event: ', eventEnvelope, ' and entity snapshot ', latestSnapshot)
if (this.shouldReduceMagekSuperKind(eventEnvelope)) {
Expand Down Expand Up @@ -201,15 +206,20 @@ export class EventStore {
snapshotInstance: EntityInterface | null,
eventEnvelope: EventEnvelope,
reducerMetadata: ReducerMetadata
): Promise<NonPersistedEntitySnapshotEnvelope> {
): Promise<NonPersistedEntitySnapshotEnvelope | ReducerAction> {
const logger = getLogger(this.config, 'createNewSnapshot')
try {
const newEntity = this.reducerForEvent(
const reducerResult = this.reducerForEvent(
migratedEventEnvelope.typeName,
eventInstance,
snapshotInstance
)(eventInstance, snapshotInstance)

if (reducerResult === ReducerAction.Skip) {
logger.debug('Reducer returned ReducerAction.Skip, skipping snapshot creation')
return ReducerAction.Skip
}

const newSnapshot: NonPersistedEntitySnapshotEnvelope = {
version: this.config.currentVersionFor(eventEnvelope.entityTypeName),
kind: 'snapshot',
Expand All @@ -218,7 +228,7 @@ export class EventStore {
entityID: migratedEventEnvelope.entityID,
entityTypeName: migratedEventEnvelope.entityTypeName,
typeName: migratedEventEnvelope.entityTypeName,
value: newEntity,
value: reducerResult,
snapshottedEventCreatedAt: migratedEventEnvelope.createdAt,
}
logger.debug('Reducer result: ', newSnapshot)
Expand All @@ -234,7 +244,7 @@ export class EventStore {
private async reduceSuperKind(
eventEnvelope: EventEnvelope,
latestSnapshot?: NonPersistedEntitySnapshotEnvelope
): Promise<NonPersistedEntitySnapshotEnvelope | undefined> {
): Promise<NonPersistedEntitySnapshotEnvelope | ReducerAction> {
if (eventEnvelope.typeName === MagekEntityTouched.name) {
return this.reduceEntityTouched(eventEnvelope, latestSnapshot)
}
Expand All @@ -252,12 +262,12 @@ export class EventStore {
private reduceEntityTouched(
eventEnvelope: EventEnvelope,
latestSnapshot: NonPersistedEntitySnapshotEnvelope | undefined
): NonPersistedEntitySnapshotEnvelope | undefined {
): NonPersistedEntitySnapshotEnvelope | ReducerAction {
const logger = getLogger(this.config, 'EventStore#reduceEntityTouched')
logger.debug('Reducing ', eventEnvelope, ' with latestSnapshot')
if (!latestSnapshot) {
logger.debug('Latest snapshot not found, returning')
return
logger.debug('Latest snapshot not found, returning Skip')
return ReducerAction.Skip
}

const event = eventEnvelope.value as MagekEntityTouched
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/services/read-model-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import {
EntitySnapshotEnvelope,
FilterFor,
OptimisticConcurrencyUnexpectedVersionError,
ProjectionAction,
ProjectionGlobalError,
ProjectionInfo,
ProjectionInfoReason,
ProjectionMetadata,
ProjectionResult,
ReadModelAction,
ReadModelInterface,
ReadModelJoinKeyFunction,
SequenceKey,
Expand Down Expand Up @@ -403,10 +403,10 @@ export class ReadModelStore {
if (error) throw error
}

if (newReadModel === ReadModelAction.Delete) {
if (newReadModel === ProjectionAction.Delete) {
logger.debug(`Deleting read model ${readModelName} with ID ${readModelID}:`, migratedReadModel)
return this.config.readModelStore.delete(this.config, readModelName, readModelID!)
} else if (newReadModel === ReadModelAction.Nothing) {
} else if (newReadModel === ProjectionAction.Skip) {
logger.debug(`Skipping actions for ${readModelName} with ID ${readModelID}:`, newReadModel)
return
}
Expand Down
Loading
Loading