Skip to content

Commit

Permalink
Push compaction down into Stores, resolves #23
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 19, 2018
1 parent cd8f75b commit 4728016
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 48 deletions.
7 changes: 3 additions & 4 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ param(
[Alias("cc")][string] $cosmosCollection=$env:EQUINOX_COSMOS_COLLECTION,
[Alias("scp")][switch][bool] $skipProvisionCosmos=$skipCosmos -or -not $cosmosServer -or -not $cosmosDatabase -or -not $cosmosCollection,
[Alias("scd")][switch][bool] $skipDeprovisionCosmos=$skipProvisionCosmos,
[string] $additionalMsBuildArgs
[string] $additionalMsBuildArgs="-t:Build"
)

Expand All @@ -21,8 +20,8 @@ $env:EQUINOX_INTEGRATION_SKIP_EVENTSTORE=[string]$skipEs
if ($skipEs) { warn "Skipping EventStore tests" }

function cliCosmos($arghs) {
Write-Host "dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run cli/Equinox.Cli cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
Write-Host "dotnet run cli/Equinox.Cli cosmos -s <REDACTED> -d $cosmosDatabase -c $cosmosCollection $arghs"
dotnet run -p cli/Equinox.Cli -f netcoreapp2.1 cosmos -s $cosmosServer -d $cosmosDatabase -c $cosmosCollection @arghs
}

if ($skipCosmos) {
Expand All @@ -31,7 +30,7 @@ if ($skipCosmos) {
warn "Skipping Provisioning Cosmos"
} else {
warn "Provisioning cosmos..."
dotnet run cli/Equinox.Cli cosmos $cosmosServer -d $cosmosDatabase -c $cosmosCollection provision -ru 10000
cliCosmos @("provision", "-ru", "1000")
$deprovisionCosmos=$true
}
$env:EQUINOX_INTEGRATION_SKIP_COSMOS=[string]$skipCosmos
Expand Down
6 changes: 3 additions & 3 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesResolver(gateway, codec, fold, initial).Resolve

let resolveEqxStreamWithCompactionEventType gateway compactionEventType (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType compactionEventType).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway _compactionEventType (StreamArgs args) =
let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
Expand Down
12 changes: 5 additions & 7 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@ let resolveStreamGesWithOptimizedStorageSemantics gateway =
let resolveStreamGesWithoutAccessStrategy gateway =
GesResolver(gateway defaultBatchSize, codec, fold, initial).Resolve

let resolveStreamEqxWithCompactionSemantics gateway =
fun predicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.Predicate predicate).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway =
fun _ignoreWindowSize _ignoreCompactionPredicate (StreamArgs args) ->
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -65,7 +63,7 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrangeWithoutCompaction connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.CompactionStrategy.EventType cet).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
93 changes: 70 additions & 23 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,41 @@ module Store =
if array = null || Array.length array = 0 then serializer.Serialize(writer, null)
else writer.WriteRawValue(System.Text.Encoding.UTF8.GetString(array))

[<NoEquality; NoComparison; JsonObject(ItemRequired=Required.Always)>]
type IndexEvent =
{ p: string // "{streamName}"
id: string // "{-1}"

w: int64 // 100: window size
/// last index/i value
m: int64 // {index}

(* "x": [
{ "i":0,
"c":"ISO 8601"
"e":[
[{"t":"added","d":"..."},{"t":"compacted/1","d":"..."}],
[{"t":"removed","d":"..."}],
]
}
] *)
x: JObject[][] }

