Skip to content

Commit

Permalink
Push compaction down into Stores, resolves #23
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 7, 2018
1 parent 7a27bbe commit 89c6a5d
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 213 deletions.
24 changes: 14 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ While the implementations are distilled from code from [`Jet.com` systems dating
- Using a [versionable convention-based approach (using `Typeshape`'s `UnionContractEncoder` under the covers)](https://eiriktsarpalis.wordpress.com/2018/10/30/a-contract-pattern-for-schemaless-datastores/), providing for serializer-agnostic schema evolution with minimal boilerplate
- Independent of the store used, Equinox provides for caching using the .NET `MemoryCache` to minimize roundtrips, latency and bandwidth / request charges costs by maintaining the folded state without any explicit code within the Domain Model
- Logging is both high performance and pluggable (using [Serilog](https://github.com/serilog/serilog) to your hosting context (we feed log info to Splunk atm and feed metrics embedded in the LogEvent Properties to Prometheus; see relevant tests for examples)
- EventStore-optimized Compaction support: Command processing can by optimized by employing in-stream 'compaction' events in service of the following ends:
- Extracted from working software; currently used for all data storage within Jet's API gateway and Cart processing.
- Significant test coverage for core facilities, and per Storage system.
- **`Equinox.EventStore` Transactionally-consistent Rolling Snapshots**: Command processing can be optimized by employing in-stream 'compaction' events in service of the following ends:
- no additional roundtrips to the store needed at either the Load or Sync points in the flow
- support, (via `UnionContractEncoder`) for the maintenance of multiple co-existing snapshot schemas in a given stream (A snapshot isa Event)
- compaction events typically do not get deleted (consistent with how EventStore works)
- (Azure CosmosDb-specific, WIP) Snapshotting support: Command processing can by optimized by employing a snapshot document which maintains a) (optionally) a rendition of the folded state b) (optionally) batches of events to fold into the state in a
- support, (via `UnionContractEncoder`) for the maintenance of multiple co-existing compaction schemas in a given stream (A snapshot isa Event)
- compaction events typically do not get deleted (consistent with how EventStore works), although it is safe to do so in concept
- NB while this works well, and can deliver excellent performance (especially when allied with the Cache), [it's not a panacea, as noted in this excellent EventStore article on the topic](https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html)
- **Azure CosmosDb Indexed mode**: Using `Equinox.Cosmos`, command processing can be optimized through an index document in the same partition as the event documents that maintains:
a) compacted rendition(s) of the folded state
b) (optionally) events since those snapshots have last been updated

This yields many of the benefits of rolling snapshots while reducing latency, RU provisioning requirement, and Request Charges:-
- no additional roundtrips to the store needed at either the Load or Sync points in the flow
- when coupled with the cache, a typical read is a point read with an etag, costing 1 RU
- A snapshot isa Document, but not an Event
- snapshot events can safely be deleted; they'll get regenerated in the course of normal processing
- A given snapshot will typically only contain a single version of the snapshot
- Extracted from working software; currently used for all data storage within Jet's API gateway and Cart processing.
- Significant test coverage for core facilities, and per Storage system.
- The index isa DocDb Document, but _not_ an Event
- The index can safely be deleted at any time; it'll get regenerated in the course of normal processing

# Elements

Expand Down Expand Up @@ -100,7 +104,7 @@ $env:EQUINOX_COSMOS_COLLECTION="equinox-test"
cli/Equinox.cli/bin/Release/net461/Equinox.Cli `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION `
run
dotnet .\benchmarks\Equinox.Cli\bin\Release\netcoreapp2.1\Equinox.Cli.dll `
dotnet .\cli\Equinox.Cli\bin\Release\netcoreapp2.1\Equinox.Cli.dll `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION `
run
```
Expand Down
8 changes: 4 additions & 4 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ param(
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs
[string] $additionalMsBuildArgs="-t:Build"
)

$args=@("/v:$verbosity","/fl","/bl",$additionalMsBuildArgs)
Expand All @@ -20,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs
if ($skipEs) { warn "Skipping EventStore tests" }

function cliCosmos($arghs) {
Write-Host "dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
Write-Host "dotnet run cli/Equinox.Cli cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
}

if ($skipCosmos) {
Expand All @@ -30,7 +30,7 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
dotnet run cli/Equinox.Cli cosmos $cosmosServer -d $cosmosDatabase -c $cosmosCollection provision -ru 10000
cliCosmos @("provision", "-ru", "1000")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
8 changes: 4 additions & 4 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -132,19 +132,19 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
let createFavoritesService store log =
let resolveStream cet streamName =
let resolveStream streamName =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.CompactionStrategy.EventType cet).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.ReadStrategy.RollingSnapshots compact).Create(streamName)
| Store.Cosmos (gateway, databaseId, connectionId) ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType cet).Create(streamName,databaseId, connectionId)
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.ReadStrategy.RollingSnapshots compact).Create(streamName,databaseId, connectionId)
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand Down
16 changes: 8 additions & 8 deletions samples/Store/Backend/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ open Domain

