Skip to content

Commit

Permalink
Push compaction down into Stores, resolves #23
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 11, 2018
1 parent a4dc2b5 commit 0bbbea5
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 48 deletions.
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,15 @@ _If you're looking to learn more about and/or discuss Event Sourcing and it's my
- support, (via `UnionContractEncoder`) for the maintenance of multiple co-existing compaction schemas in a given stream (A snapshot isa Event)
- compaction events typically do not get deleted (consistent with how EventStore works), although it is safe to do so in concept
- NB while this works well, and can deliver excellent performance (especially when allied with the Cache), [it's not a panacea, as noted in this excellent EventStore article on the topic](https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html)
- **Azure CosmosDb Indexed mode**: Using `Equinox.Cosmos`, command processing can be optimized through an index document in the same partition as the event documents that maintains:
a) compacted rendition(s) of the folded state
b) (optionally) events since those snapshots have last been updated

This yields many of the benefits of rolling snapshots while reducing latency, RU provisioning requirement, and Request Charges:-
- no additional roundtrips to the store needed at either the Load or Sync points in the flow
- when coupled with the cache, a typical read is a point read with an etag, costing 1 RU
- The index isa DocDb Document, but _not_ an Event
- The index can safely be deleted at any time; it'll get regenerated in the course of normal processing

# Elements

Expand Down
7 changes: 3 additions & 4 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ param(
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs
[string] $additionalMsBuildArgs="-t:Build"
)

Expand All @@ -21,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs
if ($skipEs) { warn "Skipping EventStore tests" }

function cliCosmos($arghs) {
Write-Host "dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
Write-Host "dotnet run cli/Equinox.Cli cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
}

if ($skipCosmos) {
Expand All @@ -31,7 +30,7 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
dotnet run cli/Equinox.Cli cosmos $cosmosServer -d $cosmosDatabase -c $cosmosCollection provision -ru 10000
cliCosmos @("provision", "-ru", "1000")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesResolver(gateway, codec, fold, initial).Resolve

let resolveEqxStreamWithCompactionEventType gateway compactionEventType (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType compactionEventType).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway _compactionEventType (StreamArgs args) =
let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
Expand Down
12 changes: 5 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway =
let resolveStreamGesWithoutAccessStrategy gateway =
GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve

let resolveStreamEqxWithCompactionSemantics gateway =
fun predicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.Predicate predicate).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway =
fun _ignoreWindowSize _ignoreCompactionPredicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -65,7 +63,7 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrangeWithoutCompaction connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType cet).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
93 changes: 70 additions & 23 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,41 @@ module Store =
if array = null || Array.length array = 0 then serializer.Serialize(writer, null)
else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array))

[<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
type IndexEvent =
{ p: string // "{streamName}"
id: string // "{-1}"

w: int64 // 100: window size
/// last index/i value
m: int64 // {index}

(* "x": [
{ "i":0,
"c":"ISO 8601"
"e":[
[{"t":"added","d":"..."},{"t":"compacted/1","d":"..."}],
[{"t":"removed","d":"..."}],
]
}
] *)
x: JObject[][] }

(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
coll.insert(p,0,{ p:p, id:-1, w:windowSize, m:flatLen(events)})
} else {
const i = doc.find(p=p && id=-1)
if(i.m <> expectedVersion) then emit from expectedVersion else
i.x.append(events)
for (var (i, c, e: [ {e1}, ...]) in events) {
coll.insert({p:p, id:i, i:i, c:c, e:e1)
}
// trim i.x to w total items in i.[e]
coll.update(p,id,i)
}
} *)
[<RequireQualifiedAccess>]
type Direction = Forward | Backward with
override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
Expand Down Expand Up @@ -289,7 +324,6 @@ module UnionEncoderAdapters =