(* Pseudocode:
function sync(p, expectedVersion, windowSize, events) {
if (i == 0) then {
coll.insert(p,0,{ p:p, id:-1, w:windowSize, m:flatLen(events)})
} else {
const i = doc.find(p=p && id=-1)
if(i.m <> expectedVersion) then emit from expectedVersion else
i.x.append(events)
for (var (i, c, e: [ {e1}, ...]) in events) {
coll.insert({p:p, id:i, i:i, c:c, e:e1)
}
// trim i.x to w total items in i.[e]
coll.update(p,id,i)
}
} *)
[<RequireQualifiedAccess>]
type Direction = Forward | Backward with
override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
Expand Down Expand Up @@ -289,7 +324,6 @@ module UnionEncoderAdapters =

type [<NoComparison>]Token = { pos: Store.Position; compactionEventNumber: int64 option }

[<CompilationRepresentation(CompilationRepresentationFlags.ModuleSuffix)>]
module Token =
let private create compactionEventNumber batchCapacityLimit pos : Storage.StreamToken =
{ value = box { pos = pos; compactionEventNumber = compactionEventNumber }; batchCapacityLimit = batchCapacityLimit }
Expand Down Expand Up @@ -385,26 +419,49 @@ type private Collection(gateway : EqxGateway, databaseId, collectionId) =
member __.Gateway = gateway
member __.CollectionUri = Client.UriFactory.CreateDocumentCollectionUri(databaseId, collectionId)

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?compactionStrategy) =
[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
| EventsAreState
| RollingSnapshots of eventType: string * compact: ('state -> 'event)

type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) =
/// Determines whether writing a Compaction event is warranted (based on the existing state and the current `Accumulated` changes)
member __.IsCompactionDue = eventsLen > capacityBeforeCompaction

