Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#145 Define ICache, centralize implementation in Equinox.Core #161

Merged
merged 6 commits into from
Oct 13, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module Cosmos =
let conn = connector.Connect("equinox-tool", discovery) |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.Cache("equinox-tool", sizeMb = 50)
bartelink marked this conversation as resolved.
Show resolved Hide resolved
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dName, cName)
Expand Down Expand Up @@ -135,7 +135,7 @@ module EventStore =
let conn = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Es ((createGateway conn batchSize), cacheStrategy, unfolds)
58 changes: 58 additions & 0 deletions src/Equinox.Core/Cache.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
namespace Equinox.Core

open System

type CacheItemOptions =
| AbsoluteExpiration of DateTimeOffset
| RelativeExpiration of TimeSpan

[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken: StreamToken, initialState: 'state, supersedes: StreamToken -> StreamToken -> bool) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer(other: CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value: StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState


type ICache =
abstract member UpdateIfNewer: cacheItemOptions:CacheItemOptions -> key: string -> CacheEntry<'state> -> Async<unit>
abstract member TryGet: key: string -> Async<(StreamToken * 'state) option>

namespace Equinox
open System.Runtime.Caching
bartelink marked this conversation as resolved.
Show resolved Hide resolved
open Equinox.Core
type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)

let getPolicy (cacheItemOption: CacheItemOptions)=
match cacheItemOption with
| AbsoluteExpiration absolute -> new CacheItemPolicy(AbsoluteExpiration = absolute)
| RelativeExpiration relative -> new CacheItemPolicy(SlidingExpiration = relative)

interface ICache with

member this.UpdateIfNewer cacheItemOptions key entry =
let policy = getPolicy cacheItemOptions
match cache.AddOrGetExisting(key, box entry, policy) with
| null ->
async.Return ()
DSilence marked this conversation as resolved.
Show resolved Hide resolved
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
async.Return ()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x

member this.TryGet key =
async.Return (
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x
)
2 changes: 2 additions & 0 deletions src/Equinox.Core/Equinox.Core.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
<Compile Include="Stream.fs" />
<Compile Include="Retry.fs" />
<Compile Include="AsyncCacheCell.fs" />
<Compile Include="Cache.fs" />
</ItemGroup>

<ItemGroup>
Expand All @@ -28,6 +29,7 @@

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' != 'netstandard2.0' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="System.Runtime.Caching" Version="4.6.0" />
DSilence marked this conversation as resolved.
Show resolved Hide resolved
</ItemGroup>

</Project>
80 changes: 30 additions & 50 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ open Microsoft.Azure.Documents
open Serilog
open System
open System.Collections.Concurrent
open System.Runtime.Caching
DSilence marked this conversation as resolved.
Show resolved Hide resolved

/// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts)
type Connection(client: Client.DocumentClient, [<O; D(null)>]?readRetryPolicy: IRetryPolicy, [<O; D(null)>]?writeRetryPolicy) =
Expand Down Expand Up @@ -874,62 +875,39 @@ type private Category<'event, 'state>(gateway : Gateway, codec : IUnionEncoder<'
| InternalSyncResult.Written token' -> return SyncResult.Written (token', state') }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry =
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
member __.TryGet (key : string) =
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> unit) =
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
async{
DSilence marked this conversation as resolved.
Show resolved Hide resolved
let! _ = tee streamName tokenAndState
return tokenAndState
}
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger) : Async<StreamToken * 'state> =
interceptAsync (inner.Load containerStream log) (snd containerStream)
member __.TrySync (log : ILogger) (Token.Unpack (_container,stream,_) as streamToken,state) (events : 'event list)
: Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream)
| SyncResult.Written (token', state') ->return SyncResult.Written (intercept stream (token', state')) }
| SyncResult.Conflict resync -> return SyncResult.Conflict(interceptAsync resync stream)
| SyncResult.Written(token', state')
->
DSilence marked this conversation as resolved.
Show resolved Hide resolved
let! intercepted = intercept stream (token', state')
return SyncResult.Written(intercepted) }


