Skip to content

Commit

Permalink
Ability to use custom aggregations in Live lifecycle, easier recipes …
Browse files Browse the repository at this point in the history
…for using explicit code. Closes GH-2941
  • Loading branch information
jeremydmiller committed Sep 15, 2024
1 parent 8bf381b commit 297170e
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/.vitepress/config.mts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ const config: UserConfig<DefaultTheme.Config> = {
text: 'Aggregate Projections', link: '/events/projections/aggregate-projections', items: [
{ text: 'Live Aggregations', link: '/events/projections/live-aggregates' },
{ text: 'Multi-Stream Projections', link: '/events/projections/multi-stream-projections' },
{ text: 'Custom Aggregations', link: '/events/projections/custom-aggregates' },]
{ text: 'Explicit Aggregations', link: '/events/projections/custom-aggregates' },]
},
{ text: 'Event Projections', link: '/events/projections/event-projections' },
{ text: 'Custom Projections', link: '/events/projections/custom' },
Expand Down
23 changes: 19 additions & 4 deletions docs/events/projections/custom-aggregates.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Custom Aggregations
# Explicit Aggregations

Once in awhile users are hitting use cases or desired functionality for aggregation projections that just don't fit in well to our [`SingleStreamProjection<T>`](/events/projections/aggregate-projections) or [`MultiStreamProjection<TDoc, TId>`](/events/projections/multi-stream-projections) models. Not to worry though, because
Marten V5.0 introduces the new `CustomAggregation<T>` base type that will let you define aggregation projections with explicit user code while still taking advantage of some of the parallelization
optimizations that were built for the previous aggregation types running in the [async daemon](/events/projections/async-daemon);
The original concept for Marten projections was the conventional method model (`Apply()` / `Create()` / `ShouldDelete()` methods), but we
quickly found out that the workflow generated from these methods just isn't sufficient for many user needs. At the same time,
other users just prefer explicit code anyway, so Marten provides the `CustomProjection<TDoc, TId>` base class as a way to
configure custom projections that use explicit code for the actual work of building projected, aggregate documents from
raw events.

Alright, let's jump right into an example. Two of the drivers for this feature were for aggregations to document types that were [soft-deleted](/documents/deletes.html#soft-deletes) or aggregations where some events should only apply to the aggregate document if the document already existed. To illustrate this with a contrived example, let's say that we've got these event types:

Expand Down Expand Up @@ -136,3 +138,16 @@ All aggregations in Marten come in two parts:

`CustomAggregate` supports aggregating by the stream identity as shown above. You can also use all the same customizable grouping functionality as
the older [MultiStreamProjection](/events/projections/multi-stream-projections) subclass.

## Simple Workflows <Badge type="tip" text="7.28" />

The base class can be used for strictly live aggregations. If all you're doing is using this
mechanism for `Live` aggregation, or have a simple workflow where the aggregate is always
going to be built strictly from the event data, you can override _only_ the `Apply()` method
as shown below:

snippet: sample_using_simple_explicit_code_for_live_aggregation

Note that this usage is valid for all possible projection lifecycles now (`Live`, `Inline`, and `Async`).


Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.Sessions;
Expand Down Expand Up @@ -130,8 +133,11 @@ public async Task ForEventsAppendedToTenantedSession_CustomProjection()
singleCompanyLocation.Id.ShouldBe(companyLocationId);
singleCompanyLocation.Name.ShouldBe(companyLocationName);
}

}



public record Event;

public record ResourceCreatedEvent(string Name, Guid OrganisationId): Event;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using EventSourcingTests.Aggregation;
using Marten;
using Marten.Events;
using Marten.Events.Aggregation;
using Marten.Events.Projections;
using Marten.Internal.Sessions;
using Marten.Testing.Harness;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Projections;

public class using_explicit_code_for_live_aggregation : OneOffConfigurationsContext
{

[Fact]
public async Task using_a_custom_projection_for_live_aggregation()
{
StoreOptions(opts =>
{
opts.Projections.Add(new ExplicitCounter(), ProjectionLifecycle.Live);
});

var streamId = theSession.Events.StartStream<SimpleAggregate>(new AEvent(), new AEvent(), new BEvent(), new CEvent(), new CEvent(), new CEvent()).Id;
await theSession.SaveChangesAsync();

var aggregate = await theSession.Events.AggregateStreamAsync<SimpleAggregate>(streamId);
aggregate.ACount.ShouldBe(2);
aggregate.BCount.ShouldBe(1);
aggregate.CCount.ShouldBe(3);
aggregate.Id.ShouldBe(streamId);
}
}

#region sample_using_simple_explicit_code_for_live_aggregation

public class ExplicitCounter: CustomProjection<SimpleAggregate, Guid>
{
public override SimpleAggregate Apply(SimpleAggregate snapshot, IReadOnlyList<IEvent> events)
{
snapshot ??= new SimpleAggregate();
foreach (var e in events.Select(x => x.Data))
{
if (e is AEvent) snapshot.ACount++;
if (e is BEvent) snapshot.BCount++;
if (e is CEvent) snapshot.CCount++;
if (e is DEvent) snapshot.DCount++;
}

// You have to explicitly return the new value
// of the aggregated document no matter what!
return snapshot;
}
}