type private Category<'event, 'state>(coll : Collection, codec : UnionCodec.IUnionEncoder<'event, byte[]>, ?access : AccessStrategy<'event,'state>) =
let (|Pos|) streamName : Store.Position = { collectionUri = coll.CollectionUri; streamName = streamName; index = None }
let compactionPredicate =
match access with
| None -> None
| Some AccessStrategy.EventsAreState -> Some (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> Some ((=) et)
let loadAlgorithm load (Pos pos) initial log =
let batched = load initial (coll.Gateway.LoadBatched log None pos)
let compacted predicate = load initial (coll.Gateway.LoadBackwardsStoppingAtCompactionEvent log predicate pos)
match compactionStrategy with
| Some predicate -> compacted predicate
match access with
| None -> batched
| Some AccessStrategy.EventsAreState -> compacted (fun _ -> true)
| Some (AccessStrategy.RollingSnapshots (et,_)) -> compacted ((=) et)
let load (fold: 'state -> 'event seq -> 'state) initial loadF = async {
let! token, events = loadF
return token, fold initial (UnionEncoderAdapters.decodeKnownEvents codec events) }
member __.Load (fold: 'state -> 'event seq -> 'state) (initial: 'state) streamName (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm (load fold) streamName initial log
member __.LoadFromToken (fold: 'state -> 'event seq -> 'state) (state: 'state) token (log : ILogger) : Async<Storage.StreamToken * 'state> =
(load fold) state (coll.Gateway.LoadFromToken log token compactionStrategy false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
(load fold) state (coll.Gateway.LoadFromToken log token compactionPredicate false)
member __.TrySync (fold: 'state -> 'event seq -> 'state) (log : ILogger)
(token : Storage.StreamToken, state : 'state)
(events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let events =
match access with
| None | Some AccessStrategy.EventsAreState -> events
| Some (AccessStrategy.RollingSnapshots (_,f)) ->
let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value)
if cc.IsCompactionDue then events @ [f state'] else events
let encodedEvents : Store.EventData[] = UnionEncoderAdapters.encodeEvents codec (Seq.ofList events)
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionStrategy
let! syncRes = coll.Gateway.TrySync log token encodedEvents compactionPredicate
match syncRes with
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionStrategy true))
| GatewaySyncResult.Conflict -> return Storage.SyncResult.Conflict (load fold state (coll.Gateway.LoadFromToken log token compactionPredicate true))
| GatewaySyncResult.Written token' -> return Storage.SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
Expand Down Expand Up @@ -449,8 +506,8 @@ module Caching =
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
interceptAsync (inner.Load streamName log) streamName
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) events
member __.TrySync streamName (log : ILogger) (token, state) (events : 'event list, state' : 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = inner.TrySync streamName log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict (interceptAsync resync streamName)
| Storage.SyncResult.Written (token', state') -> return Storage.SyncResult.Written (token', state') }
Expand Down Expand Up @@ -478,17 +535,12 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: '
interface ICategory<'event, 'state> with
member __.Load (streamName : string) (log : ILogger) : Async<Storage.StreamToken * 'state> =
loadAlgorithm streamName initial log
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) events
member __.TrySync _streamName(* TODO remove from main interface *) (log : ILogger) (token, state) (events : 'event list, state': 'state) : Async<Storage.SyncResult<'state>> = async {
let! syncRes = category.TrySync fold log (token, state) (events,state')
match syncRes with
| Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync
| Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') }

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CompactionStrategy =
| EventType of string
| Predicate of (string -> bool)

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
| SlidingWindow of Caching.Cache * window: TimeSpan
Expand All @@ -497,12 +549,7 @@ type CachingStrategy =

type EqxStreamBuilder<'event, 'state>(gateway : EqxGateway, codec, fold, initial, ?compaction, ?caching) =
member __.Create (databaseId, collectionId, streamName) : Equinox.IStream<'event, 'state> =
let compactionPredicateOption =
match compaction with
| None -> None
| Some (CompactionStrategy.Predicate predicate) -> Some predicate
| Some (CompactionStrategy.EventType eventType) -> Some (fun x -> x = eventType)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?compactionStrategy = compactionPredicateOption)
let category = Category<'event, 'state>(Collection(gateway, databaseId, collectionId), codec, ?access = compaction)

let readCacheOption =
match caching with
Expand Down
18 changes: 8 additions & 10 deletions tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,40 +11,38 @@ let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() =
Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)

module Cart =
let fold, initial = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial
let fold, initial, compact = Domain.Cart.Folds.fold, Domain.Cart.Folds.initial, Domain.Cart.Folds.compact
let codec = genCodec<Domain.Cart.Events.Event>()
let createServiceWithoutOptimization connection batchSize log =
let gateway = createEqxGateway connection batchSize
let resolveStream _ignoreCompactionEventTypeOption (StreamArgs args) =
let resolveStream (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCompaction connection batchSize log =
let gateway = createEqxGateway connection batchSize
let resolveStream compactionEventType (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, compaction=CompactionStrategy.EventType compactionEventType).Create(args)
let resolveStream (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCaching connection batchSize log cache =
let gateway = createEqxGateway connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream _ignorecompactionEventType (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, caching = sliding20m).Create(args)
Backend.Cart.Service(log, resolveStream)
let createServiceWithCompactionAndCaching connection batchSize log cache =
let gateway = createEqxGateway connection batchSize
let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolveStream cet (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, CompactionStrategy.EventType cet, sliding20m).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.RollingSnapshots compact, sliding20m).Create(args)
Backend.Cart.Service(log, resolveStream)

module ContactPreferences =
let fold, initial = Domain.ContactPreferences.Folds.fold, Domain.ContactPreferences.Folds.initial
let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let createServiceWithoutOptimization createGateway defaultBatchSize log _ignoreWindowSize _ignoreCompactionPredicate =
let gateway = createGateway defaultBatchSize
let resolveStream _windowSize _compactionPredicate (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
Backend.ContactPreferences.Service(log, resolveStream)
let createService createGateway log =
let resolveStream batchSize compactionPredicate (StreamArgs args) =
EqxStreamBuilder(createGateway batchSize, codec, fold, initial, CompactionStrategy.Predicate compactionPredicate).Create(args)
let resolveStream (StreamArgs args) = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(args)
Backend.ContactPreferences.Service(log, resolveStream)

#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
Expand Down

0 comments on commit 4728016

Please sign in to comment.