diff --git a/CHANGELOG.md b/CHANGELOG.md index e840fc02d..dc6e2e9c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,7 +29,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - Rename `Equinox.Cosmos.Connector` -> `Equinox.CosmosStore.CosmosStoreClientFactory` - Remove exceptions from 304/404 paths when reading Tip [#257](https://github.com/jet/equinox/pull/257) - Reorganized `QueryRetryPolicy` to handle `IAsyncEnumerable` coming in Cosmos SDK V4 [#246](https://github.com/jet/equinox/pull/246) :pray: [@ylibrach](https://github.com/ylibrach) - - Added Secondary store fallback for Event loading, enabling Streams to be hot-migrated (archived to a secondary/clone, then pruned from the primary/active) between Primary and Secondary stores [#247](https://github.com/jet/equinox/pull/247) + - Added Secondary store fallback for Event loading, enabling Streams to be hot-migrated (archived to a secondary/clone, then pruned from the primary/active) between Primary and Secondary stores [#247](https://github.com/jet/equinox/pull/247), [259](https://github.com/jet/equinox/pull/259) - Replaced `BatchingPolicy`, `RetryPolicy` with `TipOptions`, `QueryOptions` to better align with Cosmos SDK V4 [#253](https://github.com/jet/equinox/pull/253) - Added support for accumulating events in Tip [#251](https://github.com/jet/equinox/pull/251) see also [#110](https://github.com/jet/equinox/pull/110) - target `EventStore.Client` v `20.6` (instead of v `5.0.x`) [#224](https://github.com/jet/equinox/pull/224) diff --git a/src/Equinox.CosmosStore/CosmosStore.fs b/src/Equinox.CosmosStore/CosmosStore.fs index 6a49ea65c..7121dd1ae 100644 --- a/src/Equinox.CosmosStore/CosmosStore.fs +++ b/src/Equinox.CosmosStore/CosmosStore.fs @@ -449,7 +449,8 @@ function sync(req, expIndex, expEtag, maxEventsInTip, maxStringifyLen) { tip.e = []; } // TODO Carry forward `u` items not present in `batch`, together with supporting catchup events from preceding batches - // Replace all the unfolds // TODO: should remove only unfolds being superseded + // Replace all the unfolds + // TODO: should remove only unfolds being superseded tip.u = req.u; // As we've mutated the document in a manner that can conflict with other writers, our write needs to be contingent on no competing updates having taken place const isAccepted = __.replaceDocument(tip._self, tip, { etag: tip._etag }, callback); @@ -521,11 +522,11 @@ module internal Sync = |> match result with | Result.Written pos -> Log.prop "nextExpectedVersion" pos >> Log.event (Log.SyncSuccess (mkMetric ru)) - | Result.ConflictUnknown pos -> - Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru)) - | Result.Conflict (pos, xs) -> + | Result.ConflictUnknown pos' -> + Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncConflict (mkMetric ru)) + | Result.Conflict (pos', xs) -> (if verbose then Log.propData "conflicts" xs else id) - >> Log.prop "nextExpectedVersion" pos >> propConflict >> Log.event (Log.SyncResync(mkMetric ru)) + >> Log.prop "nextExpectedVersion" pos' >> propConflict >> Log.event (Log.SyncResync (mkMetric ru)) log.Information("EqxCosmos {action:l} {stream} {count}+{ucount} {ms:f1}ms {ru}RU {bytes:n0}b {exp}", "Sync", stream, count, req.u.Length, (let e = t.Elapsed in e.TotalMilliseconds), ru, bytes, exp) return result } @@ -650,7 +651,7 @@ module internal Tip = let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru) return ru, res } - type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent[] + type [] Result = NotModified | NotFound | Found of Position * i : int64 * ITimelineEvent[] /// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with Result.NotModified let tryLoad (log : ILogger) retryPolicy containerStream (maybePos: Position option, maxIndex): Async = async { let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get containerStream maybePos) log @@ -659,7 +660,7 @@ module internal Tip = | ReadResult.NotFound -> return Result.NotFound | ReadResult.Found tip -> let minIndex = maybePos |> Option.map (fun x -> x.index) - return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds(tip, ?maxIndex = maxIndex, ?minIndex = minIndex) |> Array.ofSeq) } + return Result.Found (Position.fromTip tip, tip.i, Enum.EventsAndUnfolds(tip, ?maxIndex = maxIndex, ?minIndex = minIndex) |> Array.ofSeq) } module internal Query = @@ -686,7 +687,7 @@ module internal Tip = let queryString = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c %s ORDER BY c.i %s" whereClause order let prams = Seq.map snd args (QueryDefinition queryString, prams) ||> Seq.fold (fun q wp -> q |> wp) - log.Debug("EqxCosmos Query {query} on {stream}", query.QueryText, stream) + log.Debug("EqxCosmos Query {query} on {stream}; minIndex={minIndex} maxIndex={maxIndex}", query.QueryText, stream, minIndex, maxIndex) let qro = QueryRequestOptions(PartitionKey = Nullable (PartitionKey stream), MaxItemCount = Nullable maxItems) container.GetItemQueryIterator(query, requestOptions = qro) @@ -737,9 +738,9 @@ module internal Tip = used, dropped [] - type ScanResult<'event> = { found : bool; index : int64; next : int64; maybeTipPos : Position option; events : 'event[] } + type ScanResult<'event> = { found : bool; minIndex : int64; next : int64; maybeTipPos : Position option; events : 'event[] } - let scanTip (tryDecode: #IEventData -> 'event option, isOrigin: 'event -> bool) (pos : Position, xs: #ITimelineEvent[]) : ScanResult<'event> = + let scanTip (tryDecode: #IEventData -> 'event option, isOrigin: 'event -> bool) (pos : Position, i : int64, xs: #ITimelineEvent[]) : ScanResult<'event> = let items = ResizeArray() let isOrigin' e = match tryDecode e with @@ -748,7 +749,7 @@ module internal Tip = items.Insert(0, e) // WalkResult always renders events ordered correctly - here we're aiming to align with Enum.EventsAndUnfolds isOrigin e let f, e = xs |> Seq.tryFindBack isOrigin' |> Option.isSome, items.ToArray() - { found = f; maybeTipPos = Some pos; index = pos.index; next = pos.index + 1L; events = e } + { found = f; maybeTipPos = Some pos; minIndex = i; next = pos.index + 1L; events = e } // Yields events in ascending Index order let scan<'event> (log : ILogger) (container,stream) includeTip maxItems maxRequests direction @@ -791,7 +792,7 @@ module internal Tip = let minMax = (None, raws) ||> Array.fold (fun acc x -> let i = x.Index in Some (match acc with None -> i, i | Some (n, x) -> min n i, max x i)) let version = match minMax with Some (_, max) -> max + 1L | None -> 0L log |> logQuery direction maxItems stream t (responseCount,raws) version ru - return minMax |> Option.map (fun (i,m) -> { found = found; index = i; next = m + 1L; maybeTipPos = maybeTipPos; events = decoded }) } + return minMax |> Option.map (fun (i,m) -> { found = found; minIndex = i; next = m + 1L; maybeTipPos = maybeTipPos; events = decoded }) } let walkLazy<'event> (log : ILogger) (container,stream) maxItems maxRequests (tryDecode : ITimelineEvent -> 'event option, isOrigin: 'event -> bool) @@ -845,17 +846,18 @@ module internal Tip = /// 2) Querying Secondary for predecessors of what's obtained from 1 let load (log : ILogger) (minIndex, maxIndex) (tip : ScanResult<'event> option) (primary : int64 option * int64 option -> Async option>) - (secondary : (int64 option * int64 option -> Async option>) option) + // Choice1Of2 -> indicates whether it's acceptable to ignore missing events; Choice2Of2 -> Fallback store + (secondary : Choice Async option>>) : Async = async { let minI = defaultArg minIndex 0L match tip with - | Some { index = i; maybeTipPos = Some p; events = e } when i <= minI -> return p, e | Some { found = true; maybeTipPos = Some p; events = e } -> return p, e + | Some { minIndex = i; maybeTipPos = Some p; events = e } when i <= minI -> return p, e | _ -> let i, events, pos = match tip with - | Some { index = i; maybeTipPos = p; events = e } -> Some i, e, p + | Some { minIndex = i; maybeTipPos = p; events = e } -> Some i, e, p | None -> maxIndex, Array.empty, None let! primary = primary (minIndex, i) let events, pos = @@ -869,22 +871,22 @@ module internal Tip = .Debug(message) match primary, secondary with - | Some { index = i }, _ when i <= minI -> return pos, events // primary had required earliest event Index, no need to look at secondary | Some { found = true }, _ -> return pos, events // origin found in primary, no need to look in secondary - | _, None -> - // TODO Add TipOptions parameter to opt-into this vs throwing + | Some { minIndex = i }, _ when i <= minI -> return pos, events // primary had required earliest event Index, no need to look at secondary + | _, Choice1Of2 allowMissing -> logMissing (minIndex, i) "Origin event not found; no secondary container supplied" - return pos, events - | _, Some secondary -> + if allowMissing then return pos, events + else return failwithf "Origin event not found; no secondary container supplied" + | _, Choice2Of2 secondary -> - let maxIndex = match primary with Some p -> Some p.index | None -> maxIndex // if no batches in primary, high water mark from tip is max + let maxIndex = match primary with Some p -> Some p.minIndex | None -> maxIndex // if no batches in primary, high water mark from tip is max let! secondary = secondary (minIndex, maxIndex) let events = match secondary with | Some s -> Array.append s.events events | None -> events match secondary with - | Some { index = i } when i <= minI -> () + | Some { minIndex = i } when i <= minI -> () | Some { found = true } -> () | _ -> logMissing (minIndex, maxIndex) "Origin event not found in secondary container" return pos, events } @@ -901,16 +903,16 @@ module Delete = let log = log |> Log.prop "stream" stream let deleteItem id count : Async = async { let ro = ItemRequestOptions(EnableContentResponseOnWrite = Nullable false) // https://devblogs.microsoft.com/cosmosdb/enable-content-response-on-write/ - let! t, res = container.DeleteItemAsync(id, PartitionKey stream, ro, cancellationToken = ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time + let! t, res = container.DeleteItemAsync(id, PartitionKey stream, ro, ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let rc, ms = res.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds) let reqMetric : Log.Measurement = { stream = stream; interval = t; bytes = -1; count = count; ru = rc } let log = let evt = Log.Delete reqMetric in log |> Log.event evt log.Information("EqxCosmos {action:l} {id} {ms}ms rc={ru}", "Delete", id, ms, rc) - return res.RequestCharge + return rc } let log = log |> Log.prop "beforePos" beforePos let query : FeedIterator = - let qro = QueryRequestOptions(PartitionKey = Nullable(PartitionKey stream), MaxItemCount = Nullable maxItems) + let qro = QueryRequestOptions(PartitionKey = Nullable (PartitionKey stream), MaxItemCount = Nullable maxItems) container.GetItemQueryIterator<_>(QueryDefinition "SELECT c.id, c.i, c.n FROM c", requestOptions = qro) let mapPage i (t : StopwatchInterval) (page : FeedResponse) = let batches, rc, ms = Array.ofSeq page, page.RequestCharge, (let e = t.Elapsed in e.TotalMilliseconds) @@ -942,7 +944,6 @@ module Delete = let! pt, outcomes = let isTip (x : BatchIndices) = x.id = Tip.WellKnownDocumentId let isRelevant x = isTip x || x.i < beforePos - let hasRelevantItems (batches, _) = batches |> Array.exists isRelevant let handle (batches : BatchIndices[], rc) = async { let mutable delCharges, batchesDeleted, eventsDeleted, eventsDeferred = 0., 0, 0, 0 let mutable tipI, lwm = None, None @@ -962,6 +963,7 @@ module Delete = lwm <- Some x.n return rc, (tipI, lwm), (delCharges, batchesDeleted, eventsDeleted, eventsDeferred) } + let hasRelevantItems (batches, _rc) = batches |> Array.exists isRelevant query |> Query.feedIteratorMapTi mapPage |> AsyncSeq.takeWhile hasRelevantItems @@ -1032,18 +1034,20 @@ type TipOptions /// Maximum serialized size (length of JSON.stringify representation) to permit to accumulate in Tip before they get moved out to a standalone Batch. Default: 30_000. []?maxJsonLength, []?readRetryPolicy, - []?writeRetryPolicy) = - let maxEvents, maxJsonLength = defaultArg maxEvents 0, defaultArg maxJsonLength 30_000 + []?writeRetryPolicy, + []?ignoreMissingEvents) = /// Maximum number of events permitted in Tip. When this is exceeded, events are moved out to a standalone Batch. Default: 0 - member __.MaxEvents = maxEvents + member val MaxEvents = defaultArg maxEvents 0 /// Maximum serialized size (length of JSON.stringify representation) to permit to accumulate in Tip before they get moved out to a standalone Batch. Default: 30_000. - member __.MaxJsonLength = maxJsonLength - member __.ReadRetryPolicy = readRetryPolicy - member __.WriteRetryPolicy = writeRetryPolicy + member val MaxJsonLength = defaultArg maxJsonLength 30_000 + member val ReadRetryPolicy = readRetryPolicy + member val WriteRetryPolicy = writeRetryPolicy + member val IgnoreMissingEvents = defaultArg ignoreMissingEvents false type StoreClient(container : Container, fallback : Container option, query : QueryOptions, tip : TipOptions) = let loadTip log stream pos = Tip.tryLoad log tip.ReadRetryPolicy (container, stream) (pos, None) + let ignoreMissing = tip.IgnoreMissingEvents // Always yields events forward, regardless of direction member internal __.Read(log, stream, direction, (tryDecode, isOrigin), ?minIndex, ?maxIndex, ?tip): Async = async { @@ -1052,8 +1056,8 @@ type StoreClient(container : Container, fallback : Container option, query : Que let walk log gateway = Query.scan log (gateway,stream) includeTip query.MaxItems query.MaxRequests direction (tryDecode, isOrigin) let walkFallback = match fallback with - | None -> None - | Some f -> walk (log |> Log.prop "secondary" true) f |> Some + | None -> Choice1Of2 ignoreMissing + | Some f -> Choice2Of2 (walk (log |> Log.prop "secondary" true) f) let log = log |> Log.prop "stream" stream let! pos, events = Query.load log (minIndex, maxIndex) tip (walk log container) walkFallback @@ -1061,34 +1065,34 @@ type StoreClient(container : Container, fallback : Container option, query : Que member __.ReadLazy(log, batching: QueryOptions, stream, direction, (tryDecode,isOrigin), ?minIndex, ?maxIndex) : AsyncSeq<'event[]> = Query.walkLazy log (container,stream) batching.MaxItems batching.MaxRequests (tryDecode,isOrigin) (direction, minIndex, maxIndex) - member con.Load(log, (stream, maybePos), (tryDecode, isOrigin), checkUnfolds): Async = - if not checkUnfolds then con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin)) + member store.Load(log, (stream, maybePos), (tryDecode, isOrigin), checkUnfolds : bool): Async = + if not checkUnfolds then store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin)) else async { match! loadTip log stream maybePos with | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty, Array.empty | Tip.Result.NotModified -> return invalidOp "Not applicable" - | Tip.Result.Found (pos, xs) -> return! con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip=(pos, xs)) } + | Tip.Result.Found (pos, i, xs) -> return! store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip = (pos, i, xs)) } member __.GetPosition(log, stream, ?pos): Async = async { match! loadTip log stream pos with | Tip.Result.NotFound -> return Token.create stream Position.fromKnownEmpty | Tip.Result.NotModified -> return Token.create stream pos.Value - | Tip.Result.Found (pos, _unfoldsAndEvents) -> return Token.create stream pos } - member con.Reload(log, (stream, pos), (tryDecode, isOrigin), ?preview): Async> = - let query (pos, xs) = async { - let! res = con.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), tip=(pos, xs), minIndex = pos.index) + | Tip.Result.Found (pos, _i, _unfoldsAndEvents) -> return Token.create stream pos } + member store.Reload(log, (stream, pos), (tryDecode, isOrigin), ?preview): Async> = + let query (pos, i, xs) = async { + let! res = store.Read(log, stream, Direction.Backward, (tryDecode, isOrigin), minIndex = i, tip = (pos, i, xs)) return LoadFromTokenResult.Found res } match preview with - | Some (pos, xs) -> query (pos, xs) + | Some (pos, i, xs) -> query (pos, i, xs) | None -> async { match! loadTip log stream (Some pos) with | Tip.Result.NotFound -> return LoadFromTokenResult.Found (Token.create stream Position.fromKnownEmpty, Array.empty) | Tip.Result.NotModified -> return LoadFromTokenResult.Unchanged - | Tip.Result.Found (pos,xs) -> return! query (pos, xs) } + | Tip.Result.Found (pos, i, xs) -> return! query (pos, i, xs) } member internal __.Sync(log, stream, exp, batch: Tip): Async = async { if Array.isEmpty batch.e && Array.isEmpty batch.u then invalidOp "Must write either events or unfolds." match! Sync.batch log (tip.WriteRetryPolicy, tip.MaxEvents, tip.MaxJsonLength) (container, stream) (exp, batch) with - | Sync.Result.Conflict (pos',events) -> return InternalSyncResult.Conflict (pos',events) + | Sync.Result.Conflict (pos', events) -> return InternalSyncResult.Conflict (pos', events) | Sync.Result.ConflictUnknown pos' -> return InternalSyncResult.ConflictUnknown (Token.create stream pos') | Sync.Result.Written pos' -> return InternalSyncResult.Written (Token.create stream pos') } @@ -1100,7 +1104,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let! token, events = store.Load(log, (stream, None), (codec.TryDecode,isOrigin), checkUnfolds) return token, fold initial events } member __.Reload(log, (Token.Unpack (stream, pos) as streamToken), state, fold, isOrigin, ?preloaded): Async = async { - match! store.Reload(log, (stream, pos), (codec.TryDecode,isOrigin), ?preview = preloaded) with + match! store.Reload(log, (stream, pos), (codec.TryDecode, isOrigin), ?preview = preloaded) with | LoadFromTokenResult.Unchanged -> return streamToken, state | LoadFromTokenResult.Found (token', events) -> return token', fold state events } member cat.Sync(log, token, state, events, mapUnfolds, fold, isOrigin, context, compressUnfolds): Async> = async { @@ -1119,7 +1123,7 @@ type internal Category<'event, 'state, 'context>(store : StoreClient, codec : IE let projections = Sync.mkUnfold compressor baseIndex projectionsEncoded let batch = Sync.mkBatch stream eventsEncoded projections match! store.Sync(log, stream, exp, batch) with - | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', tipEvents))) + | InternalSyncResult.Conflict (pos', tipEvents) -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin, (pos', pos.index, tipEvents))) | InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (cat.Reload(log, token, state, fold, isOrigin)) | InternalSyncResult.Written token' -> return SyncResult.Written (token', state') } @@ -1230,10 +1234,13 @@ type CosmosStoreContext(connection : CosmosStoreConnection, ?queryOptions, ?tipO /// NOTE Equinox.Cosmos versions <= 3.0.0 cannot read events in Tip, hence using a non-zero value will not be interoperable. ?tipMaxEvents, /// Maximum serialized size (length of JSON.stringify representation) permitted in Tip before they get moved out to a standalone Batch. Default: 30_000. - ?tipMaxJsonLength) = + ?tipMaxJsonLength, + /// Inhibit throwing when events are missing, but no fallback Container has been supplied + ?ignoreMissingEvents) = let queryOptions = QueryOptions(?defaultMaxItems = defaultMaxItems, ?getDefaultMaxItems = getDefaultMaxItems, ?maxRequests = maxRequests) - let tipOptions = TipOptions(?maxEvents = tipMaxEvents, ?maxJsonLength = tipMaxJsonLength) + let tipOptions = TipOptions(?maxEvents = tipMaxEvents, ?maxJsonLength = tipMaxJsonLength, ?ignoreMissingEvents = ignoreMissingEvents) CosmosStoreContext(connection, queryOptions, tipOptions) + member val Connection = connection member val QueryOptions = queryOptions |> Option.defaultWith QueryOptions member val TipOptions = tipOptions |> Option.defaultWith TipOptions member internal __.ResolveContainerClientAndStreamIdAndInit(categoryName, streamId) = @@ -1500,9 +1507,9 @@ type EventsContext internal | Some init -> do! init () let batch = Sync.mkBatch stream events Seq.empty match! store.Sync(log, stream, SyncExp.Version position.index, batch) with - | InternalSyncResult.Written (Token.Unpack (_,pos)) -> return AppendResult.Ok pos - | InternalSyncResult.Conflict (pos,events) -> return AppendResult.Conflict (pos, events) - | InternalSyncResult.ConflictUnknown (Token.Unpack (_,pos)) -> return AppendResult.ConflictUnknown pos } + | InternalSyncResult.Written (Token.Unpack (_, pos)) -> return AppendResult.Ok pos + | InternalSyncResult.Conflict (pos, events) -> return AppendResult.Conflict (pos, events) + | InternalSyncResult.ConflictUnknown (Token.Unpack (_, pos)) -> return AppendResult.ConflictUnknown pos } /// Low level, non-idempotent call appending events to a stream without a concurrency control mechanism in play /// NB Should be used sparingly; Equinox.Stream enables building equivalent equivalent idempotent handling with minimal code. diff --git a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs index 2534344cf..5066f127f 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosCoreIntegration.fs @@ -326,41 +326,57 @@ type Tests(testOutputHelper) = let prune (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { if eventsInTip then () else // TODO - let ctx = createPrimaryEventsContext log 10 (if eventsInTip then 1 else 0) - let! expected = add6EventsIn2Batches ctx streamName + let ctx, ctxUnsafe = createPrimaryEventsContextWithUnsafe log 10 0 + let! expected = add6EventsIn2BatchesEx ctx streamName 4 - // Trigger deletion of first batch + // We should still the correct high-water mark even if we don't delete anything + capture.Clear() + let! deleted, deferred, trimmedPos = Events.prune ctx streamName 0L + test <@ deleted = 0 && deferred = 0 && trimmedPos = 0L @> + test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @> + verifyRequestChargesMax 3 // 2.86 + + // Trigger deletion of first batch, but as we're in the middle of the next Batch... + capture.Clear() let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L - test <@ deleted = 1 && deferred = 4 && trimmedPos = 1L @> + test <@ deleted = 4 && deferred = 1 && trimmedPos = 4L @> test <@ [EqxAct.PruneResponse; EqxAct.Delete; EqxAct.Prune] = capture.ExternalCalls @> - verifyRequestChargesMax 17 // 13.33 + 2.9 + verifyRequestChargesMax 17 // [13.33; 2.9] - let! res = Events.get ctx streamName 0L Int32.MaxValue - verifyCorrectEvents 1L (Array.skip 1 expected) res + let pos = 4L + let! res = Events.get ctxUnsafe streamName 0L Int32.MaxValue + verifyCorrectEvents pos (Array.skip (int pos) expected) res // Repeat the process, but this time there should be no actual deletes capture.Clear() + let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L + test <@ deleted = 0 && deferred = 1 && trimmedPos = pos @> + test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @> + verifyRequestChargesMax 3 // 2.86 + + // We should still get the high-water mark even if we asked for less + capture.Clear() let! deleted, deferred, trimmedPos = Events.prune ctx streamName 4L - test <@ deleted = 0 && deferred = 3 && trimmedPos = 1L @> + test <@ deleted = 0 && deferred = 0 && trimmedPos = pos @> test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @> verifyRequestChargesMax 3 // 2.86 - let! res = Events.get ctx streamName 0L Int32.MaxValue - verifyCorrectEvents 1L (Array.skip 1 expected) res + let! res = Events.get ctxUnsafe streamName 0L Int32.MaxValue + verifyCorrectEvents pos (Array.skip (int pos) expected) res // Delete second batch capture.Clear() let! deleted, deferred, trimmedPos = Events.prune ctx streamName 6L - test <@ deleted = 5 && deferred = 0 && trimmedPos = 6L @> + test <@ deleted = 2 && deferred = 0 && trimmedPos = 6L @> test <@ [EqxAct.PruneResponse; EqxAct.Delete; EqxAct.Prune] = capture.ExternalCalls @> verifyRequestChargesMax 17 // 13.33 + 2.86 - let! res = Events.get ctx streamName 0L Int32.MaxValue + let! res = Events.get ctxUnsafe streamName 0L Int32.MaxValue test <@ [||] = res @> // Attempt to repeat capture.Clear() - let! deleted, deferred, trimmedPos = Events.prune ctx streamName 6L + let! deleted, deferred, trimmedPos = Events.prune ctx streamName 5L test <@ deleted = 0 && deferred = 0 && trimmedPos = 6L @> test <@ [EqxAct.PruneResponse; EqxAct.Prune] = capture.ExternalCalls @> verifyRequestChargesMax 3 // 2.83 @@ -372,23 +388,24 @@ type Tests(testOutputHelper) = let fallback (eventsInTip, TestStream streamName) = Async.RunSynchronously <| async { if eventsInTip then () else // TODO - let ctx1 = createPrimaryEventsContext log defaultQueryMaxItems 0 + let ctx1, ctx1Unsafe = createPrimaryEventsContextWithUnsafe log 10 0 let ctx2 = createSecondaryEventsContext log defaultQueryMaxItems let ctx12 = createFallbackEventsContext log defaultQueryMaxItems - let! expected = add6EventsIn2Batches ctx1 streamName + let! expected = add6EventsIn2BatchesEx ctx1 streamName 4 // Add the same events to the secondary container let! _ = add6EventsIn2Batches ctx2 streamName // Trigger deletion of first batch from primary let! deleted, deferred, trimmedPos = Events.prune ctx1 streamName 5L - test <@ deleted = 1 && deferred = 4 && trimmedPos = 1L @> + test <@ deleted = 4 && deferred = 1 && trimmedPos = 4L @> // Prove it's gone capture.Clear() - let! res = Events.get ctx1 streamName 0L Int32.MaxValue + let! res = Events.get ctx1Unsafe streamName 0L Int32.MaxValue test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - verifyCorrectEvents 1L (Array.skip 1 expected) res + let pos = 4L + verifyCorrectEvents pos (Array.skip (int pos) expected) res verifyRequestChargesMax 4 // 3.04 // Prove the full set exists in the secondary @@ -408,17 +425,23 @@ type Tests(testOutputHelper) = // Delete second batch in primary capture.Clear() let! deleted, deferred, trimmedPos = Events.prune ctx1 streamName 6L - test <@ deleted = 5 && deferred = 0 && trimmedPos = 6L @> + test <@ deleted = 2 && deferred = 0 && trimmedPos = 6L @> // Nothing left in primary capture.Clear() - let! res = Events.get ctx1 streamName 0L Int32.MaxValue + let! res = Events.get ctx1Unsafe streamName 0L Int32.MaxValue test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> test <@ [||] = res @> verifyRequestChargesMax 3 // 2.99 + // But primary retains high-water mark + capture.Clear() + let! res = Events.getNextIndex ctx1 streamName + test <@ [EqxAct.Tip] = capture.ExternalCalls @> + test <@ 6L = res @> + verifyRequestChargesMax 1 + // Fallback queries secondary (unless we actually delete the Tip too) - // TODO demonstrate Primary read is only of Tip when using snapshots capture.Clear() let! res = Events.get ctx12 streamName 0L Int32.MaxValue test <@ [EqxAct.ResponseForward; EqxAct.QueryForward; EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> diff --git a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs index 4d7cb7dc3..9a497dd7c 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosFixtures.fs @@ -43,9 +43,12 @@ let connectWithFallback log = let client = createClient log name discovery CosmosStoreConnection(client, databaseId, containerId, containerId2 = containerId2) +let createPrimaryContextIgnoreMissing conn queryMaxItems tipMaxEvents ignoreMissing = + CosmosStoreContext.Create(conn, defaultMaxItems = queryMaxItems, tipMaxEvents = tipMaxEvents, ignoreMissingEvents = ignoreMissing) + let createPrimaryContextEx log queryMaxItems tipMaxEvents = let conn = connectPrimary log - CosmosStoreContext.Create(conn, defaultMaxItems = queryMaxItems, tipMaxEvents = tipMaxEvents) + createPrimaryContextIgnoreMissing conn queryMaxItems tipMaxEvents false let defaultTipMaxEvents = 10 @@ -66,6 +69,13 @@ let createPrimaryEventsContext log queryMaxItems tipMaxItems = let context = createPrimaryContextEx log queryMaxItems tipMaxItems Equinox.CosmosStore.Core.EventsContext(context, log) +let createPrimaryEventsContextWithUnsafe log queryMaxItems tipMaxItems = + let conn = connectPrimary log + let create ignoreMissing = + let ctx = createPrimaryContextIgnoreMissing conn queryMaxItems tipMaxItems ignoreMissing + Equinox.CosmosStore.Core.EventsContext(ctx, log) + create false, create true + let createSecondaryEventsContext log queryMaxItems = let context = createSecondaryContext log queryMaxItems Equinox.CosmosStore.Core.EventsContext(context, log) diff --git a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs index b80655d96..91f4ea1da 100644 --- a/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs +++ b/tests/Equinox.CosmosStore.Integration/CosmosIntegration.fs @@ -210,7 +210,6 @@ type Tests(testOutputHelper) = if not eventsInTip then (* Verify pruning does not affect the copies of the events maintained as Unfolds *) - // Needs to share the same context (with inner CosmosClient) for the session token to be threaded through // If we run on an independent context, we won't see (and hence prune) the full set of events let ctx = Core.EventsContext(context, log) @@ -220,14 +219,23 @@ type Tests(testOutputHelper) = let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 15L test <@ deleted = 15 && deferred = 0 && trimmedPos = 15L @> - // Prove they're gone + // Prove we notice they're gone capture.Clear() - let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue |> Async.Catch + test <@ match res with + | Choice2Of2 e -> e.Message.StartsWith "Origin event not found; no secondary container supplied" + | x -> failwithf "Unexpected %A" x @> test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - test <@ [||] = res @> verifyRequestChargesMax 3 // 2.99 - // But we can still read (there's no cache so we'll definitely be reading) + // But not forgotten + capture.Clear() + let! pos = Core.Events.getNextIndex ctx streamName + test <@ [EqxAct.Tip] = capture.ExternalCalls @> // Note in the current impl, this read is not cached + test <@ 15L = pos @> + verifyRequestChargesMax 1 + + // And we can still read the Snapshot from the Tip's unfolds (there's no caching so we'll definitely be reading) capture.Clear() let! _ = service.Read id test <@ value = result @> @@ -363,11 +371,13 @@ type Tests(testOutputHelper) = let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 12L test <@ deleted = 12 && deferred = 0 && trimmedPos = 12L @> - // Prove they're gone + // Show alarms are raised when they're gone capture.Clear() - let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue |> Async.Catch + test <@ match res with + | Choice2Of2 e -> e.Message.StartsWith "Origin event not found; no secondary container supplied" + | x -> failwithf "Unexpected %A" x @> test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - test <@ [||] = res @> verifyRequestChargesMax 3 // 2.99 // But we can still read (there's no cache so we'll definitely be reading) @@ -379,7 +389,7 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` (eventsInTip, cartContext, skuId) = Async.RunSynchronously <| async { - let context = createPrimaryContextEx log 10 (if eventsInTip then 1 else 0) + let context = createPrimaryContextEx log 10 (if eventsInTip then 10 else 0) let cache = Equinox.Cache("cart", sizeMb = 50) let createServiceCached () = Cart.createServiceWithSnapshotStrategyAndCaching log context cache let service1, service2 = createServiceCached (), createServiceCached () @@ -422,11 +432,13 @@ type Tests(testOutputHelper) = let! deleted, deferred, trimmedPos = Core.Events.prune ctx streamName 13L test <@ deleted = 13 && deferred = 0 && trimmedPos = 13L @> - // Prove they're gone + // Show that we hear about it if we try to load the events capture.Clear() - let! res = Core.Events.get ctx streamName 0L Int32.MaxValue + let! res = Core.Events.get ctx streamName 0L Int32.MaxValue |> Async.Catch + test <@ match res with + | Choice2Of2 e -> e.Message.StartsWith "Origin event not found; no secondary container supplied" + | x -> failwithf "Unexpected %A" x @> test <@ [EqxAct.ResponseForward; EqxAct.QueryForward] = capture.ExternalCalls @> - test <@ [||] = res @> verifyRequestChargesMax 3 // 2.99 // But we can still read (service2 shares the cache so is aware of the last writes, and pruning does not invalidate the Tip)