Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache: Add ReadThrough mode #386

Merged
merged 22 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- `Equinox`: `Decider.Transact`, `TransactAsync`, `TransactExAsync` overloads [#325](https://github.com/jet/equinox/pull/325)
- `Equinox`: `StreamId` replaces usage of `FsCodec.StreamName` [#353](https://github.com/jet/equinox/pull/353) [#378](https://github.com/jet/equinox/pull/378)
- `Equinox.LoadOption.RequireLeader`: support for requesting a consistent read of a stream [#341](https://github.com/jet/equinox/pull/341)
- `Equinox.LoadOption.AllowStale`: Read mode that limits reads to a maximum of one retrieval per the defined time window [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Core`: `Category` base class, with `Decider` and `Stream` helper `module`s [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.DeciderCore`: C# friendly equivalent of `Decider` (i.e. `Func` and `Task`) [#338](https://github.com/jet/equinox/pull/338)
- `Equinox.ISyncContext.StreamEventBytes`: Exposes stored size of events in the stream (initial impl provides it for `DynamoStore` only) [#326](https://github.com/jet/equinox/pull/326)
Expand All @@ -27,6 +28,7 @@ The `Unreleased` section name is replaced by the expected version of next releas
- Performance: Switch surface APIs to `struct` Tuples and Options where relevant, some due to `struct` changes in [`FsCodec` #82](https://github.com/jet/FsCodec/pull/82), and use `task` in hot paths [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: Merge `ResolveOption` and `XXXStoreCategory.FromMemento` as `LoadOption` [#308](https://github.com/jet/equinox/pull/308)
- `Equinox`: Merge `XXXStoreCategory.Resolve(sn, ?ResolveOption)` and `XXXStoreCategory.FromMemento` as option `LoadOption` parameter on all `Transact` and `Query` methods [#308](https://github.com/jet/equinox/pull/308)
- `Equinox.LoadOption`: Rename `AllowStale` to `AnyCachedValue` [#386](https://github.com/jet/equinox/pull/386)
- `Equinox.Decider`: `log` is now supplied via `Equinox.Category` [#337](https://github.com/jet/equinox/pull/337)
- `Equinox.Decider`: `maxAttempts` with a default policy and an optional argument on `Transact*` APIs [#337](https://github.com/jet/equinox/pull/337)
- `Equinox`: push `Serilog` dependency out to `Equinox.Core` [#337](https://github.com/jet/equinox/pull/337)
Expand Down
6 changes: 3 additions & 3 deletions DOCUMENTATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -850,7 +850,7 @@ let create resolve = Service(streamId >> resolve Category)
```

`Read` above will do a roundtrip to the Store in order to fetch the most recent
state (in `AllowStale` mode, the store roundtrip can be optimized out by
state (in `AnyCachedValue` or `AllowStale` modes, the store roundtrip can be optimized out by
reading through the cache). This Synchronous Read can be used to
[Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency)
to establish a state incorporating the effects of any Command invocation you
Expand Down Expand Up @@ -1313,7 +1313,7 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St

member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
return interpretMany Fold.fold (Seq.map interpret commands) state }, opt)
Expand Down Expand Up @@ -1375,7 +1375,7 @@ type Accumulator<'event, 'state>(fold : 'state -> 'event seq -> 'state, originSt
type Service ... =
member _.Run(cartId, optimistic, commands : Command seq, ?prepare) : Async<Fold.State> =
let decider = resolve cartId
let opt = if optimistic then Some Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.Transact(fun state -> async {
match prepare with None -> () | Some prep -> do! prep
let acc = Accumulator(Fold.fold, state)
Expand Down
4 changes: 2 additions & 2 deletions samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
return interpretMany Fold.fold (Seq.map interpret commands) state }
#endif
let decider = resolve cartId
let opt = if optimistic then Equinox.AllowStale else Equinox.RequireLoad
let opt = if optimistic then Equinox.AnyCachedValue else Equinox.RequireLoad
decider.TransactAsync(interpret, opt)

member x.ExecuteManyAsync(cartId, optimistic, commands: Command seq, ?prepare): Async<unit> =
Expand All @@ -166,7 +166,7 @@ type Service internal (resolve: CartId -> Equinox.Decider<Events.Event, Fold.Sta
decider.Query id
member _.ReadStale cartId =
let decider = resolve cartId
decider.Query(id, Equinox.LoadOption.AllowStale)
decider.Query(id, Equinox.LoadOption.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
2 changes: 1 addition & 1 deletion samples/Store/Domain/ContactPreferences.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type Service internal (resolve: ClientId -> Equinox.Decider<Events.Event, Fold.S

member _.ReadStale(email) =
let decider = resolve email
decider.Query(id, Equinox.AllowStale)
decider.Query(id, Equinox.AnyCachedValue)

let create resolve =
Service(streamId >> resolve Category)
1 change: 1 addition & 0 deletions src/Equinox.Core/AsyncCacheCell.fs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type AsyncLazy<'T>(workflow: unit -> Task<'T>) =
/// Await the outcome of the computation.
member _.Await() = workflow.Value

/// Singleton Empty value
static member val Empty = AsyncLazy(fun () -> Task.FromException<'T>(System.InvalidOperationException "Uninitialized AsyncLazy"))

/// Generic async lazy caching implementation that admits expiration/recomputation/retry on exception semantics.
Expand Down
119 changes: 88 additions & 31 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,120 @@ module internal CacheItemOptions =
| RelativeExpiration relative -> CacheItemPolicy(SlidingExpiration = relative)

type ICache =
abstract member TryGet:
key: string
-> Task<struct (StreamToken * 'state) voption>
abstract member UpdateIfNewer:
key: string
* isStale: System.Func<StreamToken, StreamToken, bool>
abstract member Load: key: string
* maxAge: TimeSpan
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* token: StreamToken
* state: 'state
-> Task<unit>
* loadOrReload: (struct (StreamToken * 'state) voption -> Task<struct (StreamToken * 'state)>)
-> Task<struct (StreamToken * 'state)>
abstract member Save: key: string
* isStale: Func<StreamToken, StreamToken, bool>
* options: CacheItemOptions
* timestamp: int64
* token: StreamToken * state: 'state
-> unit

namespace Equinox

open Equinox.Core
open Equinox.Core.Tracing
open System
open System.Threading.Tasks

type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state) =
type internal CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, initialTimestamp: int64) =
let mutable currentToken = initialToken
let mutable currentState = initialState
member x.Value: struct (StreamToken * 'state) =
lock x <| fun () ->
currentToken, currentState
member x.UpdateIfNewer(isStale: System.Func<StreamToken, StreamToken, bool>, other: CacheEntry<'state>) =
let mutable verifiedTimestamp = initialTimestamp
let tryGet () =
if verifiedTimestamp = 0 then ValueNone
else ValueSome (struct (currentToken, currentState))
let mutable cell = AsyncLazy<struct(int64 * (struct (StreamToken * 'state)))>.Empty
static member CreateEmpty() =
new CacheEntry<'state>(Unchecked.defaultof<StreamToken>, Unchecked.defaultof<'state>, 0)
member x.TryGetValue(): (struct (StreamToken * 'state)) voption =
lock x tryGet
member x.MergeUpdates(isStale: Func<StreamToken, StreamToken, bool>, timestamp, token, state) =
lock x <| fun () ->
let struct (candidateToken, state) = other.Value
if not (isStale.Invoke(currentToken, candidateToken)) then
currentToken <- candidateToken
if not (isStale.Invoke(currentToken, token)) then
currentToken <- token
currentState <- state
if verifiedTimestamp < timestamp then // Don't count attempts to overwrite with stale state as verification
verifiedTimestamp <- timestamp
// Follows high level flow of AsyncCacheCell.Await - read the comments there, and the AsyncCacheCell tests first!
member x.ReadThrough(maxAge: TimeSpan, isStale, load) = task {
let cacheEntryValidityCheckTimestamp = System.Diagnostics.Stopwatch.GetTimestamp()
let isWithinMaxAge cachedValueTimestamp = Stopwatch.TicksToSeconds(cacheEntryValidityCheckTimestamp - cachedValueTimestamp) <= maxAge.TotalSeconds
let fetchStateConsistently () = struct (cell, tryGet (), isWithinMaxAge verifiedTimestamp)
match lock x fetchStateConsistently with
| _, ValueSome cachedValue, true ->
return cachedValue
| ourInitialCellState, maybeBaseState, _ -> // If it's not good enough for us, trigger a request (though someone may have beaten us to that)

// Inspect/await any concurrent attempt to see if it is sufficient for our needs
match! ourInitialCellState.TryAwaitValid() with
| ValueSome (fetchCommencedTimestamp, res) when isWithinMaxAge fetchCommencedTimestamp -> return res
| _ ->

// .. it wasn't; join the race to dispatch a request (others following us will share our fate via the TryAwaitValid)
let newInstance = AsyncLazy(load maybeBaseState)
let _ = System.Threading.Interlocked.CompareExchange(&cell, newInstance, ourInitialCellState)
let! timestamp, (token, state as res) = cell.Await()
x.MergeUpdates(isStale, timestamp, token, state) // merge observed result into the cache
return res }

type Cache private (inner: System.Runtime.Caching.MemoryCache) =
let tryLoad key =
match inner.Get key with
| null -> ValueNone
| :? CacheEntry<'state> as existingEntry -> existingEntry.TryGetValue()
| x -> failwith $"tryLoad Incompatible cache entry %A{x}"
let addOrGet key options entry =
match inner.AddOrGetExisting(key, entry, CacheItemOptions.toPolicy options) with
| null -> Ok entry
| :? CacheEntry<'state> as existingEntry -> Error existingEntry
| x -> failwith $"addOrGet Incompatible cache entry %A{x}"
let getElseAddEmptyEntry key options =
match addOrGet key options (CacheEntry<'state>.CreateEmpty()) with
| Ok fresh -> fresh
| Error existingEntry -> existingEntry
let addOrMergeCacheEntry isStale key options timestamp struct (token, state) =
let entry = CacheEntry(token, state, timestamp)
match addOrGet key options entry with
| Ok _ -> () // Our fresh one got added
| Error existingEntry -> existingEntry.MergeUpdates(isStale, timestamp, token, state)
new (name, sizeMb: int) =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
Cache(new System.Runtime.Caching.MemoryCache(name, config))
interface ICache with
member _.TryGet key =
match inner.Get key with
| null -> ValueNone |> Task.FromResult
| :? CacheEntry<'state> as existingEntry -> ValueSome existingEntry.Value |> Task.FromResult
| x -> failwithf "TryGet Incompatible cache entry %A" x
member _.UpdateIfNewer(key, isStale, options, token, state) =
let freshEntry = CacheEntry(token, state)
match inner.AddOrGetExisting(key, freshEntry, CacheItemOptions.toPolicy options) with
| null -> Task.FromResult()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer(isStale, freshEntry); Task.FromResult()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
// if there's a non-zero maxAge, concurrent read attempts share the roundtrip (and its fate, if it throws)
member _.Load(key, maxAge, isStale, options, loadOrReload) = task {
let loadOrReload maybeBaseState () = task {
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome maybeBaseState) |> ignore
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙏

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW not certain whether there's value in separated metrics for incremental loads due to caching vs AllowStale roundtrips saved, or whether a cache hit is the main point - will be pondering

let ts = System.Diagnostics.Stopwatch.GetTimestamp()
let! res = loadOrReload maybeBaseState
return struct (ts, res) }
if maxAge = TimeSpan.Zero then // Boring algorithm that has each caller independently load/reload the data and then cache it
let maybeBaseState = tryLoad key
let! timestamp, res = loadOrReload maybeBaseState ()
addOrMergeCacheEntry isStale key options timestamp res
return res
else // ensure we have an entry in the cache for this key; coordinate retrieval through that
let cacheSlot = getElseAddEmptyEntry key options
return! cacheSlot.ReadThrough(maxAge, isStale, loadOrReload) }
// Newer values get saved; equal values update the last retrieval timestamp
member _.Save(key, isStale, options, timestamp, token, state) =
addOrMergeCacheEntry isStale key options timestamp (token, state)

type [<NoComparison; NoEquality; RequireQualifiedAccess>] CachingStrategy =
/// Retain a single 'state per streamName.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| SlidingWindow of ICache * window: TimeSpan
/// Retain a single 'state per streamName.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs a roundtrip to load any subsequently-added events.
/// Unless a <c>LoadOption</c> is used, cache hits still incur a roundtrip to load any subsequently-added events.
| FixedTimeSpan of ICache * period: TimeSpan
/// Prefix is used to segregate multiple folded states per stream when they are stored in the cache.
/// Semantics are otherwise identical to <c>SlidingWindow</c>.
Expand Down
26 changes: 8 additions & 18 deletions src/Equinox.Core/Caching.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
module Equinox.Core.Caching

open Equinox.Core.Tracing
open Serilog
open System
open System.Threading
Expand All @@ -12,32 +11,23 @@ type IReloadable<'state> =

let private tee f (inner: CancellationToken -> Task<struct (StreamToken * 'state)>) ct = task {
let! tokenAndState = inner ct
do! f tokenAndState
f tokenAndState
return tokenAndState }

type private Decorator<'event, 'state, 'context, 'cat when 'cat :> ICategory<'event, 'state, 'context> and 'cat :> IReloadable<'state> >
bartelink marked this conversation as resolved.
Show resolved Hide resolved
(category: 'cat, cache: ICache, isStale, createKey, createOptions) =
let tryRead key = task {
let! cacheItem = cache.TryGet key
let act = System.Diagnostics.Activity.Current
if act <> null then act.AddCacheHit(ValueOption.isSome cacheItem) |> ignore
return cacheItem }
let save key (inner: CancellationToken -> Task<struct (StreamToken * 'state)>) ct = task {
let! struct (token, state) as res = inner ct
do! cache.UpdateIfNewer(key, isStale, createOptions (), token, state)
return res }
interface ICategory<'event, 'state, 'context> with
member _.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct) = task {
let key = createKey streamName
match! tryRead key with
| ValueNone -> return! save key (fun ct -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)) ct
| ValueSome tokenAndState when maxAge = TimeSpan.MaxValue -> return tokenAndState // read already updated TTL, no need to write
| ValueSome (token, state) -> return! save key (fun ct -> category.Reload(log, streamName, requireLeader, token, state, ct)) ct }
let loadOrReload = function
| ValueNone -> category.Load(log, categoryName, streamId, streamName, maxAge, requireLeader, ct)
| ValueSome (struct (token, state)) -> category.Reload(log, streamName, requireLeader, token, state, ct)
return! cache.Load(createKey streamName, maxAge, isStale, createOptions (), loadOrReload) }
member _.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) = task {
let save struct (token, state) = cache.UpdateIfNewer(createKey streamName, isStale, createOptions (), token, state)
let timestamp = System.Diagnostics.Stopwatch.GetTimestamp() // NB take the timestamp before any potential write takes place
let save struct (token, state) = cache.Save(createKey streamName, isStale, createOptions (), timestamp, token, state)
match! category.TrySync(log, categoryName, streamId, streamName, context, maybeInit, streamToken, state, events, ct) with
| SyncResult.Written tokenAndState' ->
do! save tokenAndState'
save tokenAndState'
return SyncResult.Written tokenAndState'
| SyncResult.Conflict resync ->
return SyncResult.Conflict (tee save resync) }
Expand Down
6 changes: 3 additions & 3 deletions src/Equinox.CosmosStore/CosmosStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -1328,14 +1328,14 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag.
/// Each cache hit for a stream renews the retention period for the defined <c>window</c>.
/// Upon expiration of the defined <c>window</c> from the point at which the cache was entry was last used, a full reload is triggered.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in Tip
| SlidingWindow of ICache * window: TimeSpan
/// Retain a single 'state per streamName, together with the associated etag.
/// Upon expiration of the defined <c>period</c>, a full reload is triggered.
/// Typically combined with `Equinox.LoadOption.AllowStale` to minimize loads.
/// Unless <c>LoadOption.AllowStale</c> is used, each cache hit still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
/// Typically combined with an `Equinox.LoadOption` to minimize loads.
/// Unless <c>LoadOption.AnyCachedValue</c> or <c>AllowStale</c> are used, cache hits still incurs an etag-contingent Tip read (at a cost of a roundtrip with a 1RU charge if unmodified).
| FixedTimeSpan of ICache * period: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
Expand Down
Loading