Skip to content

Commit

Permalink
Cosmos core events API (#49), remove rolling snapshots
Browse files Browse the repository at this point in the history
* Reorganize, adding Batch structure
* Rework stored procedure
* Remove rolling snapshots
* Add explicit non-indexed mode
  • Loading branch information
bartelink authored Nov 23, 2018
1 parent aece49d commit a9ff593
Show file tree
Hide file tree
Showing 19 changed files with 1,589 additions and 1,005 deletions.
95 changes: 59 additions & 36 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

open Argu
open Domain
open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.EventStore
open Infrastructure
open Serilog
Expand Down Expand Up @@ -119,7 +119,7 @@ module Cosmos =
let connect (log: ILogger) discovery operationTimeout (maxRetryForThrottling, maxRetryWaitTime) =
EqxConnector(log=log, requestTimeout=operationTimeout, maxRetryAttemptsOnThrottledRequests=maxRetryForThrottling, maxRetryWaitTimeInSeconds=maxRetryWaitTime)
.Connect("equinox-cli", discovery)
let createGateway connection batchSize = EqxGateway(connection, EqxBatchingPolicy(maxBatchSize = batchSize))
let createGateway connection maxItems = EqxGateway(connection, EqxBatchingPolicy(defaultMaxItems=maxItems))

[<RequireQualifiedAccess; NoEquality; NoComparison>]
type Store =
Expand All @@ -135,7 +135,7 @@ module Test =
clients.[clientIndex % clients.Length]
let selectClient = async { return async { return selectClient() } }
Local.runLoadTest log reportingIntervals testsPerSecond errorCutoff duration selectClient runSingleTest
let fold, initial, compact, index = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact, Domain.Favorites.Folds.index
let fold, initial, compact = Domain.Favorites.Folds.fold, Domain.Favorites.Folds.initial, Domain.Favorites.Folds.compact
let serializationSettings = Newtonsoft.Json.Converters.FSharp.Settings.CreateCorrect()
let genCodec<'Union when 'Union :> TypeShape.UnionContract.IUnionContract>() = Equinox.UnionCodec.JsonUtf8.Create<'Union>(serializationSettings)
let codec = genCodec<Domain.Favorites.Events.Event>()
Expand All @@ -147,22 +147,19 @@ module Test =
else None
let eqxCache =
if targs.Contains Cached then
let c = Equinox.Cosmos.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
let c = Equinox.Cosmos.Builder.Caching.Cache("Cli", sizeMb = 50)
Equinox.Cosmos.Builder.CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
let resolveStream streamName =
let resolveStream =
match store with
| Store.Mem store ->
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create(streamName)
Equinox.MemoryStore.MemoryStreamBuilder(store, fold, initial).Create
| Store.Es gateway ->
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create(streamName)
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact, ?caching = esCache).Create
| Store.Cosmos (gateway, databaseId, connectionId) ->
if targs.Contains Indexed then
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.IndexedSearch index, ?caching = cache)
.Create(databaseId, connectionId, streamName)
else
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact, ?caching = cache)
.Create(databaseId, connectionId, streamName)
let store = EqxStore(gateway, EqxCollections(databaseId, connectionId))
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection compact, ?caching = eqxCache).Create
else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
let sku = Guid.NewGuid() |> SkuId
Expand All @@ -172,34 +169,41 @@ module Test =

