diff --git a/cli/Equinox.Cli/Program.fs b/cli/Equinox.Cli/Program.fs index 579a4116d..5ad3884c3 100644 --- a/cli/Equinox.Cli/Program.fs +++ b/cli/Equinox.Cli/Program.fs @@ -158,8 +158,7 @@ module Test = GesStreamBuilder(gateway, codec, fold, initial, Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot, ?caching = esCache).Create | Store.Cosmos (gateway, databaseId, connectionId) -> let store = EqxStore(gateway, EqxCollections(databaseId, connectionId)) - let projection = "Compacted",snd snapshot - if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection projection, ?caching = eqxCache).Create + if targs.Contains Indexed then EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Snapshot snapshot, ?caching = eqxCache).Create else EqxStreamBuilder(store, codec, fold, initial, ?access=None, ?caching = eqxCache).Create Backend.Favorites.Service(log, resolveStream) let runFavoriteTest (service : Backend.Favorites.Service) clientId = async { @@ -172,16 +171,16 @@ module Test = module SerilogHelpers = let inline (|Stats|) ({ interval = i; ru = ru }: Equinox.Cosmos.Log.Measurement) = ru, let e = i.Elapsed in int64 e.TotalMilliseconds open Equinox.Cosmos - let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosSliceRc|) = function - | Log.Index (Stats s) - | Log.IndexNotFound (Stats s) - | Log.IndexNotModified (Stats s) - | Log.Batch (_,_, (Stats s)) -> CosmosReadRc s - | Log.WriteSuccess (Stats s) - | Log.WriteConflict (Stats s) -> CosmosWriteRc s - | Log.WriteResync (Stats s) -> CosmosResyncRc s + let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function + | Log.Tip (Stats s) + | Log.TipNotFound (Stats s) + | Log.TipNotModified (Stats s) + | Log.Query (_,_, (Stats s)) -> CosmosReadRc s // slices are rolled up into batches so be sure not to double-count - | Log.Slice (_,(Stats s)) -> CosmosSliceRc s + | Log.Response (_,(Stats s)) -> CosmosResponseRc s + | Log.SyncSuccess (Stats s) + | Log.SyncConflict (Stats s) -> CosmosWriteRc s + | Log.SyncResync (Stats s) -> CosmosResyncRc s let (|SerilogScalar|_|) : Serilog.Events.LogEventPropertyValue -> obj option = function | (:? ScalarValue as x) -> Some x.Value | _ -> None diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index 2e8751a53..7124c4e86 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -22,9 +22,8 @@ let resolveGesStreamWithRollingSnapshots gateway = let resolveGesStreamWithoutCustomAccessStrategy gateway = GesStreamBuilder(gateway, codec, fold, initial).Create -let projection = "Compacted",snd snapshot let resolveEqxStreamWithProjection gateway = - EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection projection).Create + EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Create let resolveEqxStreamWithoutCustomAccessStrategy gateway = EqxStreamBuilder(gateway, codec, fold, initial).Create diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index dad7541f4..39468b41f 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -22,7 +22,7 @@ let resolveStreamGesWithoutAccessStrategy gateway = GesStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create let resolveStreamEqxWithKnownEventTypeSemantics gateway = - EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType (System.Collections.Generic.HashSet ["contactPreferencesChanged"])).Create + EqxStreamBuilder(gateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Create let resolveStreamEqxWithoutCustomAccessStrategy gateway = EqxStreamBuilder(gateway defaultBatchSize, codec, fold, initial).Create diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs index 8cb90299e..e49b82596 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -21,8 +21,7 @@ let createServiceGes gateway log = Backend.Favorites.Service(log, resolveStream) let createServiceEqx gateway log = - let projection = "Compacted",snd snapshot - let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Projection projection).Create + let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial, AccessStrategy.Snapshot snapshot).Create Backend.Favorites.Service(log, resolveStream) type Tests(testOutputHelper) = diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 832172573..57445d470 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -25,22 +25,22 @@ module EquinoxEsInterop = module EquinoxCosmosInterop = open Equinox.Cosmos [] - type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; batches: int option; ru: float } with + type FlatMetric = { action: string; stream: string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru let flatten (evt : Log.Event) : FlatMetric = let action, metric, batches, ru = match evt with - | Log.WriteSuccess m -> "EqxAppendToStreamAsync", m, None, m.ru - | Log.WriteConflict m -> "EqxAppendToStreamConflictAsync", m, None, m.ru - | Log.WriteResync m -> "EqxAppendToStreamResyncAsync", m, None, m.ru - | Log.Slice (Direction.Forward,m) -> "EqxReadStreamEventsForwardAsync", m, None, m.ru - | Log.Slice (Direction.Backward,m) -> "EqxReadStreamEventsBackwardAsync", m, None, m.ru - | Log.Batch (Direction.Forward,c,m) -> "EqxLoadF", m, Some c, m.ru - | Log.Batch (Direction.Backward,c,m) -> "EqxLoadB", m, Some c, m.ru - | Log.Index m -> "EqxLoadI", m, None, m.ru - | Log.IndexNotFound m -> "EqxLoadI404", m, None, m.ru - | Log.IndexNotModified m -> "EqxLoadI302", m, None, m.ru - { action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; batches = batches + | Log.Tip m -> "CosmosTip", m, None, m.ru + | Log.TipNotFound m -> "CosmosTip404", m, None, m.ru + | Log.TipNotModified m -> "CosmosTip302", m, None, m.ru + | Log.Query (Direction.Forward,c,m) -> "CosmosQueryF", m, Some c, m.ru + | Log.Query (Direction.Backward,c,m) -> "CosmosQueryB", m, Some c, m.ru + | Log.Response (Direction.Forward,m) -> "CosmosResponseF", m, None, m.ru + | Log.Response (Direction.Backward,m) -> "CosmosResponseB", m, None, m.ru + | Log.SyncSuccess m -> "CosmosSync200", m, None, m.ru + | Log.SyncConflict m -> "CosmosSync409", m, None, m.ru + | Log.SyncResync m -> "CosmosSyncResync", m, None, m.ru + { action = action; stream = metric.stream; bytes = metric.bytes; count = metric.count; responses = batches interval = StopwatchInterval(metric.interval.StartTicks,metric.interval.EndTicks); ru = ru } type SerilogMetricsExtractor(emit : string -> unit) = @@ -127,5 +127,5 @@ type Tests() = let service = Backend.Cart.Service(log, CartIntegration.resolveEqxStreamWithProjection gateway) let itemCount = batchSize / 2 + 1 let cartId = Guid.NewGuid() |> CartId - do! act buffer service itemCount context cartId skuId "Eqx Index " // one is a 404, one is a 200 + do! act buffer service itemCount context cartId skuId "EqxCosmos Tip " // one is a 404, one is a 200 } \ No newline at end of file diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 592db84d9..d29e7966b 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -1,6 +1,6 @@ namespace Equinox.Cosmos.Events -/// Common form for either a raw Event or a Projection +/// Common form for either a Domain Event or an Unfolded Event type IEvent = /// The Event Type, used to drive deserialization abstract member EventType : string @@ -9,13 +9,13 @@ type IEvent = /// Optional metadata (null, or same as d, not written if missing) abstract member Meta : byte[] -/// Represents an Event or Projection and its relative position in the event sequence +/// Represents a Domain Event or Unfold, together with it's Index in the event sequence type IIndexedEvent = inherit IEvent /// The index into the event sequence of this event abstract member Index : int64 - /// Indicates whether this is a primary event or a projection based on the events <= up to `Index` - abstract member IsProjection: bool + /// Indicates whether this is a Domain Event or an Unfolded Event based on the state inferred from the events up to `Index` + abstract member IsUnfold: bool /// Position and Etag to which an operation is relative type [] Position = { index: int64; etag: string option } with @@ -35,7 +35,7 @@ namespace Equinox.Cosmos.Store open Equinox.Cosmos.Events open Newtonsoft.Json -/// A 'normal' (frozen, not Pending) Batch of Events, without any Projections +/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds) type [] Event = { /// DocDb-mandated Partition Key, must be maintained within the document @@ -56,10 +56,10 @@ type [] /// Same as `id`; necessitated by fact that it's not presently possible to do an ORDER BY on the row key i: int64 // {index} - /// Creation date (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) + /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.) c: System.DateTimeOffset // ISO 8601 - /// The Event Type, used to drive deserialization + /// The Case (Event Type), used to drive deserialization t: string // required /// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for DocDb @@ -77,19 +77,18 @@ type [] static member IndexedFields = [Event.PartitionKeyField; "i"] /// If we encounter a -1 doc, we're interested in its etag so we can re-read for one RU member x.TryToPosition() = - if x.id <> WipBatch.WellKnownDocumentId then None + if x.id <> Tip.WellKnownDocumentId then None else Some { index = (let ``x.e.LongLength`` = 1L in x.i+``x.e.LongLength``); etag = match x._etag with null -> None | x -> Some x } - /// The Special 'Pending' Batch Format /// NB this Type does double duty as /// a) transport for when we read it -/// b) a way of encoding a batch that the stored procedure will write in to the actual document +/// b) a way of encoding a batch that the stored procedure will write in to the actual document (`i` is -1 until Stored Proc computes it) /// The stored representation has the following differences vs a 'normal' (frozen/completed) Batch /// a) `id` and `i` = `-1` as WIP document currently always is /// b) events are retained as in an `e` array, not top level fields -/// c) contains projections (`c`) +/// c) contains unfolds (`c`) and [] - WipBatch = + Tip = { /// Partition key, as per Batch p: string // "{streamName}" /// Document Id within partition, as per Batch @@ -107,8 +106,8 @@ and [] /// Events e: BatchEvent[] - /// Projections - c: Projection[] } + /// Compaction/Snapshot/Projection events + c: Unfold[] } /// arguably this should be a high nember to reflect fact it is the freshest ? static member WellKnownDocumentId = "-1" /// Create Position from [Wip]Batch record context (facilitating 1 RU reads) @@ -130,15 +129,12 @@ and [] [)>] [] m: byte[] } // optional -/// Projection based on the state at a given point in time `i` -and Projection = - { /// Base: Max index rolled into this projection +/// Compaction/Snaphot/Projection Event based on the state at a given point in time `i` +and Unfold = + { /// Base: Stream Position (Version) of State from which this Unfold Event was generated i: int64 - ///// Indicates whether this is actually an event being retained to support a lagging projection - //x: bool - - /// The Event Type of this compaction/snapshot, used to drive deserialization + /// The Case (Event Type) of this compaction/snapshot, used to drive deserialization t: string // required /// Event body - Json -> UTF-8 -> Deflate -> Base64 @@ -151,39 +147,39 @@ and Projection = m: byte[] } // optional type Enum() = - static member Events (b:WipBatch) = + static member Events(b: Tip) = b.e |> Seq.mapi (fun offset x -> - { new IIndexedEvent with - member __.Index = b._i + int64 offset - member __.IsProjection = false - member __.EventType = x.t - member __.Data = x.d - member __.Meta = x.m }) - static member Events (i: int64, e:BatchEvent[]) = + { new IIndexedEvent with + member __.Index = b._i + int64 offset + member __.IsUnfold = false + member __.EventType = x.t + member __.Data = x.d + member __.Meta = x.m }) + static member Events(i: int64, e: BatchEvent[]) = e |> Seq.mapi (fun offset x -> { new IIndexedEvent with member __.Index = i + int64 offset - member __.IsProjection = false + member __.IsUnfold = false member __.EventType = x.t member __.Data = x.d member __.Meta = x.m }) - static member Event (x:Event) = + static member Event(x: Event) = Seq.singleton { new IIndexedEvent with member __.Index = x.i - member __.IsProjection = false + member __.IsUnfold = false member __.EventType = x.t member __.Data = x.d member __.Meta = x.m } - static member Projections (xs: Projection[]) = seq { + static member Unfolds(xs: Unfold[]) = seq { for x in xs -> { new IIndexedEvent with member __.Index = x.i - member __.IsProjection = true + member __.IsUnfold = true member __.EventType = x.t member __.Data = x.d member __.Meta = x.m } } - static member EventsAndProjections (x:WipBatch): IIndexedEvent seq = - Enum.Projections x.c + static member EventsAndUnfolds (x:Tip): IIndexedEvent seq = + Enum.Unfolds x.c /// Reference to Collection and name that will be used as the location for the stream type [] CollectionStream = { collectionUri: System.Uri; name: string } with @@ -211,25 +207,25 @@ module Log = type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int; ru: float } [] type Event = - | WriteSuccess of Measurement - | WriteResync of Measurement - | WriteConflict of Measurement + /// Individual read request for the Tip + | Tip of Measurement + /// Individual read request for the Tip, not found + | TipNotFound of Measurement + /// Tip read with Single RU Request Charge due to correct use of etag in cache + | TipNotModified of Measurement + /// Summarizes a set of Responses for a given Read request + | Query of Direction * responses: int * Measurement /// Individual read request in a Batch - | Slice of Direction * Measurement - /// Individual read request for the Index - | Index of Measurement - /// Individual read request for the Index, not found - | IndexNotFound of Measurement - /// Index read with Single RU Request Charge due to correct use of etag in cache - | IndexNotModified of Measurement - /// Summarizes a set of Slices read together - | Batch of Direction * slices: int * Measurement + | Response of Direction * Measurement + | SyncSuccess of Measurement + | SyncResync of Measurement + | SyncConflict of Measurement let prop name value (log : ILogger) = log.ForContext(name, value) let propData name (events: #IEvent seq) (log : ILogger) = let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (System.Text.Encoding.UTF8.GetString e.Data) } log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) let propEvents = propData "events" - let propDataProjections = Enum.Projections >> propData "projections" + let propDataUnfolds = Enum.Unfolds >> propData "unfolds" let withLoggedRetries<'t> (retryPolicy: IRetryPolicy option) (contextLabel : string) (f : ILogger -> Async<'t>) log: Async<'t> = match retryPolicy with @@ -258,7 +254,6 @@ module private DocDb = | :? AggregateException as agg when agg.InnerExceptions.Count = 1 -> aux agg.InnerExceptions.[0] | _ -> e - aux exn /// DocumentDB Error HttpStatusCode extractor let (|DocDbException|_|) (e : exn) = @@ -393,7 +388,7 @@ function sync(req, expectedVersion) { | Conflict of Position * events: IIndexedEvent[] | ConflictUnknown of Position - let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: WipBatch) + let private run (client: IDocumentClient) (stream: CollectionStream) (expectedVersion: int64 option, req: Tip) : Async = async { let sprocLink = sprintf "%O/sprocs/%s" stream.collectionUri sprocName let opts = Client.RequestOptions(PartitionKey=PartitionKey(stream.name)) @@ -409,41 +404,41 @@ function sync(req, expectedVersion) { | [||] -> Result.ConflictUnknown newPos | xs -> Result.Conflict (newPos, Enum.Events (ev.index, xs) |> Array.ofSeq) } - let private logged client (stream: CollectionStream) (expectedVersion, req: WipBatch) (log : ILogger) + let private logged client (stream: CollectionStream) (expectedVersion, req: Tip) (log : ILogger) : Async = async { let verbose = log.IsEnabled Events.LogEventLevel.Debug - let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataProjections req.c else log + let log = if verbose then log |> Log.propEvents (Enum.Events req) |> Log.propDataUnfolds req.c else log let (Log.BatchLen bytes), count = Enum.Events req, req.e.Length let log = log |> Log.prop "bytes" bytes let writeLog = log |> Log.prop "stream" stream.name |> Log.prop "expectedVersion" expectedVersion - |> Log.prop "count" req.e.Length |> Log.prop "pcount" req.c.Length - let! t, (ru,result) = run client stream (expectedVersion, req) |> Stopwatch.Time + |> Log.prop "count" req.e.Length |> Log.prop "ucount" req.c.Length + let! t, (ru,result) = run client stream (expectedVersion,req) |> Stopwatch.Time let resultLog = let mkMetric ru : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } - let logConflict () = writeLog.Information("Eqx Sync Conflict writing {eventTypes}", [| for x in req.e -> x.t |]) + let logConflict () = writeLog.Information("EqxCosmos Sync: Conflict writing {eventTypes}", [| for x in req.e -> x.t |]) match result with | Result.Written pos -> - log |> Log.event (Log.WriteSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos + log |> Log.event (Log.SyncSuccess (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos | Result.ConflictUnknown pos -> logConflict () - log |> Log.event (Log.WriteConflict (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos |> Log.prop "conflict" true + log |> Log.event (Log.SyncConflict (mkMetric ru)) |> Log.prop "nextExpectedVersion" pos |> Log.prop "conflict" true | Result.Conflict (pos, xs) -> logConflict () let log = if verbose then log |> Log.prop "nextExpectedVersion" pos |> Log.propData "conflicts" xs else log - log |> Log.event (Log.WriteResync(mkMetric ru)) |> Log.prop "conflict" true - resultLog.Information("Eqx {action:l} {count}+{pcount} {ms}ms rc={ru}", "Write", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru) + log |> Log.event (Log.SyncResync(mkMetric ru)) |> Log.prop "conflict" true + resultLog.Information("EqxCosmos {action:l} {count}+{ucount} {ms}ms rc={ru}", "Sync", req.e.Length, req.c.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru) return result } let batch (log : ILogger) retryPolicy client pk batch: Async = let call = logged client pk batch Log.withLoggedRetries retryPolicy "writeAttempt" call log - let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) projections: WipBatch = - { p = stream.name; id = Store.WipBatch.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null + let mkBatch (stream: Store.CollectionStream) (events: IEvent[]) unfolds: Tip = + { p = stream.name; id = Store.Tip.WellKnownDocumentId; _i = -1L(*Server-managed*); _etag = null e = [| for e in events -> { c = DateTimeOffset.UtcNow; t = e.EventType; d = e.Data; m = e.Meta } |] - c = Array.ofSeq projections } - let mkProjections baseIndex (projectionEvents: IEvent seq) : Store.Projection seq = - projectionEvents |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; t = x.EventType; d = x.Data; m = x.Meta } : Store.Projection) + c = Array.ofSeq unfolds } + let mkUnfold baseIndex (unfolds: IEvent seq) : Store.Unfold seq = + unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; t = x.EventType; d = x.Data; m = x.Meta } : Store.Unfold) module Initialization = open System.Collections.ObjectModel @@ -481,27 +476,27 @@ function sync(req, expectedVersion) { //let! _aux = createAux client dbUri collName auxRu return! createProc log client collUri } -module private Index = +module private Tip = let private get (client: IDocumentClient) (stream: CollectionStream, maybePos: Position option) = let coll = DocDbCollection(client, stream.collectionUri) let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null let ro = Client.RequestOptions(PartitionKey=PartitionKey(stream.name), AccessCondition = ac) - coll.TryReadDocument(WipBatch.WellKnownDocumentId, ro) + coll.TryReadDocument(Tip.WellKnownDocumentId, ro) let private loggedGet (get : CollectionStream * Position option -> Async<_>) (stream: CollectionStream, maybePos: Position option) (log: ILogger) = async { let log = log |> Log.prop "stream" stream.name - let! t, (ru, res : ReadResult) = get (stream,maybePos) |> Stopwatch.Time + let! t, (ru, res : ReadResult) = get (stream,maybePos) |> Stopwatch.Time let log count bytes (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru }) match res with | ReadResult.NotModified -> - (log 0 0 Log.IndexNotModified).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru) + (log 0 0 Log.TipNotModified).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 302, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.NotFound -> - (log 0 0 Log.IndexNotFound).Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) + (log 0 0 Log.TipNotFound).Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 404, (let e = t.Elapsed in e.TotalMilliseconds), ru) | ReadResult.Found doc -> let log = - let (Log.BatchLen bytes), count = Enum.Projections doc.c, doc.c.Length - log bytes count Log.Index - let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataProjections doc.c |> Log.prop "etag" doc._etag - log.Information("Eqx {action:l} {res} {ms}ms rc={ru}", "Index", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) + let (Log.BatchLen bytes), count = Enum.Unfolds doc.c, doc.c.Length + log bytes count Log.Tip + let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propDataUnfolds doc.c |> Log.prop "etag" doc._etag + log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) return ru, res } type [] Result = NotModified | NotFound | Found of Position * IIndexedEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.UnChanged @@ -511,7 +506,7 @@ module private Index = match res with | ReadResult.NotModified -> return Result.NotModified | ReadResult.NotFound -> return Result.NotFound - | ReadResult.Found doc -> return Result.Found (doc.ToPosition(), Enum.EventsAndProjections doc |> Array.ofSeq) } + | ReadResult.Found doc -> return Result.Found (doc.ToPosition(), Enum.EventsAndUnfolds doc |> Array.ofSeq) } module private Query = open Microsoft.Azure.Documents.Linq @@ -525,8 +520,8 @@ module private Index = let feedOptions = new Client.FeedOptions(PartitionKey=PartitionKey(stream.name), MaxItemCount=Nullable maxItems) client.CreateDocumentQuery(stream.collectionUri, querySpec, feedOptions).AsDocumentQuery() - // Unrolls the Batches in a response - note when reading backawards, the events are emitted in reverse order of index - let private handleSlice direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery) (log: ILogger) + // Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index + let private handleResponse direction (stream: CollectionStream) (startPos: Position option) (query: IDocumentQuery) (log: ILogger) : Async = async { let! ct = Async.CancellationToken let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time @@ -535,16 +530,16 @@ module private Index = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = stream.name; interval = t; bytes = bytes; count = count; ru = ru } // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if batches.Length = 0 && count = 0 && ru = 0. then log else let evt = Log.Slice (direction, reqMetric) in log |> Log.event evt + let log = if batches.Length = 0 && count = 0 && ru = 0. then log else let evt = Log.Response (direction, reqMetric) in log |> Log.event evt let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i }) (log |> Log.prop "startIndex" (match startPos with Some { index = i } -> Nullable i | _ -> Nullable()) |> Log.prop "bytes" bytes) - .Information("Eqx {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}", - "Query", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) + .Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}", + "Response", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru) let maybePosition = batches |> Array.tryPick (fun x -> x.TryToPosition()) return events, maybePosition, ru } - let private runQuery (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) + let private run (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async) (maxPermittedBatchReads: int option) (query: IDocumentQuery) : AsyncSeq = @@ -560,14 +555,15 @@ module private Index = yield! loop (batchCount + 1) } loop 0 - let private logBatchRead direction batchSize streamName interval (responsesCount, events : IIndexedEvent []) nextI (ru: float) (log : ILogger) = + let private logQuery direction batchSize streamName interval (responsesCount, events : IIndexedEvent []) nextI (ru: float) (log : ILogger) = let (Log.BatchLen bytes), count = events, events.Length let reqMetric : Log.Measurement = { stream = streamName; interval = interval; bytes = bytes; count = count; ru = ru } - let action = match direction with Direction.Forward -> "LoadF" | Direction.Backward -> "LoadB" + let action = match direction with Direction.Forward -> "QueryF" | Direction.Backward -> "QueryB" // TODO investigate whether there is a way to avoid the potential cost (or whether there is significance to it) of these null responses - let log = if count = 0 && ru = 0. then log else let evt = Log.Event.Batch (direction, responsesCount, reqMetric) in log |> Log.event evt - (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize).Information( - "Eqx {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", + let log = if count = 0 && ru = 0. then log else let evt = Log.Event.Query (direction, responsesCount, reqMetric) in log |> Log.event evt + let evt = Log.Event.Query (direction, responsesCount, reqMetric) + (log |> Log.prop "bytes" bytes |> Log.prop "batchSize" batchSize |> Log.event evt).Information( + "EqxCosmos {action:l} {stream} v{nextI} {count}/{responses} {ms}ms rc={ru}", action, streamName, nextI, count, responsesCount, (let e = interval.Elapsed in e.TotalMilliseconds), ru) let private calculateUsedVersusDroppedPayload stopIndex (xs: IIndexedEvent[]) : int * int = @@ -580,45 +576,45 @@ module private Index = if x.Index = stopIndex then found <- true used, dropped - let walk (log : ILogger) client retryPolicy maxItems maxRequests direction (stream: CollectionStream) startPos predicate - : Async = async { + let walk<'event> (log : ILogger) client retryPolicy maxItems maxRequests direction (stream: CollectionStream) startPos + (tryDecode : IIndexedEvent -> 'event option, isOrigin: 'event -> bool) + : Async = async { let responseCount = ref 0 - let mergeBatches (log : ILogger) (batchesBackward : AsyncSeq) - : Async = async { - let mutable lastResponse = None - let mutable maybeIndexDocument = None - let mutable ru = 0.0 + let mergeBatches (log : ILogger) (batchesBackward: AsyncSeq) = async { + let mutable lastResponse, mapbeTipPos, ru = None, None, 0. let! events = batchesBackward |> AsyncSeq.map (fun (events, maybePos, r) -> - if maybeIndexDocument = None then maybeIndexDocument <- maybePos + if mapbeTipPos = None then mapbeTipPos <- maybePos lastResponse <- Some events; ru <- ru + r incr responseCount - events) + events |> Array.map (fun x -> x, tryDecode x)) |> AsyncSeq.concatSeq - |> AsyncSeq.takeWhileInclusive (fun x -> - if not (predicate x) then true // continue the search - else + |> AsyncSeq.takeWhileInclusive (function + | x, Some e when isOrigin e -> match lastResponse with - | None -> log.Information("Eqx Stop stream={stream} at={index}", stream.name, x.Index) + | None -> log.Information("EqxCosmos Stop stream={stream} at={index} {case}", stream.name, x.Index, x.EventType) | Some batch -> let used, residual = batch |> calculateUsedVersusDroppedPayload x.Index - log.Information("Eqx Stop stream={stream} at={index} used={used} residual={residual}", stream.name, x.Index, used, residual) - false) + log.Information("EqxCosmos Stop stream={stream} at={index} {case} used={used} residual={residual}", + stream.name, x.Index, x.EventType, used, residual) + false + | _ -> true) (*continue the search*) |> AsyncSeq.toArrayAsync - return events, maybeIndexDocument, ru } + return events, mapbeTipPos, ru } use query = mkQuery client maxItems stream direction startPos - let pullSlice = handleSlice direction stream startPos + let pullSlice = handleResponse direction stream startPos let retryingLoggingReadSlice query = Log.withLoggedRetries retryPolicy "readAttempt" (pullSlice query) let log = log |> Log.prop "batchSize" maxItems |> Log.prop "stream" stream.name let readlog = log |> Log.prop "direction" direction - let batches : AsyncSeq = runQuery readlog retryingLoggingReadSlice maxRequests query - let! t, (events, maybeIndexDocument, ru) = mergeBatches log batches |> Stopwatch.Time + let batches : AsyncSeq = run readlog retryingLoggingReadSlice maxRequests query + let! t, (events, maybeTipPos, ru) = mergeBatches log batches |> Stopwatch.Time query.Dispose() - let pos = match maybeIndexDocument with Some p -> p | None -> Position.FromMaxIndex events + let raws, decoded = (Array.map fst events), (events |> Seq.choose snd |> Array.ofSeq) + let pos = match maybeTipPos with Some p -> p | None -> Position.FromMaxIndex raws - log |> logBatchRead direction maxItems stream.name t (!responseCount,events) pos.index ru - return pos, events } + log |> logQuery direction maxItems stream.name t (!responseCount,raws) pos.index ru + return pos, decoded } type [] Token = { stream: CollectionStream; pos: Position } module Token = @@ -639,7 +635,6 @@ open FSharp.Control open Microsoft.Azure.Documents open Serilog open System -open System.Collections.Generic [] module Internal = @@ -647,12 +642,13 @@ module Internal = type InternalSyncResult = Written of Storage.StreamToken | ConflictUnknown of Storage.StreamToken | Conflict of Storage.StreamToken * IIndexedEvent[] [] - type LoadFromTokenResult = Unchanged | Found of Storage.StreamToken * IIndexedEvent[] + type LoadFromTokenResult<'event> = Unchanged | Found of Storage.StreamToken * 'event[] /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) type EqxConnection(client: IDocumentClient, ?readRetryPolicy: IRetryPolicy, ?writeRetryPolicy: IRetryPolicy) = member __.Client = client - member __.ReadRetryPolicy = readRetryPolicy + member __.TipRetryPolicy = readRetryPolicy + member __.QueryRetryPolicy = readRetryPolicy member __.WriteRetryPolicy = writeRetryPolicy /// Defines the policies in force regarding how to constrain query responses @@ -670,51 +666,39 @@ type EqxBatchingPolicy member __.MaxRequests = maxRequests type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = - let eventTypesPredicate resolved = - let acc = System.Collections.Generic.HashSet() - fun (x: IIndexedEvent) -> - acc.Add x.EventType |> ignore - resolved acc - let (|Satisfies|_|) predicate (xs:IIndexedEvent[]) = - match Array.tryFindIndexBack predicate xs with + let (|FromUnfold|_|) (tryDecode: #IEvent -> 'event option) (isOrigin: 'event -> bool) (xs:#IEvent[]) : Option<'event[]> = + match Array.tryFindIndexBack (tryDecode >> Option.exists isOrigin) xs with | None -> None - | Some index -> Array.sub xs index (xs.Length - index) |> Some - let loadBackwardsStopping log predicate stream: Async = async { - let! pos, events = Query.walk log conn.Client conn.ReadRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward stream None predicate + | Some index -> xs |> Seq.skip index |> Seq.choose tryDecode |> Array.ofSeq |> Some + member __.LoadBackwardsStopping log stream (tryDecode,isOrigin): Async = async { + let! pos, events = Query.walk log conn.Client conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward stream None (tryDecode,isOrigin) Array.Reverse events return Token.create stream pos, events } - member __.LoadBackwardsStopping log predicate stream: Async = - let predicate = eventTypesPredicate predicate - loadBackwardsStopping log predicate stream - member __.Read log batchingOverride stream direction startPos predicate: Async = async { - let batching = defaultArg batchingOverride batching - let! pos, events = Query.walk log conn.Client conn.ReadRetryPolicy batching.MaxItems batching.MaxRequests direction stream startPos predicate + member __.Read log stream direction startPos (tryDecode,isOrigin) : Async = async { + let! pos, events = Query.walk log conn.Client conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests direction stream startPos (tryDecode,isOrigin) return Token.create stream pos, events } - member __.LoadFromProjectionsOrRollingSnapshots log predicate (stream,maybePos): Async = async { - let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream maybePos - let predicate = eventTypesPredicate predicate + member __.LoadFromUnfoldsOrRollingSnapshots log (stream,maybePos) (tryDecode,isOrigin): Async = async { + let! res = Tip.tryLoad log conn.TipRetryPolicy conn.Client stream maybePos match res with - | Index.Result.NotFound -> return Token.create stream Position.FromKnownEmpty, Array.empty - | Index.Result.NotModified -> return invalidOp "Not handled" - | Index.Result.Found (pos, Satisfies predicate enoughEvents) -> return Token.create stream pos, enoughEvents - | _ -> return! loadBackwardsStopping log predicate stream } + | Tip.Result.NotFound -> return Token.create stream Position.FromKnownEmpty, Array.empty + | Tip.Result.NotModified -> return invalidOp "Not handled" + | Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return Token.create stream pos, span + | _ -> return! __.LoadBackwardsStopping log stream (tryDecode,isOrigin) } member __.GetPosition(log, stream, ?pos): Async = async { - let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream pos + let! res = Tip.tryLoad log conn.TipRetryPolicy conn.Client stream pos match res with - | Index.Result.NotFound -> return Token.create stream Position.FromKnownEmpty - | Index.Result.NotModified -> return Token.create stream pos.Value - | Index.Result.Found (pos, _projectionsAndEvents) -> return Token.create stream pos } - member __.LoadFromToken log (stream,pos) predicate: Async = async { - let predicate = eventTypesPredicate predicate - let! res = Index.tryLoad log None(* TODO conn.ReadRetryPolicy*) conn.Client stream (Some pos) + | Tip.Result.NotFound -> return Token.create stream Position.FromKnownEmpty + | Tip.Result.NotModified -> return Token.create stream pos.Value + | Tip.Result.Found (pos, _unfoldsAndEvents) -> return Token.create stream pos } + member __.LoadFromToken(log, (stream,pos), (tryDecode, isOrigin)): Async> = async { + let! res = Tip.tryLoad log conn.TipRetryPolicy conn.Client stream (Some pos) match res with - | Index.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.FromKnownEmpty,Array.empty) - | Index.Result.NotModified -> return LoadFromTokenResult.Unchanged - | Index.Result.Found (pos, Satisfies predicate enoughEvents) -> return LoadFromTokenResult.Found (Token.create stream pos, enoughEvents) - | _ -> - let! res = __.Read log None stream Direction.Forward (Some pos) (fun _ -> false) - return LoadFromTokenResult.Found res } - member __.Sync log stream (expectedVersion, batch: Store.WipBatch): Async = async { + | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.FromKnownEmpty,Array.empty) + | Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged + | Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return LoadFromTokenResult.Found (Token.create stream pos, span) + | _ -> let! res = __.Read log stream Direction.Forward (Some pos) (tryDecode,isOrigin) + return LoadFromTokenResult.Found res } + member __.Sync log stream (expectedVersion, batch: Store.Tip): Async = async { let! wr = Sync.batch log conn.WriteRetryPolicy conn.Client stream (expectedVersion,batch) match wr with | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (Token.create stream pos',events) @@ -724,22 +708,17 @@ type EqxGateway(conn : EqxConnection, batching : EqxBatchingPolicy) = type private Category<'event, 'state>(gateway : EqxGateway, codec : UnionCodec.IUnionEncoder<'event, byte[]>) = let tryDecode (x: #IEvent) = codec.TryDecode { caseName = x.EventType; payload = x.Data } let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: IIndexedEvent seq) : 'state = Seq.choose tryDecode events |> fold initial - let respond (fold: 'state -> 'event seq -> 'state) initial events : 'state = - fold initial (Seq.choose tryDecode events) - member __.Load includeProjections collectionStream fold initial predicate (log : ILogger): Async = async { + member __.Load includeUnfolds collectionStream fold initial isOrigin (log : ILogger): Async = async { let! token, events = - if not includeProjections then gateway.LoadBackwardsStopping log predicate collectionStream - else gateway.LoadFromProjectionsOrRollingSnapshots log predicate (collectionStream,None) - return token, respond fold initial events } - member __.LoadFromToken (Token.Unpack streamPos, state: 'state as current) fold predicate (log : ILogger): Async = async { - let! res = gateway.LoadFromToken log streamPos predicate + if not includeUnfolds then gateway.LoadBackwardsStopping log collectionStream (tryDecode,isOrigin) + else gateway.LoadFromUnfoldsOrRollingSnapshots log (collectionStream,None) (tryDecode,isOrigin) + return token, fold initial events } + member __.LoadFromToken (Token.Unpack streamPos, state: 'state as current) fold isOrigin (log : ILogger): Async = async { + let! res = gateway.LoadFromToken(log, streamPos, (tryDecode,isOrigin)) match res with | LoadFromTokenResult.Unchanged -> return current - | LoadFromTokenResult.Found (token',events) -> return token', respond fold state events } - member __.Sync (Token.Unpack (stream,pos), state as current) (project: 'state -> 'event seq -> 'event seq) - (expectedVersion : int64 option, events, state') - fold predicate log - : Async> = async { + | LoadFromTokenResult.Found (token', events') -> return token', fold state events' } + member __.Sync(Token.Unpack (stream,pos), state as current, expectedVersion, events, unfold, fold, isOrigin, log): Async> = async { let encodeEvent (x : 'event) : IEvent = let e = codec.Encode x { new IEvent with @@ -747,14 +726,14 @@ type private Category<'event, 'state>(gateway : EqxGateway, codec : UnionCodec.I member __.Data = e.payload member __.Meta = null } let state' = fold state (Seq.ofList events) - let eventsEncoded, projectionsEncoded = Seq.map encodeEvent events |> Array.ofSeq, Seq.map encodeEvent (project state' events) + let eventsEncoded, projectionsEncoded = Seq.map encodeEvent events |> Array.ofSeq, Seq.map encodeEvent (unfold state' events) let baseIndex = pos.index + int64 (List.length events) - let projections = Sync.mkProjections baseIndex projectionsEncoded + let projections = Sync.mkUnfold baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections let! res = gateway.Sync log stream (expectedVersion,batch) match res with - | InternalSyncResult.Conflict (token',events') -> return Storage.SyncResult.Conflict (async { return token', respond fold state events' }) - | InternalSyncResult.ConflictUnknown _token' -> return Storage.SyncResult.Conflict (__.LoadFromToken current fold predicate log) + | InternalSyncResult.Conflict (token',TryDecodeFold fold state events') -> return Storage.SyncResult.Conflict (async { return token', events' }) + | InternalSyncResult.ConflictUnknown _token' -> return Storage.SyncResult.Conflict (__.LoadFromToken current fold isOrigin log) | InternalSyncResult.Written token' -> return Storage.SyncResult.Written (token', state') } module Caching = @@ -817,17 +796,17 @@ module Caching = CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _ type private Folder<'event, 'state> - ( category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, - predicate : System.Collections.Generic.HashSet -> bool, - mkCollectionStream : string -> Store.CollectionStream, - // Whether or not a projection function is supplied controls whether reads consult the index or not - ?project: ('state -> 'event seq -> 'event seq), + ( category: Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, + isOrigin: 'event -> bool, + mkCollectionStream: string -> Store.CollectionStream, + // Whether or not an `unfold` function is supplied controls whether reads do a point read before querying + ?unfold: ('state -> 'event list -> 'event seq), ?readCache) = interface ICategory<'event, 'state> with member __.Load streamName (log : ILogger): Async = let collStream = mkCollectionStream streamName - let batched = category.Load (Option.isSome project) collStream fold initial predicate log - let cached tokenAndState = category.LoadFromToken tokenAndState fold predicate log + let batched = category.Load (Option.isSome unfold) collStream fold initial isOrigin log + let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log match readCache with | None -> batched | Some (cache : Caching.Cache, prefix : string) -> @@ -836,8 +815,8 @@ type private Folder<'event, 'state> | Some tokenAndState -> cached tokenAndState member __.TrySync (log : ILogger) (Token.Unpack (_stream,pos) as streamToken,state) (events : 'event list) : Async> = async { - let! syncRes = category.Sync (streamToken,state) (defaultArg project (fun _ _ -> Seq.empty)) (Some pos.index, events, fold state events) fold predicate log - match syncRes with + let! res = category.Sync((streamToken,state), Some pos.index, events, (defaultArg unfold (fun _ _ -> Seq.empty)), fold, isOrigin, log) + match res with | Storage.SyncResult.Conflict resync -> return Storage.SyncResult.Conflict resync | Storage.SyncResult.Written (token',state') -> return Storage.SyncResult.Written (token',state') } @@ -855,22 +834,21 @@ type EqxStore(gateway: EqxGateway, collections: EqxCollections) = [] type CachingStrategy = - /// Retain a single set of State, together with the associated etags + /// Retain a single 'state per streamName, together with the associated etag /// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to - /// track all relevant data in the state, and/or have the `project` function ensure all relevant events get indexed quickly + /// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip | SlidingWindow of Caching.Cache * window: TimeSpan [] type AccessStrategy<'event,'state> = - /// Require a configurable Set of Event Types to have been accumulated from a) projections + b) searching backward in the event stream - /// until `resolved` deems it so; fold foward based on those - /// When saving, `project` the 'state to seed the set of events that `resolved` will see first - | Projections of resolved: (ISet -> bool) * project: ('state -> 'event seq) + /// Allow events that pass the `isOrigin` test to be used in lieu of folding all the events from the start of the stream + /// When saving, `unfold` the 'state, saving in the Tip + | Unfolded of isOrigin: ('event -> bool) * unfold: ('state -> 'event seq) /// Simplified version of projection that only has a single Projection Event Type /// Provides equivalent performance to Projections, just simplified function signatures - | Projection of eventType: string * ('state -> 'event) - /// Simplified version - | AnyKnownEventType of eventTypes: System.Collections.Generic.ISet + | Snapshot of isValid: ('event -> bool) * generate: ('state -> 'event) + /// Trust every event type as being an origin + | AnyKnownEventType type EqxStreamBuilder<'event, 'state>(store : EqxStore, codec, fold, initial, ?access, ?caching) = member __.Create streamName : Equinox.IStream<'event, 'state> = @@ -878,21 +856,14 @@ type EqxStreamBuilder<'event, 'state>(store : EqxStore, codec, fold, initial, ?a match caching with | None -> None | Some (CachingStrategy.SlidingWindow(cache, _)) -> Some(cache, null) - let predicate, projectOption = + let isOrigin, projectOption = match access with | None -> (fun _ -> false), None - | Some (AccessStrategy.Projections (predicate,project)) -> - predicate, - Some (fun state _events -> project state) - | Some (AccessStrategy.Projection (et,compact)) -> - (fun (ets: System.Collections.Generic.HashSet) -> ets.Contains et), - Some (fun state _events -> seq [compact state]) - | Some (AccessStrategy.AnyKnownEventType knownEventTypes) -> - (fun (ets: System.Collections.Generic.HashSet) -> knownEventTypes.Overlaps ets), - Some (fun _ events -> Seq.last events |> Seq.singleton) + | Some (AccessStrategy.Unfolded (isOrigin, unfold)) -> isOrigin, Some (fun state _events -> unfold state) + | Some (AccessStrategy.Snapshot (isValid,generate)) -> isValid, Some (fun state _events -> seq [generate state]) + | Some (AccessStrategy.AnyKnownEventType) -> (fun _ -> true), Some (fun _ events -> Seq.last events |> Seq.singleton) let category = Category<'event, 'state>(store.Gateway, codec) - let folder = Folder<'event, 'state>(category, fold, initial, predicate, store.Collections.CollectionForStream, ?project=projectOption, ?readCache = readCacheOption) - + let folder = Folder<'event, 'state>(category, fold, initial, isOrigin, store.Collections.CollectionForStream, ?unfold=projectOption, ?readCache = readCacheOption) let category : ICategory<_,_> = match caching with | None -> folder :> _ @@ -965,7 +936,7 @@ type EqxConnector match tags with None -> () | Some tags -> for key, value in tags do yield sprintf "%s=%s" key value } let sanitizedName = name.Replace('\'','_').Replace(':','_') // sic; Align with logging for ES Adapter let client = new Client.DocumentClient(uri, key, connPolicy, Nullable(defaultArg defaultConsistencyLevel ConsistencyLevel.Session)) - log.ForContext("Uri", uri).Information("Eqx connecting to Cosmos with Connection Name {connectionName}", sanitizedName) + log.ForContext("Uri", uri).Information("EqxCosmos connecting to Cosmos with Connection Name {connectionName}", sanitizedName) do! client.OpenAsync() |> Async.AwaitTaskCorrect return client :> IDocumentClient } @@ -1030,7 +1001,7 @@ type EqxContext match maxCount with | Some limit -> maxCountPredicate limit | None -> fun _ -> false - return! gateway.Read logger None stream direction startPos isOrigin } + return! gateway.Read logger stream direction startPos (Some,isOrigin) } /// Establishes the current position of the stream in as effficient a manner as possible /// (The ideal situation is that the preceding token is supplied as input in order to avail of 1RU low latency state checks) diff --git a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs index 8e0622993..63da90de1 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosCoreIntegration.fs @@ -101,7 +101,7 @@ type Tests(testOutputHelper) = // If a fail triggers a rerun, we need to dump the previous log entries captured capture.Clear() let! pos = Events.getNextIndex ctx streamName - test <@ [EqxAct.IndexNotFound] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotFound] = capture.ExternalCalls @> 0L =! pos verifyRequestChargesMax 1 // for a 404 by definition capture.Clear() @@ -132,7 +132,7 @@ type Tests(testOutputHelper) = capture.Clear() let! res = Events.getNextIndex ctx streamName - test <@ [EqxAct.Index] = capture.ExternalCalls @> + test <@ [EqxAct.Tip] = capture.ExternalCalls @> verifyRequestChargesMax 2 capture.Clear() pos =! res @@ -148,12 +148,12 @@ type Tests(testOutputHelper) = capture.Clear() let! pos = ctx.Sync(stream,?position=None) - test <@ [EqxAct.Index] = capture.ExternalCalls @> + test <@ [EqxAct.Tip] = capture.ExternalCalls @> verifyRequestChargesMax 50 // 41 observed // for a 200, you'll pay a lot (we omitted to include the position that NonIdempotentAppend yielded) capture.Clear() let! _pos = ctx.Sync(stream,pos) - test <@ [EqxAct.IndexNotModified] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotModified] = capture.ExternalCalls @> verifyRequestChargesMax 1 // for a 302 by definition - when an etag IfNotMatch is honored, you only pay one RU capture.Clear() } @@ -205,7 +205,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res - test <@ List.replicate 2 EqxAct.SliceForward @ [EqxAct.BatchForward] = capture.ExternalCalls @> + test <@ List.replicate 2 EqxAct.ResponseForward @ [EqxAct.QueryForward] = capture.ExternalCalls @> verifyRequestChargesMax 8 // observed 6.14 // was 3 } @@ -222,7 +222,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res // 2 items atm - test <@ [EqxAct.SliceForward; EqxAct.SliceForward; EqxAct.BatchForward] = capture.ExternalCalls @> + test <@ [EqxAct.ResponseForward; EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> verifyRequestChargesMax 7 // observed 6.14 // was 6 } @@ -239,7 +239,7 @@ type Tests(testOutputHelper) = verifyCorrectEvents 1L expected res // TODO [implement and] prove laziness - test <@ List.replicate 2 EqxAct.SliceForward @ [EqxAct.BatchForward] = capture.ExternalCalls @> + test <@ List.replicate 2 EqxAct.ResponseForward @ [EqxAct.QueryForward] = capture.ExternalCalls @> verifyRequestChargesMax 10 // observed 8.99 // was 3 } @@ -259,7 +259,7 @@ type Tests(testOutputHelper) = verifyCorrectEventsBackward 4L expected res - test <@ List.replicate 3 EqxAct.SliceBackward @ [EqxAct.BatchBackward] = capture.ExternalCalls @> + test <@ List.replicate 3 EqxAct.ResponseBackward @ [EqxAct.QueryBackward] = capture.ExternalCalls @> verifyRequestChargesMax 10 // observed 8.98 // was 3 } diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs index 152a4ade1..d652c7c4c 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixturesInfrastructure.fs @@ -52,35 +52,38 @@ module SerilogHelpers = | _ -> None open Equinox.Cosmos [] - type EqxAct = Append | Resync | Conflict | SliceForward | SliceBackward | BatchForward | BatchBackward | Index | IndexNotFound | IndexNotModified | SliceWaste - let (|EqxAction|) (evt : Equinox.Cosmos.Log.Event) = - match evt with - | Log.WriteSuccess _ -> EqxAct.Append - | Log.WriteResync _ -> EqxAct.Resync - | Log.WriteConflict _ -> EqxAct.Conflict - | Log.Slice (Direction.Forward,{count = 0}) -> EqxAct.SliceWaste // TODO remove, see comment where these are emitted - | Log.Slice (Direction.Forward,_) -> EqxAct.SliceForward - | Log.Slice (Direction.Backward,{count = 0}) -> EqxAct.SliceWaste // TODO remove, see comment where these are emitted - | Log.Slice (Direction.Backward,_) -> EqxAct.SliceBackward - | Log.Batch (Direction.Forward,_,_) -> EqxAct.BatchForward - | Log.Batch (Direction.Backward,_,_) -> EqxAct.BatchBackward - | Log.Index _ -> EqxAct.Index - | Log.IndexNotFound _ -> EqxAct.IndexNotFound - | Log.IndexNotModified _ -> EqxAct.IndexNotModified + type EqxAct = + | Tip | TipNotFound | TipNotModified + | ResponseForward | ResponseBackward | ResponseWaste + | QueryForward | QueryBackward + | Append | Resync | Conflict + let (|EqxAction|) = function + | Log.Tip _ -> EqxAct.Tip + | Log.TipNotFound _ -> EqxAct.TipNotFound + | Log.TipNotModified _ -> EqxAct.TipNotModified + | Log.Response (Direction.Forward, {count = 0}) -> EqxAct.ResponseWaste // TODO remove, see comment where these are emitted + | Log.Response (Direction.Forward,_) -> EqxAct.ResponseForward + | Log.Response (Direction.Backward, {count = 0}) -> EqxAct.ResponseWaste // TODO remove, see comment where these are emitted + | Log.Response (Direction.Backward,_) -> EqxAct.ResponseBackward + | Log.Query (Direction.Forward,_,_) -> EqxAct.QueryForward + | Log.Query (Direction.Backward,_,_) -> EqxAct.QueryBackward + | Log.SyncSuccess _ -> EqxAct.Append + | Log.SyncResync _ -> EqxAct.Resync + | Log.SyncConflict _ -> EqxAct.Conflict let inline (|Stats|) ({ ru = ru }: Equinox.Cosmos.Log.Measurement) = ru - let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosSliceRc|) = function - | Log.Index (Stats s) - | Log.IndexNotFound (Stats s) - | Log.IndexNotModified (Stats s) - | Log.Batch (_,_, (Stats s)) -> CosmosReadRc s - | Log.WriteSuccess (Stats s) - | Log.WriteConflict (Stats s) -> CosmosWriteRc s - | Log.WriteResync (Stats s) -> CosmosResyncRc s + let (|CosmosReadRc|CosmosWriteRc|CosmosResyncRc|CosmosResponseRc|) = function + | Log.Tip (Stats s) + | Log.TipNotFound (Stats s) + | Log.TipNotModified (Stats s) // slices are rolled up into batches so be sure not to double-count - | Log.Slice (_,Stats s) -> CosmosSliceRc s + | Log.Response (_,Stats s) -> CosmosResponseRc s + | Log.Query (_,_, (Stats s)) -> CosmosReadRc s + | Log.SyncSuccess (Stats s) + | Log.SyncConflict (Stats s) -> CosmosWriteRc s + | Log.SyncResync (Stats s) -> CosmosResyncRc s /// Facilitates splitting between events with direct charges vs synthetic events Equinox generates to avoid double counting let (|CosmosRequestCharge|EquinoxChargeRollup|) = function - | CosmosSliceRc _ -> + | CosmosResponseRc _ -> EquinoxChargeRollup | CosmosReadRc rc | CosmosWriteRc rc | CosmosResyncRc rc as e -> CosmosRequestCharge (e,rc) diff --git a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs index 1ccbbc205..494a7a953 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosIntegration.fs @@ -21,12 +21,12 @@ module Cart = let projection = "Compacted",snd snapshot let createServiceWithProjection connection batchSize log = let store = createEqxStore connection batchSize - let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection projection).Create + let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Snapshot snapshot).Create Backend.Cart.Service(log, resolveStream) let createServiceWithProjectionAndCaching connection batchSize log cache = let store = createEqxStore connection batchSize let sliding20m = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Projection projection, sliding20m).Create + let resolveStream = EqxStreamBuilder(store, codec, fold, initial, AccessStrategy.Snapshot snapshot, sliding20m).Create Backend.Cart.Service(log, resolveStream) module ContactPreferences = @@ -37,7 +37,7 @@ module ContactPreferences = let resolveStream = EqxStreamBuilder(gateway, codec, fold, initial).Create Backend.ContactPreferences.Service(log, resolveStream) let createService createGateway log = - let resolveStream = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType (System.Collections.Generic.HashSet ["contactPreferencesChanged"])).Create + let resolveStream = EqxStreamBuilder(createGateway 1, codec, fold, initial, AccessStrategy.AnyKnownEventType).Create Backend.ContactPreferences.Service(log, resolveStream) #nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests) @@ -69,7 +69,7 @@ type Tests(testOutputHelper) = // The command processing should trigger only a single read and a single write call let addRemoveCount = 6 do! addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId service addRemoveCount - test <@ [EqxAct.SliceWaste; EqxAct.BatchBackward; EqxAct.Append] = capture.ExternalCalls @> + test <@ [EqxAct.ResponseWaste; EqxAct.QueryBackward; EqxAct.Append] = capture.ExternalCalls @> // Restart the counting capture.Clear() @@ -80,7 +80,7 @@ type Tests(testOutputHelper) = // Need to read 4 batches to read 11 events in batches of 3 let expectedBatches = ceil(float expectedEventCount/float batchSize) |> int - test <@ List.replicate (expectedBatches-1) EqxAct.SliceBackward @ [EqxAct.SliceBackward; EqxAct.BatchBackward] = capture.ExternalCalls @> + test <@ List.replicate (expectedBatches-1) EqxAct.ResponseBackward @ [EqxAct.ResponseBackward; EqxAct.QueryBackward] = capture.ExternalCalls @> } [] @@ -163,7 +163,7 @@ type Tests(testOutputHelper) = && [EqxAct.Resync] = c2 @> } - let singleBatchBackwards = [EqxAct.SliceBackward; EqxAct.BatchBackward] + let singleBatchBackwards = [EqxAct.ResponseBackward; EqxAct.QueryBackward] let batchBackwardsAndAppend = singleBatchBackwards @ [EqxAct.Append] [] @@ -186,7 +186,7 @@ type Tests(testOutputHelper) = let! result = service.Read email test <@ value = result @> - test <@ [EqxAct.Index; EqxAct.Append; EqxAct.Index] = capture.ExternalCalls @> + test <@ [EqxAct.Tip; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> } [] @@ -203,17 +203,17 @@ type Tests(testOutputHelper) = let! _ = service2.Read cartId // ... should see a single read as we are writes are cached - test <@ [EqxAct.IndexNotFound; EqxAct.Append; EqxAct.Index] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotFound; EqxAct.Append; EqxAct.Tip] = capture.ExternalCalls @> // Add two more - the roundtrip should only incur a single read capture.Clear() do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1 - test <@ [EqxAct.Index; EqxAct.Append] = capture.ExternalCalls @> + test <@ [EqxAct.Tip; EqxAct.Append] = capture.ExternalCalls @> // While we now have 12 events, we should be able to read them with a single call capture.Clear() let! _ = service2.Read cartId - test <@ [EqxAct.Index] = capture.ExternalCalls @> + test <@ [EqxAct.Tip] = capture.ExternalCalls @> } [] @@ -231,17 +231,17 @@ type Tests(testOutputHelper) = let! _ = service2.Read cartId // ... should see a single Cached Indexed read given writes are cached and writer emits etag - test <@ [EqxAct.IndexNotFound; EqxAct.Append; EqxAct.IndexNotModified] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotFound; EqxAct.Append; EqxAct.TipNotModified] = capture.ExternalCalls @> // Add two more - the roundtrip should only incur a single read, which should be cached by virtue of being a second one in successono capture.Clear() do! addAndThenRemoveItemsManyTimes context cartId skuId service1 1 - test <@ [EqxAct.IndexNotModified; EqxAct.Append] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotModified; EqxAct.Append] = capture.ExternalCalls @> // While we now have 12 events, we should be able to read them with a single call capture.Clear() let! _ = service2.Read cartId let! _ = service2.Read cartId // First is cached because writer emits etag, second remains cached - test <@ [EqxAct.IndexNotModified; EqxAct.IndexNotModified] = capture.ExternalCalls @> + test <@ [EqxAct.TipNotModified; EqxAct.TipNotModified] = capture.ExternalCalls @> } \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs index 06b795722..63fe8565c 100644 --- a/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs +++ b/tests/Equinox.Cosmos.Integration/JsonConverterTests.fs @@ -36,7 +36,7 @@ type Base64ZipUtf8Tests() = [] let ``serializes, achieving compression`` () = let encoded = unionEncoder.Encode(A { embed = String('x',5000) }) - let e : Store.Projection = + let e : Store.Unfold = { i = 42L t = encoded.caseName d = encoded.payload @@ -53,14 +53,14 @@ type Base64ZipUtf8Tests() = if hasNulls then () else let encoded = unionEncoder.Encode value - let e : Store.Projection = + let e : Store.Unfold = { i = 42L t = encoded.caseName d = encoded.payload m = null } let ser = JsonConvert.SerializeObject(e) test <@ ser.Contains("\"d\":\"") @> - let des = JsonConvert.DeserializeObject(ser) + let des = JsonConvert.DeserializeObject(ser) let d : Equinox.UnionCodec.EncodedUnion<_> = { caseName = des.t; payload=des.d } let decoded = unionEncoder.Decode d test <@ value = decoded @> \ No newline at end of file