type [<NoComparison>]Token = { pos: Store.Position; compactionEventNumber: int64 option }

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Token =
let private create compactionEventNumber batchCapacityLimit pos : Storage.StreamToken =
{ value = box { pos = pos; compactionEventNumber = compactionEventNumber }; batchCapacityLimit = batchCapacityLimit }
Expand Down Expand Up @@ -385,26 +419,49 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) =
member __.Gateway = gateway
member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId)

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?compactionStrategy) =
[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
| EventsAreState
| RollingSnapshots of eventType: string * compact: ('state -> 'event)

type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) =
/// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes)
member __.IsCompactionDue = eventsLen > capacityBeforeCompaction

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?access : AccessStrategy<'event,'state>) =
let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None }
let compactionPredicate =
match access with
| None -> None
| Some AccessStrategy.EventsAreState -> Some (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et)
let loadAlgorithm load (Pos pos) initial log =
let batched = load initial (coll.Gateway.LoadBatched log None pos)
let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos)
match compactionStrategy with
| Some predicate -> compacted predicate
match access with
| None -> batched
| Some AccessStrategy.EventsAreState -> compacted (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et)
let load (fold: 'state -> 'event seq -> 'state) initial loadF = async {
let! token, events = loadF
return token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) }
member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm (load fold) streamName initial log
member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) token (log : ILogger) : Async<Storage.StreamToken * 'state> =
(load fold) state (coll.Gateway.LoadFromToken log token compactionStrategy false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
(load fold) state (coll.Gateway.LoadFromToken log token compactionPredicate false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger)
(token : Storage.StreamToken, state : 'state)
(events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let events =
match access with
| None | Some AccessStrategy.EventsAreState -> events
| Some (AccessStrategy.RollingSnapshots (_,f)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [f state'] else events
let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events)
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionStrategy
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate
match syncRes with
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionStrategy true))
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true))
| GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
Expand Down Expand Up @@ -449,8 +506,8 @@ module Caching =
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
interceptAsync (inner.Load streamName log) streamName
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) events
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync streamName)
| Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (token', state') }
Expand Down Expand Up @@ -478,17 +535,12 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: '
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm streamName initial log
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) events
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list, state': 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync
| Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') }

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CompactionStrategy =
| EventType of string
| Predicate of (string -> bool)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
| SlidingWindow of Caching.Cache * window: TimeSpan
Expand All @@ -497,12 +549,7 @@ type CachingStrategy =

type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?compaction, ?caching) =
member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> =
let compactionPredicateOption =
match compaction with
| None -> None
| Some (CompactionStrategy.Predicate predicate) -> Some predicate
| Some (CompactionStrategy.EventType eventType) -> Some (fun x -> x = eventType)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?compactionStrategy = compactionPredicateOption)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?access = compaction)

let readCacheOption =
match caching with
Expand Down
18 changes: 8 additions & 10 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,38 @@ let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() =
Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)

module Cart =
let fold, initial = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial
let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact
let codec = genCodec<Domain.Cart.Events.Event>()
let createServiceWithoutOptimization connection batchSize log =
let gateway = createEqxGateway connection batchSize
let resolveStream _ignoreCompactionEventTypeOption (StreamArgs args) =
let resolveStream (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCompaction connection batchSize log =
let gateway = createEqxGateway connection batchSize
let resolveStream compactionEventType (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, compaction=CompactionStrategy.EventType compactionEventType).Create(args)
let resolveStream (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCaching connection batchSize log cache =
let gateway = createEqxGateway connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream _ignorecompactionEventType (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCompactionAndCaching connection batchSize log cache =
let gateway = createEqxGateway connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, CompactionStrategy.EventType cet, sliding20m).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact, sliding20m).Create(args)
Backend.Cart.Service(log, resolveStream)

module ContactPreferences =
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate =
let gateway = createGateway defaultBatchSize
let resolveStream _windowSize _compactionPredicate (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
Backend.ContactPreferences.Service(log, resolveStream)
let createService createGateway log =
let resolveStream batchSize compactionPredicate (StreamArgs args) =
EqxStreamBuilder(createGateway batchSize, codec, fold, initial, CompactionStrategy.Predicate compactionPredicate).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(args)
Backend.ContactPreferences.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down

0 comments on commit 0bbbea5

Please sign in to comment.