Skip to content

Commit

Permalink
Clarify projections/unfolding (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Nov 27, 2018
1 parent 05c8342 commit d33749d
Show file tree
Hide file tree
Showing 10 changed files with 246 additions and 276 deletions.
21 changes: 10 additions & 11 deletions cli/Equinox.Cli/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ module Test =
GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create
| Store.Cosmos (gateway, databaseId, connectionId) ->
let store = EqxStore(gateway, EqxCollections(databaseId, connectionId))
let projection = "Compacted",snd snapshot
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection projection, ?caching = eqxCache).Create
if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Snapshot snapshot, ?caching = eqxCache).Create
else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create
Backend.Favorites.Service(log, resolveStream)
let runFavoriteTest (service : Backend.Favorites.Service) clientId = async {
Expand All @@ -172,16 +171,16 @@ module Test =
module SerilogHelpers =
let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds
open Equinox.Cosmos
let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosSliceRc|) = function
| Log.Index (Stats s)
| Log.IndexNotFound (Stats s)
| Log.IndexNotModified (Stats s)
| Log.Batch (_,_, (Stats s)) -> CosmosReadRc s
| Log.WriteSuccess (Stats s)
| Log.WriteConflict (Stats s) -> CosmosWriteRc s
| Log.WriteResync (Stats s) -> CosmosResyncRc s
let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function
| Log.Tip (Stats s)
| Log.TipNotFound (Stats s)
| Log.TipNotModified (Stats s)
| Log.Query (_,_, (Stats s)) -> CosmosReadRc s
// slices are rolled up into batches so be sure not to double-count
| Log.Slice (_,(Stats s)) -> CosmosSliceRc s
| Log.Response (_,(Stats s)) -> CosmosResponseRc s
| Log.SyncSuccess (Stats s)
| Log.SyncConflict (Stats s) -> CosmosWriteRc s
| Log.SyncResync (Stats s) -> CosmosResyncRc s
let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function
| (:? ScalarValue as x) -> Some x.Value
| _ -> None
Expand Down
3 changes: 1 addition & 2 deletions samples/Store/Integration/CartIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ let resolveGesStreamWithRollingSnapshots gateway =
let resolveGesStreamWithoutCustomAccessStrategy gateway =
GesStreamBuilder(gateway, codec, fold, initial).Create

let projection = "Compacted",snd snapshot
let resolveEqxStreamWithProjection gateway =
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection projection).Create
EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Create
let resolveEqxStreamWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway, codec, fold, initial).Create

Expand Down
2 changes: 1 addition & 1 deletion samples/Store/Integration/ContactPreferencesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ let resolveStreamGesWithoutAccessStrategy gateway =
GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

let resolveStreamEqxWithKnownEventTypeSemantics gateway =
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType (System.Collections.Generic.HashSet ["contactPreferencesChanged"])).Create
EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Create
let resolveStreamEqxWithoutCustomAccessStrategy gateway =
EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create

Expand Down
3 changes: 1 addition & 2 deletions samples/Store/Integration/FavoritesIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ let createServiceGes gateway log =
Backend.Favorites.Service(log, resolveStream)

let createServiceEqx gateway log =
let projection = "Compacted",snd snapshot
let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection projection).Create
let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Create
Backend.Favorites.Service(log, resolveStream)

type Tests(testOutputHelper) =
Expand Down
26 changes: 13 additions & 13 deletions samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ module EquinoxEsInterop =
module EquinoxCosmosInterop =
open Equinox.Cosmos
[<NoEquality; NoComparison>]
type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; batches: int option; ru: float } with
type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with
override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
let flatten (evt : Log.Event) : FlatMetric =
let action, metric, batches, ru =
match evt with
| Log.WriteSuccess m -> "EqxAppendToStreamAsync", m, None, m.ru
| Log.WriteConflict m -> "EqxAppendToStreamConflictAsync", m, None, m.ru
| Log.WriteResync m -> "EqxAppendToStreamResyncAsync", m, None, m.ru
| Log.Slice (Direction.Forward,m) -> "EqxReadStreamEventsForwardAsync", m, None, m.ru
| Log.Slice (Direction.Backward,m) -> "EqxReadStreamEventsBackwardAsync", m, None, m.ru
| Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru
| Log.Batch (Direction.Backward,c,m) -> "EqxLoadB", m, Some c, m.ru
| Log.Index m -> "EqxLoadI", m, None, m.ru
| Log.IndexNotFound m -> "EqxLoadI404", m, None, m.ru
| Log.IndexNotModified m -> "EqxLoadI302", m, None, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; batches = batches
| Log.Tip m -> "CosmosTip", m, None, m.ru
| Log.TipNotFound m -> "CosmosTip404", m, None, m.ru
| Log.TipNotModified m -> "CosmosTip302", m, None, m.ru
| Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru
| Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru
| Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru
| Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru
| Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru
| Log.SyncConflict m -> "CosmosSync409", m, None, m.ru
| Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru
{ action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches
interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru }

type SerilogMetricsExtractor(emit : string -> unit) =
Expand Down Expand Up @@ -127,5 +127,5 @@ type Tests() =
let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway)
let itemCount = batchSize / 2 + 1
let cartId = Guid.NewGuid() |> CartId
do! act buffer service itemCount context cartId skuId "Eqx Index " // one is a 404, one is a 200
do! act buffer service itemCount context cartId skuId "EqxCosmos Tip " // one is a 404, one is a 200
}
Loading

0 comments on commit d33749d

Please sign in to comment.