[<AutoOpen>]
module SerilogHelpers =
let (|CosmosReadRu|CosmosWriteRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds
let (|CosmosReadRu|CosmosWriteRu|CosmosResyncRu|CosmosSliceRu|) (evt : Equinox.Cosmos.Log.Event) =
match evt with
| Equinox.Cosmos.Log.Index { ru = ru }
| Equinox.Cosmos.Log.IndexNotFound { ru = ru }
| Equinox.Cosmos.Log.IndexNotModified { ru = ru }
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Forward,_, { ru = ru })
| Equinox.Cosmos.Log.Batch (Equinox.Cosmos.Direction.Backward,_, { ru = ru }) -> CosmosReadRu ru
| Equinox.Cosmos.Log.WriteSuccess {ru = ru }
| Equinox.Cosmos.Log.WriteConflict {ru = ru } -> CosmosWriteRu ru
| Equinox.Cosmos.Log.Index (Stats s)
| Equinox.Cosmos.Log.IndexNotFound (Stats s)
| Equinox.Cosmos.Log.IndexNotModified (Stats s)
| Equinox.Cosmos.Log.Batch (_,_, (Stats s)) -> CosmosReadRu s
| Equinox.Cosmos.Log.WriteSuccess (Stats s)
| Equinox.Cosmos.Log.WriteConflict (Stats s) -> CosmosWriteRu s
| Equinox.Cosmos.Log.WriteResync (Stats s) -> CosmosResyncRu s
// slices are rolled up into batches so be sure not to double-count
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Forward,{ ru = ru })
| Equinox.Cosmos.Log.Slice (Equinox.Cosmos.Direction.Backward,{ ru = ru }) -> CosmosSliceRu ru
| Equinox.Cosmos.Log.Slice (_,{ ru = ru }) -> CosmosSliceRu ru
let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
let (|CosmosMetric|_|) (logEvent : LogEvent) : Equinox.Cosmos.Log.Event option =
match logEvent.Properties.TryGetValue("cosmosEvt") with
| true, SerilogScalar (:? Equinox.Cosmos.Log.Event as e) -> Some e
| _ -> None
type RuCounter =
{ mutable rux100: int64; mutable count: int64; mutable ms: int64 }
static member Create() = { rux100 = 0L; count = 0L; ms = 0L }
member __.Ingest (ru, ms) =
Interlocked.Increment(&__.count) |> ignore
Interlocked.Add(&__.rux100, int64 (ru*100.)) |> ignore
Interlocked.Add(&__.ms, ms) |> ignore
type RuCounterSink() =
static let mutable readX10 = 0L
static let mutable writeX10 = 0L
static member Read = readX10 / 10L
static member Write = writeX10 / 10L
static member val Read = RuCounter.Create()
static member val Write = RuCounter.Create()
static member val Resync = RuCounter.Create()
interface Serilog.Core.ILogEventSink with
member __.Emit logEvent = logEvent |> function
| CosmosMetric (CosmosReadRu ru) -> Interlocked.Add(&readX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosWriteRu ru) -> Interlocked.Add(&writeX10, int64 (ru*10.)) |> ignore
| CosmosMetric (CosmosReadRu stats) -> RuCounterSink.Read.Ingest stats
| CosmosMetric (CosmosWriteRu stats) -> RuCounterSink.Write.Ingest stats
| CosmosMetric (CosmosResyncRu stats) -> RuCounterSink.Resync.Ingest stats
| _ -> ()

let createStoreLog verbose verboseConsole maybeSeqEndpoint =
Expand Down Expand Up @@ -260,7 +264,7 @@ let main argv =
let resultFile = createResultLog report
for r in results do
resultFile.Information("Aggregate: {aggregate}", r)
log.Information("Run completed; Current memory allocation: {bytes:n0}", GC.GetTotalMemory(true))
log.Information("Run completed; Current memory allocation: {bytes:n2}MB", (GC.GetTotalMemory(true) |> float) / 1024./1024.)
0

