Skip to content

Commit

Permalink
Split out Storage Infrastructure.fs
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Sep 22, 2019
1 parent e82d46c commit 1079f76
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 203 deletions.
2 changes: 1 addition & 1 deletion samples/Store/Integration/LogIntegration.fs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Samples.Store.Integration.LogIntegration

open Equinox.Cosmos.Integration
open Equinox.Store
open Equinox.Storage
open FSharp.UMX
open Swensen.Unquote
open System
Expand Down
70 changes: 36 additions & 34 deletions src/Equinox.Cosmos/Cosmos.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace Equinox.Cosmos.Store

open Equinox.Storage
open Equinox.Store
open FsCodec
open Microsoft.Azure.Documents
Expand Down Expand Up @@ -748,10 +749,10 @@ module internal Tip =

type [<NoComparison>] Token = { container: Container; stream: string; pos: Position }
module Token =
let create (container,stream) pos : Equinox.Store.StreamToken =
let create (container,stream) pos : StreamToken =
{ value = box { container = container; stream = stream; pos = pos }
version = pos.index }
let (|Unpack|) (token: Equinox.Store.StreamToken) : Container*string*Position = let t = unbox<Token> token.value in t.container,t.stream,t.pos
let (|Unpack|) (token: StreamToken) : Container*string*Position = let t = unbox<Token> token.value in t.container,t.stream,t.pos
let supersedes (Unpack (_,_,currentPos)) (Unpack (_,_,xPos)) =
let currentVersion, newVersion = currentPos.index, xPos.index
let currentETag, newETag = currentPos.etag, xPos.etag
Expand All @@ -763,13 +764,14 @@ module Internal =
type InternalSyncResult = Written of StreamToken | ConflictUnknown of StreamToken | Conflict of StreamToken * ITimelineEvent<byte[]>[]

[<RequireQualifiedAccess; NoComparison; NoEquality>]
type LoadFromTokenResult<'event> = Unchanged | Found of Equinox.Store.StreamToken * 'event[]
type LoadFromTokenResult<'event> = Unchanged | Found of StreamToken * 'event[]

namespace Equinox.Cosmos

