Skip to content

Commit

Permalink
Added: initial cache implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
DSilence committed Sep 21, 2019
1 parent 495c80d commit ad6c052
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 104 deletions.
12 changes: 12 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Tutorial", "samples\Tutoria
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Tools.TestHarness", "tools\Equinox.Tools.TestHarness\Equinox.Tools.TestHarness.fsproj", "{C2EE7D8E-6982-41AD-ADCC-E6F586E93524}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.System.Runtime.Caching", "src\Equinox.System.Runtime.Caching\Equinox.System.Runtime.Caching.fsproj", "{50F987AC-5062-4E2F-BD8D-59AF9CE97103}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Microsoft.Extensions.Caching", "src\Equinix.Microsoft.Extensions.Caching\Equinox.Microsoft.Extensions.Caching.fsproj", "{4A1E9731-5E49-418F-AF23-4BF684A7B88F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -132,6 +136,14 @@ Global
{C2EE7D8E-6982-41AD-ADCC-E6F586E93524}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C2EE7D8E-6982-41AD-ADCC-E6F586E93524}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C2EE7D8E-6982-41AD-ADCC-E6F586E93524}.Release|Any CPU.Build.0 = Release|Any CPU
{50F987AC-5062-4E2F-BD8D-59AF9CE97103}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{50F987AC-5062-4E2F-BD8D-59AF9CE97103}.Debug|Any CPU.Build.0 = Debug|Any CPU
{50F987AC-5062-4E2F-BD8D-59AF9CE97103}.Release|Any CPU.ActiveCfg = Release|Any CPU
{50F987AC-5062-4E2F-BD8D-59AF9CE97103}.Release|Any CPU.Build.0 = Release|Any CPU
{4A1E9731-5E49-418F-AF23-4BF684A7B88F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4A1E9731-5E49-418F-AF23-4BF684A7B88F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A1E9731-5E49-418F-AF23-4BF684A7B88F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A1E9731-5E49-418F-AF23-4BF684A7B88F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
1 change: 1 addition & 0 deletions samples/Infrastructure/Infrastructure.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Equinox.System.Runtime.Caching\Equinox.System.Runtime.Caching.fsproj" />
<ProjectReference Include="..\..\src\Equinox\Equinox.fsproj" />
<ProjectReference Include="..\..\src\Equinox.Cosmos\Equinox.Cosmos.fsproj" />
<ProjectReference Include="..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
Expand Down
4 changes: 2 additions & 2 deletions samples/Infrastructure/Storage.fs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ module Cosmos =
let conn = connector.Connect("equinox-tool", discovery) |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.System.Runtime.Caching.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)
else CachingStrategy.NoCaching
StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dName, cName)
Expand Down Expand Up @@ -135,7 +135,7 @@ module EventStore =
let conn = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously
let cacheStrategy =
if cache then
let c = Caching.Cache("equinox-tool", sizeMb = 50)
let c = Equinox.System.Runtime.Caching.Cache("equinox-tool", sizeMb = 50)
CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) |> Some
else None
StorageConfig.Es ((createGateway conn batchSize), cacheStrategy, unfolds)
1 change: 1 addition & 0 deletions samples/Tutorial/Tutorial.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
<ProjectReference Include="..\..\src\Equinox.Cosmos\Equinox.Cosmos.fsproj" />
<ProjectReference Include="..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" />
<ProjectReference Include="..\..\src\Equinox.System.Runtime.Caching\Equinox.System.Runtime.Caching.fsproj" />
<ProjectReference Include="..\..\src\Equinox\Equinox.fsproj" />
</ItemGroup>

Expand Down
37 changes: 37 additions & 0 deletions src/Equinix.Microsoft.Extensions.Caching/Cache.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
module Equinox.Microsoft.Extensions.Caching

open System
open Equinox.Store
open Microsoft.Extensions.Caching.Memory

type Cache(cache: IMemoryCache, valueSizeCalculator: (Object -> int64) option) =
new (cache: IMemoryCache) = Cache(cache, None)