let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, Container*string>)
: ICategory<'event, 'state, Container*string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>
Expand All @@ -939,16 +917,18 @@ type private Folder<'event, 'state>
?readCache) =
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> =
let batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> batched
| Some (cache : Caching.Cache, prefix : string) ->
match cache.TryGet(prefix + snd containerStream) with
| None -> batched
| Some tokenAndState -> cached tokenAndState
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> = async {
let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
let! cacheItem = cache.TryGet(prefix + snd containerStream)
match cacheItem with
| None -> return batched
| Some tokenAndState -> return! cached tokenAndState
}
DSilence marked this conversation as resolved.
Show resolved Hide resolved
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async<SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
match res with
Expand Down Expand Up @@ -1003,7 +983,7 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag
/// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
/// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip
| SlidingWindow of Caching.Cache * window: TimeSpan
| SlidingWindow of ICache * window: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
Expand Down
71 changes: 24 additions & 47 deletions src/Equinox.EventStore/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -442,44 +442,16 @@ type private Category<'event, 'state>(context : Context, codec : FsCodec.IUnionE
return SyncResult.Written (token', fold state (Seq.ofList events)) }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry =
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
member __.TryGet (key : string) =
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> unit) =
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, string>, tee : string -> StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
async{
DSilence marked this conversation as resolved.
Show resolved Hide resolved
let! _ = tee streamName tokenAndState
return tokenAndState
}
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
return! intercept streamName tokenAndState }
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async<StreamToken * 'state> =
interceptAsync (inner.Load streamName log) streamName
Expand All @@ -489,26 +461,31 @@ module Caching =
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream.name)
| SyncResult.Written (token', state') -> return SyncResult.Written (token', state') }


let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: ICategory<'event, 'state, string>)
: ICategory<'event, 'state, string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
DSilence marked this conversation as resolved.
Show resolved Hide resolved
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: 'state -> 'event seq -> 'state, initial: 'state, ?readCache) =
let loadAlgorithm streamName initial log =
let batched = category.Load fold initial streamName log
let cached token state = category.LoadFromToken fold state streamName token log
match readCache with
| None -> batched
| Some (cache : Caching.Cache, prefix : string) ->
match cache.TryGet(prefix + streamName) with
| None -> batched
| Some (token, state) -> cached token state
async {
DSilence marked this conversation as resolved.
Show resolved Hide resolved
let! batched = category.Load fold initial streamName log
let cached token state = category.LoadFromToken fold state streamName token log
match readCache with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
let! cacheItem = cache.TryGet(prefix + streamName)
match cacheItem with
| None -> return batched
| Some (token, state) -> return! cached token state
}
interface ICategory<'event, 'state, string> with
member __.Load (streamName : string) (log : ILogger) : Async<StreamToken * 'state> =
loadAlgorithm streamName initial log
Expand All @@ -520,9 +497,9 @@ type private Folder<'event, 'state>(category : Category<'event, 'state>, fold: '

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type CachingStrategy =
| SlidingWindow of Caching.Cache * window: TimeSpan
| SlidingWindow of ICache * window: TimeSpan
/// Prefix is used to segregate multiple folds per stream when they are stored in the cache
| SlidingWindowPrefixed of Caching.Cache * window: TimeSpan * prefix: string
| SlidingWindowPrefixed of ICache * window: TimeSpan * prefix: string

type Resolver<'event,'state>
( context : Context, codec, fold, initial,
Expand Down
2 changes: 1 addition & 1 deletion tests/Equinox.Cosmos.Integration/CosmosIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ type Tests(testOutputHelper) =
let ``Can roundtrip against Cosmos, correctly using Snapshotting and Cache to avoid redundant reads`` context skuId = Async.RunSynchronously <| async {
let! conn = connectToSpecifiedCosmosOrSimulator log
let batchSize = 10
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let createServiceCached () = Cart.createServiceWithSnapshotStrategyAndCaching conn batchSize log cache
let service1, service2 = createServiceCached (), createServiceCached ()
capture.Clear()
Expand Down
4 changes: 2 additions & 2 deletions tests/Equinox.EventStore.Integration/EventStoreIntegration.fs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ type Tests(testOutputHelper) =
let log, capture = createLoggerWithCapture ()
let! conn = connectToLocalEventStoreNode log
let batchSize = 10
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let gateway = createGesGateway conn batchSize
let createServiceCached () = Cart.createServiceWithCaching log gateway cache
let service1, service2 = createServiceCached (), createServiceCached ()
Expand Down Expand Up @@ -277,7 +277,7 @@ type Tests(testOutputHelper) =
let batchSize = 10
let gateway = createGesGateway conn batchSize
let service1 = Cart.createServiceWithCompaction log gateway
let cache = Caching.Cache("cart", sizeMb = 50)
let cache = Equinox.Cache("cart", sizeMb = 50)
let gateway = createGesGateway conn batchSize
let service2 = Cart.createServiceWithCompactionAndCaching log gateway cache

Expand Down