#endregion
70 changes: 66 additions & 4 deletions src/Marten/Events/Aggregation/CustomProjection.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JasperFx.Core.Reflection;
Expand All @@ -18,11 +19,17 @@ namespace Marten.Events.Aggregation;

/// <summary>
/// Helpful as a base class for more custom aggregation projections that are not supported
/// by the Single/MultipleStreamProjections
/// by the Single/MultipleStreamProjections -- or if you'd just prefer to use explicit code
/// </summary>
/// <typeparam name="TDoc"></typeparam>
/// <typeparam name="TId"></typeparam>
public abstract class CustomProjection<TDoc, TId>: ProjectionBase, IAggregationRuntime<TDoc, TId>, IProjectionSource, IAggregateProjection, IAggregateProjectionWithSideEffects<TDoc>
public abstract class CustomProjection<TDoc, TId>:
ProjectionBase,
IAggregationRuntime<TDoc, TId>,
IProjectionSource,
IAggregateProjection,
IAggregateProjectionWithSideEffects<TDoc>,
ILiveAggregator<TDoc>
{
private IDocumentStorage<TDoc, TId> _storage;

Expand Down Expand Up @@ -107,9 +114,44 @@ async ValueTask<EventRangeGroup> IAggregationRuntime.GroupEvents(DocumentStore s
/// <param name="cancellation"></param>
/// <param name="lifecycle"></param>
/// <returns></returns>
public abstract ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<TDoc, TId> slice,
public virtual async ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<TDoc, TId> slice,
CancellationToken cancellation,
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline);
ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
{
if (!slice.Events().Any()) return;

var snapshot = slice.Aggregate;
snapshot = await BuildAsync(session, snapshot, slice.Events()).ConfigureAwait(false);
ApplyMetadata(snapshot, slice.Events().Last());

slice.Aggregate = snapshot;
session.Store(snapshot);
}

/// <summary>
/// Override if the aggregation always updates the aggregate from new events, but may
/// require data lookup to update the snapshot
/// </summary>
/// <param name="session"></param>
/// <param name="snapshot"></param>
/// <param name="events"></param>
/// <returns></returns>
public virtual ValueTask<TDoc> BuildAsync(IQuerySession session, TDoc? snapshot, IReadOnlyList<IEvent> events)
{
return new ValueTask<TDoc>(Apply(snapshot, events));
}

/// <summary>
/// Override if the aggregation always updates the aggregate from new events and you
/// don't need to do any other kind of data lookup. Simplest possible way to use this
/// </summary>
/// <param name="snapshot"></param>
/// <param name="events"></param>
/// <returns></returns>
public virtual TDoc Apply(TDoc? snapshot, IReadOnlyList<IEvent> events)
{
throw new NotImplementedException("Did you forget to implement this method?");
}

public IAggregateVersioning Versioning { get; set; }

Expand Down Expand Up @@ -289,5 +331,25 @@ public void ConfigureAggregateMapping(DocumentMapping mapping, StoreOptions stor
mapping.UseVersionFromMatchingStream =
Lifecycle == ProjectionLifecycle.Inline && storeOptions.Events.AppendMode == EventAppendMode.Quick && Slicer is ISingleStreamSlicer;
}

TDoc ILiveAggregator<TDoc>.Build(IReadOnlyList<IEvent> events, IQuerySession session, TDoc snapshot)
{
throw new NotSupportedException("It's not supported to do a synchronous, live aggregation with a custom projection");
}

async ValueTask<TDoc> ILiveAggregator<TDoc>.BuildAsync(IReadOnlyList<IEvent> events, IQuerySession session, TDoc snapshot, CancellationToken cancellation)
{
if (!events.Any()) return default;

var documentSessionBase = session.As<DocumentSessionBase>();

var slice = new EventSlice<TDoc, TId>(default, session, events);
await ApplyChangesAsync(documentSessionBase, slice, cancellation).ConfigureAwait(false);

ApplyMetadata(slice.Aggregate, events.Last());

return slice.Aggregate;
}
}


14 changes: 12 additions & 2 deletions src/Marten/Events/Projections/ProjectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ public void Add(
{
if (lifecycle == ProjectionLifecycle.Live)
{
throw new ArgumentOutOfRangeException(nameof(lifecycle),
$"{nameof(ProjectionLifecycle.Live)} cannot be used for IProjection");
if (!projection.GetType().Closes(typeof(ILiveAggregator<>)))
{
throw new ArgumentOutOfRangeException(nameof(lifecycle),
$"{nameof(ProjectionLifecycle.Live)} cannot be used for IProjection");
}
}

if (projection is ProjectionBase p)
Expand Down Expand Up @@ -417,6 +420,13 @@ internal ILiveAggregator<T> AggregatorFor<T>() where T : class
return (ILiveAggregator<T>)aggregator;
}

aggregator = All.OfType<ILiveAggregator<T>>().FirstOrDefault();
if (aggregator != null)
{
_liveAggregators = _liveAggregators.AddOrUpdate(typeof(T), aggregator);
return (ILiveAggregator<T>)aggregator;
}

var source = tryFindProjectionSourceForAggregateType<T>();
source.AssembleAndAssertValidity();

Expand Down

0 comments on commit 297170e

Please sign in to comment.