Skip to content

Commit

Permalink
Push compaction down into Stores, resolves #23
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 26, 2018
1 parent d325ac6 commit fd17461
Show file tree
Hide file tree
Showing 8 changed files with 101 additions and 50 deletions.
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ While the implementations are distilled from code from [`Jet.com` systems dating
- support, (via `UnionContractEncoder`) for the maintenance of multiple co-existing compaction schemas in a given stream (A snapshot isa Event)
- compaction events typically do not get deleted (consistent with how EventStore works), although it is safe to do so in concept
- NB while this works well, and can deliver excellent performance (especially when allied with the Cache), [it's not a panacea, as noted in this excellent EventStore article on the topic](https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html)
- **Azure CosmosDb Indexed mode**: Using `Equinox.Cosmos`, command processing can be optimized through an index document in the same partition as the event documents that maintains:
a) compacted rendition(s) of the folded state
b) (optionally) events since those snapshots have last been updated

This yields many of the benefits of rolling snapshots while reducing latency, RU provisioning requirement, and Request Charges:-
- no additional roundtrips to the store needed at either the Load or Sync points in the flow
- when coupled with the cache, a typical read is a point read with an etag, costing 1 RU
- The index isa DocDb Document, but _not_ an Event
- The index can safely be deleted at any time; it'll get regenerated in the course of normal processing

# Elements

Expand Down Expand Up @@ -95,7 +104,7 @@ $env:EQUINOX_COSMOS_COLLECTION="equinox-test"
cli/Equinox.cli/bin/Release/net461/Equinox.Cli `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION `
run
dotnet .\benchmarks\Equinox.Cli\bin\Release\netcoreapp2.1\Equinox.Cli.dll `
dotnet .\cli\Equinox.Cli\bin\Release\netcoreapp2.1\Equinox.Cli.dll `
cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_COLLECTION `
run
```
Expand Down
7 changes: 3 additions & 4 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ param(
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs
[string] $additionalMsBuildArgs="-t:Build"
)

Expand All @@ -21,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs
if ($skipEs) { warn "Skipping EventStore tests" }

function cliCosmos($arghs) {
Write-Host "dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
Write-Host "dotnet run cli/Equinox.Cli cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
}

if ($skipCosmos) {
Expand All @@ -31,7 +30,7 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
dotnet run cli/Equinox.Cli cosmos $cosmosServer -d $cosmosDatabase -c $cosmosCollection provision -ru 10000
cliCosmos @("provision", "-ru", "1000")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
2 changes: 1 addition & 1 deletion cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ module Test =
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create(streamName)
| Store.Cosmos (gateway, databaseId, connectionId) ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.RollingSnapshots snapshot).Create(streamName,databaseId, connectionId)
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots snapshot).Create(streamName,databaseId, connectionId)
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let resolveEqxStreamWithCompactionEventType gateway compactionEventType (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType compactionEventType).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway _compactionEventType (StreamArgs args) =
let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
Expand Down
12 changes: 5 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway =
let resolveStreamGesWithoutAccessStrategy gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway =
fun predicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.Predicate predicate).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway =
fun _ignoreWindowSize _ignoreCompactionPredicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -65,7 +63,7 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrangeWithoutCompaction connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType cet).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
93 changes: 70 additions & 23 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,41 @@ module Store =
if array = null || Array.length array = 0 then serializer.Serialize(writer, null)
else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array))

[<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
type IndexEvent =
{ p: string // "{streamName}"
id: string // "{-1}"

w: int64 // 100: window size
/// last index/i value
m: int64 // {index}

(* "x": [
{ "i":0,
"c":"ISO 8601"
"e":[
[{"t":"added","d":"..."},{"t":"compacted/1","d":"..."}],
[{"t":"removed","d":"..."}],
]
}
] *)
x: JObject[][] }

(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
coll.insert(p,0,{ p:p, id:-1, w:windowSize, m:flatLen(events)})
} else {
const i = doc.find(p=p && id=-1)
if(i.m <> expectedVersion) then emit from expectedVersion else
i.x.append(events)
for (var (i, c, e: [ {e1}, ...]) in events) {
coll.insert({p:p, id:i, i:i, c:c, e:e1)
}
// trim i.x to w total items in i.[e]
coll.update(p,id,i)
}
} *)
[<RequireQualifiedAccess>]
type Direction = Forward | Backward with
override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
Expand Down Expand Up @@ -289,7 +324,6 @@ module UnionEncoderAdapters =

type [<NoComparison>]Token = { pos: Store.Position; compactionEventNumber: int64 option }

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Token =
let private create compactionEventNumber batchCapacityLimit pos : Storage.StreamToken =
{ value = box { pos = pos; compactionEventNumber = compactionEventNumber }; batchCapacityLimit = batchCapacityLimit }
Expand Down Expand Up @@ -385,26 +419,49 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) =
member __.Gateway = gateway
member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId)

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?compactionStrategy) =
[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
| EventsAreState
| RollingSnapshots of eventType: string * compact: ('state -> 'event)

type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) =
/// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes)
member __.IsCompactionDue = eventsLen > capacityBeforeCompaction

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?access : AccessStrategy<'event,'state>) =
let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None }
let compactionPredicate =
match access with
| None -> None
| Some AccessStrategy.EventsAreState -> Some (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et)
let loadAlgorithm load (Pos pos) initial log =
let batched = load initial (coll.Gateway.LoadBatched log None pos)
let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos)
match compactionStrategy with
| Some predicate -> compacted predicate
match access with
| None -> batched
| Some AccessStrategy.EventsAreState -> compacted (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et)
let load (fold: 'state -> 'event seq -> 'state) initial loadF = async {
let! token, events = loadF
return token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) }
member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm (load fold) streamName initial log
member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) token (log : ILogger) : Async<Storage.StreamToken * 'state> =
(load fold) state (coll.Gateway.LoadFromToken log token compactionStrategy false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
(load fold) state (coll.Gateway.LoadFromToken log token compactionPredicate false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger)
(token : Storage.StreamToken, state : 'state)
(events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let events =
match access with
| None | Some AccessStrategy.EventsAreState -> events
| Some (AccessStrategy.RollingSnapshots (_,f)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [f state'] else events
let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events)
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionStrategy
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate
match syncRes with
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionStrategy true))
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true))
| GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
Expand Down Expand Up @@ -449,8 +506,8 @@ module Caching =
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
interceptAsync (inner.Load streamName log) streamName
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) events
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync streamName)
| Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (token', state') }
Expand Down Expand Up @@ -478,17 +535,12 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: '
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm streamName initial log
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) events
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list, state': 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync
| Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') }

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CompactionStrategy =
| EventType of string
| Predicate of (string -> bool)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
| SlidingWindow of Caching.Cache * window: TimeSpan
Expand All @@ -497,12 +549,7 @@ type CachingStrategy =

type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?compaction, ?caching) =
member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> =
let compactionPredicateOption =
match compaction with
| None -> None
| Some (CompactionStrategy.Predicate predicate) -> Some predicate
| Some (CompactionStrategy.EventType eventType) -> Some (fun x -> x = eventType)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?compactionStrategy = compactionPredicateOption)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?access = compaction)

let readCacheOption =
match caching with
Expand Down
Loading

0 comments on commit fd17461

Please sign in to comment.