interface ICache with
member this.UpdateIfNewer cacheItemOptions key entry =
let setExpiration (cacheEntry: ICacheEntry) (cacheItemOption: CacheItemOptions) =
match cacheItemOption with
| AbsoluteExpiration absolute -> cacheEntry.AbsoluteExpiration <- Nullable absolute
| RelativeExpiration relative -> cacheEntry.SlidingExpiration <- Nullable relative
match cache.GetOrCreate (key, fun cacheItem ->
let (_, targetObject) = entry.Value
cacheItem.Size <-
match valueSizeCalculator with
| Some (calculator) -> Nullable(calculator targetObject)
| None -> Nullable<int64>()
setExpiration cacheItem cacheItemOptions
entry)
with
| null -> async.Return()
| :? (CacheEntry<'state>) as existingEntry
-> existingEntry.UpdateIfNewer entry
async.Return()
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x

member this.TryGet key =
async.Return (
match cache.Get key with
| null -> None
| :? (CacheEntry<'state>) as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<GenerateDocumentationFile Condition=" '$(Configuration)' == 'Release' ">true</GenerateDocumentationFile>
<WarningLevel>5</WarningLevel>
<IsTestProject>false</IsTestProject>
<DisableImplicitFSharpCoreReference>true</DisableImplicitFSharpCoreReference>
<DisableImplicitSystemValueTupleReference>true</DisableImplicitSystemValueTupleReference>
</PropertyGroup>
<ItemGroup>
<Compile Include="Cache.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="2.1.0" />
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.0.0-beta2-18618-05" PrivateAssets="All" />
<PackageReference Include="MinVer" Version="1.0.0" PrivateAssets="All" />

<PackageReference Include="FSharp.Core" Version="3.1.2.5" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageReference Include="FSharp.Core" Version="4.3.4" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageReference Include="Serilog" Version="2.7.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Equinox\Equinox.fsproj" />
</ItemGroup>
</Project>
81 changes: 30 additions & 51 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
Expand Up @@ -770,7 +770,7 @@ namespace Equinox.Cosmos

open Equinox
open Equinox.Cosmos.Store
open Equinox.Store.Infrastructure
open Equinox.Store
open FSharp.Control
open Microsoft.Azure.Documents
open Serilog
Expand Down Expand Up @@ -875,62 +875,39 @@ type private Category<'event, 'state>(gateway : Gateway, codec : FsCodec.IUnionE
| InternalSyncResult.Written token' -> return Store.SyncResult.Written (token', state') }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : Store.StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : Store.StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

type Cache(name, sizeMb : int) =
let cache =
let config = System.Collections.Specialized.NameValueCollection(1)
config.Add("cacheMemoryLimitMegabytes", string sizeMb);
new MemoryCache(name, config)
member __.UpdateIfNewer (policy : CacheItemPolicy) (key : string) entry =
match cache.AddOrGetExisting(key, box entry, policy) with
| null -> ()
| :? CacheEntry<'state> as existingEntry -> existingEntry.UpdateIfNewer entry
| x -> failwithf "UpdateIfNewer Incompatible cache entry %A" x
member __.TryGet (key : string) =
match cache.Get key with
| null -> None
| :? CacheEntry<'state> as existingEntry -> Some existingEntry.Value
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: Store.ICategory<'event, 'state, Container*string>, tee : string -> Store.StreamToken * 'state -> unit) =
type CategoryTee<'event, 'state>(inner: Store.ICategory<'event, 'state, Container*string>, tee : string -> Store.StreamToken * 'state -> Async<unit>) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
async{
let! _ = tee streamName tokenAndState
return tokenAndState
}
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
return! intercept streamName tokenAndState }
interface Store.ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger) : Async<Store.StreamToken * 'state> =
interceptAsync (inner.Load containerStream log) (snd containerStream)
member __.TrySync (log : ILogger) (Token.Unpack (_container,stream,_) as streamToken,state) (events : 'event list)
: Async<Store.SyncResult<'state>> = async {
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| Store.SyncResult.Conflict resync -> return Store.SyncResult.Conflict (interceptAsync resync stream)
| Store.SyncResult.Written (token', state') ->return Store.SyncResult.Written (intercept stream (token', state')) }
| Store.SyncResult.Conflict resync -> return Store.SyncResult.Conflict(interceptAsync resync stream)
| Store.SyncResult.Written(token', state')
->
let! intercepted = intercept stream (token', state')
return Store.SyncResult.Written(intercepted) }


let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(cache: ICache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: Store.ICategory<'event, 'state, Container*string>)
: Store.ICategory<'event, 'state, Container*string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
let cacheEntryGenerator (initialToken: StreamToken, initialState: 'state) = new CacheEntry<'state>(initialToken, initialState, Token.supersedes)
let policy = CacheItemOptions.RelativeExpiration(slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = cacheEntryGenerator >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _

type private Folder<'event, 'state>
Expand All @@ -940,16 +917,18 @@ type private Folder<'event, 'state>
?readCache) =
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface Store.ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<Store.StreamToken * 'state> =
let batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> batched
| Some (cache : Caching.Cache, prefix : string) ->
match cache.TryGet(prefix + snd containerStream) with
| None -> batched
| Some tokenAndState -> cached tokenAndState
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
member __.Load containerStream (log : ILogger): Async<Store.StreamToken * 'state> = async {
let! batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
| None -> return batched
| Some (cache : ICache, prefix : string) ->
let! cacheItem = cache.TryGet(prefix + snd containerStream)
match cacheItem with
| None -> return batched
| Some tokenAndState -> return! cached tokenAndState
}
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async<Store.SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
match res with
Expand Down Expand Up @@ -1004,7 +983,7 @@ type CachingStrategy =
/// Retain a single 'state per streamName, together with the associated etag
/// NB while a strategy like EventStore.Caching.SlidingWindowPrefixed is obviously easy to implement, the recommended approach is to
/// track all relevant data in the state, and/or have the `unfold` function ensure _all_ relevant events get held in the `u`nfolds in tip
| SlidingWindow of Caching.Cache * window: TimeSpan
| SlidingWindow of ICache * window: TimeSpan

[<NoComparison; NoEquality; RequireQualifiedAccess>]
type AccessStrategy<'event,'state> =
Expand Down
Loading

0 comments on commit ad6c052

Please sign in to comment.