type Service(log, resolveStream) =
let (|Cart|) (id: CartId) =
let stream = sprintf "Cart-%s" id.Value |> resolveStream Cart.Events.Compaction.EventType
Cart.Handler(log, stream)
let streamName = sprintf "Cart-%s" id.Value
Cart.Handler(log, resolveStream streamName)

member __.FlowAsync (Cart cart, flow, ?prepare) =
cart.FlowAsync(flow, ?prepare = prepare)
member __.FlowAsync (Cart handler, flow, ?prepare) =
handler.FlowAsync(flow, ?prepare = prepare)

member __.Execute (Cart cart) command =
cart.Execute command
member __.Execute (Cart handler) command =
handler.Execute command

member __.Read (Cart cart) =
cart.Read
member __.Read (Cart handler) =
handler.Read
8 changes: 2 additions & 6 deletions samples/Store/Backend/ContactPreferences.fs
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
module Backend.ContactPreferences

open Domain

type Service(log, resolveStream) =
let stream (ContactPreferences.Id email) =
sprintf "ContactPreferences-%s" email // TODO hash >> base64
|> resolveStream 1 (fun (_eventType : string) -> true)
let (|ContactPreferences|) email =
ContactPreferences.Handler(log, stream (Domain.ContactPreferences.Id email))
let streamName = sprintf "ContactPreferences-%s" email // TODO hash >> base64
Domain.ContactPreferences.Handler(log, resolveStream streamName)

member __.Update (ContactPreferences handler as email) value =
handler.Update email value
Expand Down
10 changes: 5 additions & 5 deletions samples/Store/Backend/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ open Domain
type Service(log, resolveStream) =
let (|Favorites|) (clientId : ClientId) =
let streamName = sprintf "Favorites-%s" clientId.Value
Favorites.Handler(log, resolveStream Domain.Favorites.Events.Compaction.EventType streamName)
Favorites.Handler(log, resolveStream streamName)

member __.Execute (Favorites favorites) command =
favorites.Execute command
member __.Execute (Favorites handler) command =
handler.Execute command

member __.Read(Favorites favorites) =
favorites.Read
member __.Read(Favorites handler) =
handler.Read
1 change: 0 additions & 1 deletion samples/Store/Backend/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ type Backoff = int -> int option

/// Operations on back off strategies represented as functions (int -> int option)g
/// which take an attempt number and produce an interval.
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Backoff =

let private checkOverflow x =
Expand Down
18 changes: 3 additions & 15 deletions samples/Store/Domain.Tests/CartTests.fs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
module Samples.Store.Domain.Tests.CartTests

open Domain
open Domain.Cart
open Domain.Cart.Events
open Domain.Cart.Folds
open Domain.Cart.Commands
open Swensen.Unquote
open System
open TypeShape.Empty