match args.GetSubCommand() with
Expand Down Expand Up @@ -309,15 +313,34 @@ let main argv =
match sargs.TryGetSubCommand() with
| Some (Provision args) ->
let rus = args.GetResult(Rus)
log.Information("Configuring CosmosDb with Request Units (RU) Provision: {rus:n0}", rus)
Equinox.Cosmos.Initialization.initialize conn.Client dbName collName rus |> Async.RunSynchronously
log.Information("Configuring CosmosDb Collection with Throughput Provision: {rus:n0} RU/s", rus)
Equinox.Cosmos.Sync.Initialization.initialize log conn.Client dbName collName rus |> Async.RunSynchronously
0
| Some (Run targs) ->
let conn = Store.Cosmos (Cosmos.createGateway conn defaultBatchSize, dbName, collName)
let res = runTest log conn targs
let read, write = RuCounterSink.Read, RuCounterSink.Write
let total = read+write
log.Information("Total Request Charges sustained in test: {totalRus:n0} (R:{readRus:n0}, W:{writeRus:n0})", total, read, write)
let stats =
[ "Read", RuCounterSink.Read
"Write", RuCounterSink.Write
"Resync", RuCounterSink.Resync ]
let mutable totalCount, totalRc, totalMs = 0L, 0., 0L
let logActivity name count rc lat =
log.Information("{name}: {count:n0} requests costing {ru:n0} RU (average: {avg:n2}); Average latency: {lat:n0}ms",
name, count, rc, (if count = 0L then Double.NaN else rc/float count), (if count = 0L then Double.NaN else float lat/float count))
for name, stat in stats do
let ru = float stat.rux100 / 100.
totalCount <- totalCount + stat.count
totalRc <- totalRc + ru
totalMs <- totalMs + stat.ms
logActivity name stat.count ru stat.ms
logActivity "TOTAL" totalCount totalRc totalMs
let measures : (string * (TimeSpan -> float)) list =
[ "s", fun x -> x.TotalSeconds
"m", fun x -> x.TotalMinutes
"h", fun x -> x.TotalHours ]
let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru)
let duration = targs.GetResult(DurationM,1.) |> TimeSpan.FromMinutes
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)
res
| _ -> failwith "init or run is required"
| _ -> failwith "ERROR: please specify memory, es or cosmos Store"
Expand Down
3 changes: 1 addition & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Folds =
let toSnapshot (s: State) : Events.Compaction.State =
{ items = [| for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } |] }
let ofCompacted (s: Events.Compaction.State) : State =
{ items = [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
{ items = if s.items = null then [] else [ for i in s.items -> { skuId = i.skuId; quantity = i.quantity; returnsWaived = i.returnsWaived } ] }
let initial = { items = [] }
let evolve (state : State) event =
let updateItems f = { state with items = f state.items }
Expand All @@ -43,7 +43,6 @@ module Folds =
| Events.ItemWaiveReturnsChanged e -> updateItems (List.map (function i when i.skuId = e.skuId -> { i with returnsWaived = e.waived } | i -> i))
let fold state = Seq.fold evolve state
let compact = Events.Compaction.EventType, fun state -> Events.Compacted (State.toSnapshot state)
let index = (fun et -> et = Events.Compaction.EventType), fun state -> seq [ yield Events.Compacted (State.toSnapshot state) ]
type Context = { time: System.DateTime; requestId : RequestId }
type Command =
| AddItem of Context * SkuId * quantity: int
Expand Down
4 changes: 3 additions & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Events =
type Preferences = { manyPromotions : bool; littlePromotions : bool; productReview : bool; quickSurveys : bool }
type Value = { email : string; preferences : Preferences }

let [<Literal>] EventTypeName = "contactPreferencesChanged"
type Event =
| [<System.Runtime.Serialization.DataMember(Name = "contactPreferencesChanged")>]Updated of Value
| [<System.Runtime.Serialization.DataMember(Name = EventTypeName)>]Updated of Value
interface TypeShape.UnionContract.IUnionContract
let eventTypeNames = System.Collections.Generic.HashSet<string>([EventTypeName])

module Folds =
type State = Events.Preferences
Expand Down
1 change: 0 additions & 1 deletion samples/Store/Domain/Favorites.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ module Folds =
for e in events do evolve s e
s.AsState()
let compact = Events.Compaction.EventType, fun state -> Events.Compacted { net = state }
let index = (fun x -> x = Events.Compaction.EventType), fun state -> seq [ Events.Compacted { net = state } ]

type Command =
| Favorite of date : System.DateTimeOffset * skuIds : SkuId list
Expand Down
32 changes: 16 additions & 16 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.CartIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -17,15 +17,15 @@ let createServiceMem log store =

let codec = Equinox.EventStore.Integration.EventStoreIntegration.genCodec<Domain.Cart.Events.Event>()

let resolveGesStreamWithCompactionEventType gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create(streamName)
let resolveGesStreamWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway, codec, fold, initial).Create(streamName)
let resolveGesStreamWithRollingSnapshots gateway =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots compact).Create
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let resolveEqxStreamWithCompactionEventType gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial, Equinox.Cosmos.AccessStrategy.RollingSnapshots compact).Create(args)
let resolveEqxStreamWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway, codec, fold, initial).Create(args)
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection compact).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.FlowAsync(cartId, fun _ctx execute ->
Expand Down Expand Up @@ -60,24 +60,24 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCompactionSemantics
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_EVENTSTORE")>]
let ``Can roundtrip against EventStore, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithCompactionEventType
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithoutCompactionSemantics
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithoutCustomAccessStrategy
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveEqxStreamWithCompactionEventType
let ``Can roundtrip against Cosmos, correctly folding the events with With Projection`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveEqxStreamWithProjection
do! act service args
}
22 changes: 11 additions & 11 deletions samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module Samples.Store.Integration.ContactPreferencesIntegration

open Equinox.Cosmos
open Equinox.Cosmos.Builder
open Equinox.Cosmos.Integration
open Equinox.EventStore
open Equinox.MemoryStore
Expand All @@ -16,15 +16,15 @@ let createServiceMem log store =
Backend.ContactPreferences.Service(log, MemoryStreamBuilder(store, fold, initial).Create)

let codec = genCodec<Domain.ContactPreferences.Events.Event>()
let resolveStreamGesWithCompactionSemantics gateway streamName =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create(streamName)
let resolveStreamGesWithoutCompactionSemantics gateway streamName =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(streamName)
let resolveStreamGesWithCompactionSemantics gateway =
GesStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.EventsAreState).Create
let resolveStreamGesWithoutCompactionSemantics gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway 1, codec, fold, initial, Equinox.Cosmos.AccessStrategy.EventsAreState).Create(args)
let resolveStreamEqxWithoutCompactionSemantics gateway (StreamArgs args) =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create(args)
let resolveStreamEqxWithCompactionSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType Domain.ContactPreferences.Events.eventTypeNames).Create
let resolveStreamEqxWithoutCompactionSemantics gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
Expand Down Expand Up @@ -63,12 +63,12 @@ type Tests(testOutputHelper) =

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithoutCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithoutCompactionSemantics
do! act service args
}

[<AutoData(SkipIfRequestedViaEnvironmentVariable="EQUINOX_INTEGRATION_SKIP_COSMOS")>]
let ``Can roundtrip against Cosmos, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxGateway resolveStreamEqxWithCompactionSemantics
let! service = arrange connectToSpecifiedCosmosOrSimulator createEqxStore resolveStreamEqxWithCompactionSemantics
do! act service args
}
Loading

0 comments on commit a9ff593

Please sign in to comment.