open Equinox
open Equinox.Cosmos.Store
open Equinox.Store.Infrastructure
open Equinox.Storage
open Equinox.Store
open FsCodec
open FSharp.Control
open Microsoft.Azure.Documents
Expand Down Expand Up @@ -804,23 +806,23 @@ type Gateway(conn : Connection, batching : BatchingPolicy) =
| None -> None
| Some index -> xs |> Seq.skip index |> Seq.choose tryDecode |> Array.ofSeq |> Some
member __.Client = conn.Client
member __.LoadBackwardsStopping log (container, stream) (tryDecode,isOrigin): Async<Store.StreamToken * 'event[]> = async {
member __.LoadBackwardsStopping log (container, stream) (tryDecode,isOrigin): Async<StreamToken * 'event[]> = async {
let! pos, events = Query.walk log (container,stream) conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests Direction.Backward None (tryDecode,isOrigin)
Array.Reverse events
return Token.create (container,stream) pos, events }
member __.Read log (container,stream) direction startPos (tryDecode,isOrigin) : Async<Store.StreamToken * 'event[]> = async {
member __.Read log (container,stream) direction startPos (tryDecode,isOrigin) : Async<StreamToken * 'event[]> = async {
let! pos, events = Query.walk log (container,stream) conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests direction startPos (tryDecode,isOrigin)
return Token.create (container,stream) pos, events }
member __.ReadLazy (batching: BatchingPolicy) log (container,stream) direction startPos (tryDecode,isOrigin) : AsyncSeq<'event[]> =
Query.walkLazy log (container,stream) conn.QueryRetryPolicy batching.MaxItems batching.MaxRequests direction startPos (tryDecode,isOrigin)
member __.LoadFromUnfoldsOrRollingSnapshots log (containerStream,maybePos) (tryDecode,isOrigin): Async<Store.StreamToken * 'event[]> = async {
member __.LoadFromUnfoldsOrRollingSnapshots log (containerStream,maybePos) (tryDecode,isOrigin): Async<StreamToken * 'event[]> = async {
let! res = Tip.tryLoad log conn.TipRetryPolicy containerStream maybePos
match res with
| Tip.Result.NotFound -> return Token.create containerStream Position.fromKnownEmpty, Array.empty
| Tip.Result.NotModified -> return invalidOp "Not handled"
| Tip.Result.Found (pos, FromUnfold tryDecode isOrigin span) -> return Token.create containerStream pos, span
| _ -> return! __.LoadBackwardsStopping log containerStream (tryDecode,isOrigin) }
member __.GetPosition(log, containerStream, ?pos): Async<Store.StreamToken> = async {
member __.GetPosition(log, containerStream, ?pos): Async<StreamToken> = async {
let! res = Tip.tryLoad log conn.TipRetryPolicy containerStream pos
match res with
| Tip.Result.NotFound -> return Token.create containerStream Position.fromKnownEmpty
Expand All @@ -846,17 +848,17 @@ type Gateway(conn : Connection, batching : BatchingPolicy) =

type private Category<'event, 'state>(gateway : Gateway, codec : IUnionEncoder<'event, byte[]>) =
let (|TryDecodeFold|) (fold: 'state -> 'event seq -> 'state) initial (events: ITimelineEvent<byte[]> seq) : 'state = Seq.choose codec.TryDecode events |> fold initial
member __.Load includeUnfolds containerStream fold initial isOrigin (log : ILogger): Async<Store.StreamToken * 'state> = async {
member __.Load includeUnfolds containerStream fold initial isOrigin (log : ILogger): Async<StreamToken * 'state> = async {
let! token, events =
if not includeUnfolds then gateway.LoadBackwardsStopping log containerStream (codec.TryDecode,isOrigin)
else gateway.LoadFromUnfoldsOrRollingSnapshots log (containerStream,None) (codec.TryDecode,isOrigin)
return token, fold initial events }
member __.LoadFromToken (Token.Unpack streamPos, state: 'state as current) fold isOrigin (log : ILogger): Async<Store.StreamToken * 'state> = async {
member __.LoadFromToken (Token.Unpack streamPos, state: 'state as current) fold isOrigin (log : ILogger): Async<StreamToken * 'state> = async {
let! res = gateway.LoadFromToken(log, streamPos, (codec.TryDecode,isOrigin))
match res with
| LoadFromTokenResult.Unchanged -> return current
| LoadFromTokenResult.Found (token', events') -> return token', fold state events' }
member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log): Async<Store.SyncResult<'state>> = async {
member __.Sync(Token.Unpack (container,stream,pos), state as current, events, mapUnfolds, fold, isOrigin, log): Async<SyncResult<'state>> = async {
let state' = fold state (Seq.ofList events)
let exp,events,eventsEncoded,projectionsEncoded =
match mapUnfolds with
Expand All @@ -870,22 +872,22 @@ type private Category<'event, 'state>(gateway : Gateway, codec : IUnionEncoder<'
let batch = Sync.mkBatch stream eventsEncoded projections
let! res = gateway.Sync log (container,stream) (exp,batch)
match res with
| InternalSyncResult.Conflict (token',TryDecodeFold fold state events') -> return Store.SyncResult.Conflict (async { return token', events' })
| InternalSyncResult.ConflictUnknown _token' -> return Store.SyncResult.Conflict (__.LoadFromToken current fold isOrigin log)
| InternalSyncResult.Written token' -> return Store.SyncResult.Written (token', state') }
| InternalSyncResult.Conflict (token',TryDecodeFold fold state events') -> return SyncResult.Conflict (async { return token', events' })
| InternalSyncResult.ConflictUnknown _token' -> return SyncResult.Conflict (__.LoadFromToken current fold isOrigin log)
| InternalSyncResult.Written token' -> return SyncResult.Written (token', state') }

module Caching =
open System.Runtime.Caching
[<AllowNullLiteral>]
type CacheEntry<'state>(initialToken : Store.StreamToken, initialState :'state) =
type CacheEntry<'state>(initialToken : StreamToken, initialState :'state) =
let mutable currentToken, currentState = initialToken, initialState
member __.UpdateIfNewer (other : CacheEntry<'state>) =
lock __ <| fun () ->
let otherToken, otherState = other.Value
if otherToken |> Token.supersedes currentToken then
currentToken <- otherToken
currentState <- otherState
member __.Value : Store.StreamToken * 'state =
member __.Value : StreamToken * 'state =
lock __ <| fun () ->
currentToken, currentState

Expand All @@ -906,29 +908,29 @@ module Caching =
| x -> failwithf "TryGet Incompatible cache entry %A" x

/// Forwards all state changes in all streams of an ICategory to a `tee` function
type CategoryTee<'event, 'state>(inner: Store.ICategory<'event, 'state, Container*string>, tee : string -> Store.StreamToken * 'state -> unit) =
type CategoryTee<'event, 'state>(inner: ICategory<'event, 'state, Container*string>, tee : string -> StreamToken * 'state -> unit) =
let intercept streamName tokenAndState =
tee streamName tokenAndState
tokenAndState
let interceptAsync load streamName = async {
let! tokenAndState = load
return intercept streamName tokenAndState }
interface Store.ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger) : Async<Store.StreamToken * 'state> =
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger) : Async<StreamToken * 'state> =
interceptAsync (inner.Load containerStream log) (snd containerStream)
member __.TrySync (log : ILogger) (Token.Unpack (_container,stream,_) as streamToken,state) (events : 'event list)
: Async<Store.SyncResult<'state>> = async {
: Async<SyncResult<'state>> = async {
let! syncRes = inner.TrySync log (streamToken, state) events
match syncRes with
| Store.SyncResult.Conflict resync -> return Store.SyncResult.Conflict (interceptAsync resync stream)
| Store.SyncResult.Written (token', state') ->return Store.SyncResult.Written (intercept stream (token', state')) }
| SyncResult.Conflict resync -> return SyncResult.Conflict (interceptAsync resync stream)
| SyncResult.Written (token', state') ->return SyncResult.Written (intercept stream (token', state')) }

let applyCacheUpdatesWithSlidingExpiration
(cache: Cache)
(prefix: string)
(slidingExpiration : TimeSpan)
(category: Store.ICategory<'event, 'state, Container*string>)
: Store.ICategory<'event, 'state, Container*string> =
(category: ICategory<'event, 'state, Container*string>)
: ICategory<'event, 'state, Container*string> =
let policy = new CacheItemPolicy(SlidingExpiration = slidingExpiration)
let addOrUpdateSlidingExpirationCacheEntry streamName = CacheEntry >> cache.UpdateIfNewer policy (prefix + streamName)
CategoryTee<'event,'state>(category, addOrUpdateSlidingExpirationCacheEntry) :> _
Expand All @@ -939,8 +941,8 @@ type private Folder<'event, 'state>
mapUnfolds: Choice<unit,('event list -> 'state -> 'event seq),('event list -> 'state -> 'event list * 'event list)>,
?readCache) =
let inspectUnfolds = match mapUnfolds with Choice1Of3 () -> false | _ -> true
interface Store.ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<Store.StreamToken * 'state> =
interface ICategory<'event, 'state, Container*string> with
member __.Load containerStream (log : ILogger): Async<StreamToken * 'state> =
let batched = category.Load inspectUnfolds containerStream fold initial isOrigin log
let cached tokenAndState = category.LoadFromToken tokenAndState fold isOrigin log
match readCache with
Expand All @@ -950,11 +952,11 @@ type private Folder<'event, 'state>
| None -> batched
| Some tokenAndState -> cached tokenAndState
member __.TrySync (log : ILogger) (streamToken,state) (events : 'event list)
: Async<Store.SyncResult<'state>> = async {
: Async<SyncResult<'state>> = async {
let! res = category.Sync((streamToken,state), events, mapUnfolds, fold, isOrigin, log)
match res with
| Store.SyncResult.Conflict resync -> return Store.SyncResult.Conflict resync
| Store.SyncResult.Written (token',state') -> return Store.SyncResult.Written (token',state') }
| SyncResult.Conflict resync -> return SyncResult.Conflict resync
| SyncResult.Written (token',state') -> return SyncResult.Written (token',state') }

/// Holds Container state, coordinating initialization activities
type private ContainerWrapper(container : Container, ?initContainer : Container -> Async<unit>) =
Expand Down Expand Up @@ -1034,16 +1036,16 @@ type Resolver<'event, 'state>(context : Context, codec, fold, initial, caching,
| Some (AccessStrategy.RollingUnfolds (isOrigin,transmute)) -> isOrigin, Choice3Of3 transmute
let cosmosCat = Category<'event, 'state>(context.Gateway, codec)
let folder = Folder<'event, 'state>(cosmosCat, fold, initial, isOrigin, mapUnfolds, ?readCache = readCacheOption)
let category : Store.ICategory<_,_,Container*string> =
let category : ICategory<_,_,Container*string> =
match caching with
| CachingStrategy.NoCaching -> folder :> _
| CachingStrategy.SlidingWindow(cache, window) ->
Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder

let resolveStream (streamId, maybeContainerInitializationGate) =
{ new Store.IStream<'event, 'state> with
{ new IStream<'event, 'state> with
member __.Load log = category.Load streamId log
member __.TrySync (log: ILogger) (token: Store.StreamToken, originState: 'state) (events: 'event list) =
member __.TrySync (log: ILogger) (token: StreamToken, originState: 'state) (events: 'event list) =
match maybeContainerInitializationGate with
| None -> category.TrySync log (token, originState) events
| Some init -> async {
Expand All @@ -1057,11 +1059,11 @@ type Resolver<'event, 'state>(context : Context, codec, fold, initial, caching,
match resolveTarget target, option with
| streamArgs,None -> resolveStream streamArgs
| (containerStream,maybeInit),Some AssumeEmpty ->
Store.Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit))
Stream.ofMemento (Token.create containerStream Position.fromKnownEmpty,initial) (resolveStream (containerStream,maybeInit))

member __.FromMemento(Token.Unpack (container,stream,_pos) as streamToken,state) =
let skipInitialization = None
Store.Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization))
Stream.ofMemento (streamToken,state) (resolveStream ((container,stream),skipInitialization))

[<RequireQualifiedAccess; NoComparison>]
type Discovery =
Expand Down
Loading

0 comments on commit 1079f76

Please sign in to comment.