let mkAddQty skuId qty = ItemAdded { empty<ItemAddInfo> with skuId = skuId; quantity = qty }
Expand All @@ -20,7 +18,6 @@ let mkChangeWaived skuId value = ItemWaiveReturnsChanged { empty<ItemWaiveRetur
let verifyCanProcessInInitialState cmd (originState: State) =
let events = interpret cmd originState
match cmd with
| Compact
| AddItem _ ->
test <@ (not << List.isEmpty) events @>
| PatchItem _
Expand All @@ -30,7 +27,6 @@ let verifyCanProcessInInitialState cmd (originState: State) =
/// Put the aggregate into the state where the command should trigger an event; verify correct events are yielded
let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let initialEvents = command |> function
| Compact -> [ (* Command is not designed to be idempotent *) ]
| AddItem _ -> []
| RemoveItem (_, skuId)
| PatchItem (_, skuId, Some 0, _) -> [ mkAdd skuId ]
Expand All @@ -44,9 +40,6 @@ let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =

let find skuId = state'.items |> List.find (fun x -> x.skuId = skuId)
match command, events with
| Compact, [ Compacted e ] ->
let expectedState = State.ofCompacted e
test <@ expectedState = state' @>
| AddItem (_, csku, quantity), [ ItemAdded e ] ->
test <@ { ItemAddInfo.context = e.context; skuId = csku; quantity = quantity } = e
&& quantity = (find csku).quantity @>
Expand All @@ -71,21 +64,16 @@ let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let verifyIdempotency (cmd: Command) (originState: State) =
// Put the aggregate into the state where the command should not trigger an event
let establish: Event list = cmd |> function
| Compact _ -> let skuId = SkuId (Guid.NewGuid()) in [ mkAdd skuId; mkRemove skuId]
| AddItem (_, skuId, qty) -> [ mkAddQty skuId qty]
| RemoveItem _
| PatchItem (_, _, Some 0, _) -> []
| PatchItem (_, skuId, quantity, waived) -> [ mkAddQty skuId (defaultArg quantity 1)
mkChangeWaived skuId (defaultArg waived false) ]
let state = fold originState establish
let events = interpret cmd state
match cmd, not (List.isEmpty events) with
| Compact, hasEvents ->
// Command should be unconditional
test <@ hasEvents @>
| _, hasEvents ->
// Assert we decided nothing needs to happen
test <@ not hasEvents @>

// Assert we decided nothing needs to happen
test <@ List.isEmpty events @>

[<DomainProperty(MaxTest = 1000)>]
let ``interpret yields correct events, idempotently`` (cmd: Command) (originState: State) =
Expand Down
15 changes: 2 additions & 13 deletions samples/Store/Domain.Tests/FavoritesTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ let mkUnfavorite skuId = Unfavorited { skuId = skuId }
/// Put the aggregate into the state where the command should trigger an event; verify correct events are yielded
let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let initialEvents = command |> function
| Compact -> [ (* Command is not designed to be idempotent *) ]
| Unfavorite skuId -> [ mkFavorite skuId ]
| Favorite _ -> []
let state = fold originState initialEvents
Expand All @@ -24,10 +23,6 @@ let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let stateHasSku (s : State) (skuId : SkuId) = s |> Array.exists (function { skuId = sSkuId } -> sSkuId = skuId)
stateHasSku state, stateHasSku state'
match command, events with
| Compact, [ Compacted { net = netItems } ] ->
test <@ netItems = state'
// It's critical that it should not have any side-effects on state
&& state = state' @>
| Unfavorite skuId, [ Unfavorited e] ->
test <@ e = { skuId = skuId}
&& not (hasSkuId skuId) @>
Expand All @@ -43,18 +38,12 @@ let verifyCorrectEventGenerationWhenAppropriate command (originState: State) =
let verifyIdempotency (command: Command) (originState: State) =
// Put the aggregate into the state where the command should not trigger an event
let initialEvents: Event list = command |> function
| Compact _ -> []
| Unfavorite _ -> []
| Favorite (_,skuIds) -> [| for sku in skuIds -> mkFavorite sku |] |> knuthShuffle |> List.ofArray
let state = fold originState initialEvents
let events = Commands.interpret command state
match command, List.isEmpty events with
| Compact, isEmpty ->
// Command should be unconditional
test <@ not isEmpty @>
| _, isEmpty ->
// Assert we decided nothing needs to happen
test <@ isEmpty @>
// Assert we decided nothing needs to happen
test <@ List.isEmpty events @>

[<DomainProperty(MaxTest = 1000)>]
let ``interpret yields correct events, idempotently`` (cmd: Command) (state: State) =
Expand Down
11 changes: 2 additions & 9 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ module Events =
module Folds =
type ItemInfo = { skuId: SkuId; quantity: int; returnsWaived: bool }
type State = { items: ItemInfo list }
[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module State =
let toSnapshot (s: State) : Events.Compaction.State =
{ items = [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
Expand All @@ -43,10 +42,9 @@ module Folds =
| Events.ItemQuantityChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with quantity = e.quantity } | i -> i))
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state

let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| Compact
| AddItem of Context * SkuId * quantity: int
| PatchItem of Context * SkuId * quantity: int option * waived: bool option
| RemoveItem of Context * SkuId
Expand All @@ -61,8 +59,6 @@ module Commands =
let toEventContext (reqContext: Context) = { requestId = reqContext.requestId; time = reqContext.time } : Events.ContextInfo
let (|Context|) (context : Context) = toEventContext context
match command with
| Compact ->
[ Events.Compacted (Folds.State.toSnapshot state)]
| AddItem (Context c, skuId, quantity) ->
if itemExistsWithSameQuantity skuId quantity then [] else
[ Events.ItemAdded { context = c; skuId = skuId; quantity = quantity } ]
Expand All @@ -88,10 +84,7 @@ type Handler(log, stream) =
inner.DecideAsync <| fun ctx -> async {
let execute = Commands.interpret >> ctx.Execute
match prepare with None -> () | Some prep -> do! prep
let result = flow ctx execute
if ctx.IsCompactionDue then
execute Compact
return result }
return flow ctx execute }
member __.Execute command =
__.FlowAsync(fun _ctx execute -> execute command)
member __.Read : Async<Folds.State> =
Expand Down
9 changes: 2 additions & 7 deletions samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,16 @@ module Folds =
let s = InternalState state
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }

type Command =
| Compact
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
| Unfavorite of skuId : SkuId

module Commands =
let interpret command (state : Folds.State) =
let doesntHave skuId = state |> Array.exists (fun x -> x.skuId = skuId) |> not
match command with
| Compact ->
[ Events.Compacted { net = state } ]
| Favorite (date = date; skuIds = skuIds) ->
[ for skuId in Seq.distinct skuIds do
if doesntHave skuId then
Expand All @@ -61,9 +59,6 @@ type Handler(log, stream, ?maxAttempts) =
let inner = Equinox.Handler(Folds.fold, log, stream, maxAttempts = defaultArg maxAttempts 2)
member __.Execute command : Async<unit> =
inner.Decide <| fun ctx ->
let execute = Commands.interpret >> ctx.Execute
execute command
if ctx.IsCompactionDue then
execute Command.Compact
ctx.Execute (Commands.interpret command)
member __.Read : Async<Folds.State> =
inner.Query id
Loading

0 comments on commit 89c6a5d

Please sign in to comment.