From 453672d219c6d88a1121034926553d348d304164 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 22 Sep 2019 21:27:56 +0100 Subject: [PATCH] Remove Store namespace; Files reorg --- DOCUMENTATION.md | 14 +- src/Equinox.Cosmos/Cosmos.fs | 2 - src/Equinox.EventStore/EventStore.fs | 1 - src/Equinox.MemoryStore/MemoryStore.fs | 1 - src/Equinox.Storage/Equinox.Storage.fsproj | 2 +- .../{StorageStream.fs => Stream.fs} | 2 - src/Equinox.Storage/Types.fs | 1 - src/Equinox/{Stream.fs => Equinox.fs} | 5 +- src/Equinox/Equinox.fsproj | 5 +- src/Equinox/Flow.fs | 157 +++++++++++------- src/Equinox/Interfaces.fs | 31 ---- .../EventStoreTokenTests.fs | 4 +- 12 files changed, 108 insertions(+), 117 deletions(-) rename src/Equinox.Storage/{StorageStream.fs => Stream.fs} (98%) rename src/Equinox/{Stream.fs => Equinox.fs} (94%) delete mode 100644 src/Equinox/Interfaces.fs diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 7a30ea46d..6f6f19bd8 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -229,14 +229,14 @@ while these are not omnipresent, for the purposes of this discussion we’ll tre ### Flows, Streams and Accumulators -Equinox’s Command Handling consists of < 250 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch are in a normal application are: +Equinox’s Command Handling consists of < 200 lines including interfaces and comments in https://github.com/jet/equinox/tree/master/src/Equinox - the elements you'll touch in a normal application are: -- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs#L36) - internal implementation of Optimistic Concurrency Control / retry loop used by `Stream`. It's recommended to at least scan this file as it defines the Transaction semantics everything is coming together in service of. -- [`type Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L11) - surface API one uses to `Transact` or `Query` against a specific stream -- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Stream.fs#L39) - used to identify the Stream pertaining to the relevant Aggregate that `resolveStream` will use to hydrate a `Stream` -- _[`type Accumulator`](https://github.com/jet/equinox/blob/master/src/Equinox/Accumulator.fs) - optional `type` that can be used to manage application-local State in some flavors of Service__ +- [`module Flow`](https://github.com/jet/equinox/blob/master/src/Equinox/Flow.fs#L34) - internal implementation of Optimistic Concurrency Control / retry loop used by `Stream`. It's recommended to at least scan this file as it defines the Transaction semantics everything is coming together in service of. +- [`type Stream`](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L11) - surface API one uses to `Transact` or `Query` against a specific stream +- [`type Target` Discriminated Union](https://github.com/jet/equinox/blob/master/src/Equinox/Equinox.fs#L42) - used to identify the Stream pertaining to the relevant Aggregate that `resolveStream` will use to hydrate a `Stream` +- _[`type Accumulator`](https://github.com/jet/equinox/blob/master/src/Equinox/Accumulator.fs) - optional `type` that can be used to manage application-local State in some complex flavors of Service__ -Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the few hundred lines can tell many of the thousands of words about to follow! +Its recommended to read the examples in conjunction with perusing the code in order to see the relatively simple implementations that underlie the abstractions; the 3 files can tell many of the thousands of words about to follow! #### Stream Members @@ -342,7 +342,7 @@ type Service(log, stream, ?maxAttempts) = inner.Query id ``` -The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Stream.fs`](src/Equinox/Stream.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs). +The `Stream`-related functions in a given Aggregate establish the access patterns used across when Service methods access streams (see below). Typically these are relatively straightforward calls forwarding to a `Equinox.Stream` equivalent (see [`src/Equinox/Equinox.fs`](src/Equinox/Equinox.fs)), which in turn use the Optimistic Concurrency retry-loop in [`src/Equinox/Flow.fs`](src/Equinox/Flow.fs). `Read` above will do a roundtrip to the Store in order to fetch the most recent state (while this can be optimized by reading through the cache, each invocation will hit the store regardless). This Synchronous Read can be used to [Read-your-writes](https://en.wikipedia.org/wiki/Consistency_model#Read-your-writes_Consistency) to establish a state incorporating the effects of any Command invocation you know to have been completed. diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 39e107590..9c7cdd6cf 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -1,7 +1,6 @@ namespace Equinox.Cosmos.Store open Equinox.Storage -open Equinox.Store open FsCodec open Microsoft.Azure.Documents open Newtonsoft.Json @@ -771,7 +770,6 @@ namespace Equinox.Cosmos open Equinox open Equinox.Cosmos.Store open Equinox.Storage -open Equinox.Store open FsCodec open FSharp.Control open Microsoft.Azure.Documents diff --git a/src/Equinox.EventStore/EventStore.fs b/src/Equinox.EventStore/EventStore.fs index 8bc9fbb43..c05681237 100644 --- a/src/Equinox.EventStore/EventStore.fs +++ b/src/Equinox.EventStore/EventStore.fs @@ -2,7 +2,6 @@ open Equinox open Equinox.Storage -open Equinox.Store open EventStore.ClientAPI open Serilog // NB must shadow EventStore.ClientAPI.ILogger open System diff --git a/src/Equinox.MemoryStore/MemoryStore.fs b/src/Equinox.MemoryStore/MemoryStore.fs index f3450f85f..64c9a2a5b 100644 --- a/src/Equinox.MemoryStore/MemoryStore.fs +++ b/src/Equinox.MemoryStore/MemoryStore.fs @@ -5,7 +5,6 @@ namespace Equinox.MemoryStore open Equinox open Equinox.Storage -open Equinox.Store open Serilog open System.Runtime.InteropServices diff --git a/src/Equinox.Storage/Equinox.Storage.fsproj b/src/Equinox.Storage/Equinox.Storage.fsproj index 9b23c57dc..95edfada1 100644 --- a/src/Equinox.Storage/Equinox.Storage.fsproj +++ b/src/Equinox.Storage/Equinox.Storage.fsproj @@ -12,7 +12,7 @@ - + diff --git a/src/Equinox.Storage/StorageStream.fs b/src/Equinox.Storage/Stream.fs similarity index 98% rename from src/Equinox.Storage/StorageStream.fs rename to src/Equinox.Storage/Stream.fs index 3a8ff13cc..b82f636c8 100644 --- a/src/Equinox.Storage/StorageStream.fs +++ b/src/Equinox.Storage/Stream.fs @@ -1,8 +1,6 @@ /// Low level stream builders, generally consumed via Store-specific Stream Builders that layer policies such as Caching in at the Category level module Equinox.Storage.Stream -open Equinox.Store - /// Represents a specific stream in a ICategory type private Stream<'event, 'state, 'streamId>(category : ICategory<'event, 'state, 'streamId>, streamId: 'streamId) = interface IStream<'event, 'state> with diff --git a/src/Equinox.Storage/Types.fs b/src/Equinox.Storage/Types.fs index e715732a5..649d01181 100644 --- a/src/Equinox.Storage/Types.fs +++ b/src/Equinox.Storage/Types.fs @@ -1,6 +1,5 @@ namespace Equinox.Storage -open Equinox.Store open Serilog open System open System.Diagnostics diff --git a/src/Equinox/Stream.fs b/src/Equinox/Equinox.fs similarity index 94% rename from src/Equinox/Stream.fs rename to src/Equinox/Equinox.fs index fd25faf46..3c97a00a8 100644 --- a/src/Equinox/Stream.fs +++ b/src/Equinox/Equinox.fs @@ -1,5 +1,6 @@ namespace Equinox +open Equinox.Storage open System.Runtime.InteropServices // Exception yielded by Stream.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store @@ -8,7 +9,7 @@ type MaxResyncsExhaustedException(count) = /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Stream<'event, 'state> - ( log, stream : Store.IStream<'event, 'state>, maxAttempts : int, + ( log, stream : Storage.IStream<'event, 'state>, maxAttempts : int, []?mkAttemptsExhaustedException, []?resyncPolicy) = let transact f = @@ -34,7 +35,7 @@ type Stream<'event, 'state> /// Low-level helper to allow one to obtain a reference to a stream and state pair (including the position) in order to pass it as a continuation within the application /// Such a memento is then held within the application and passed in lieu of a StreamId to the StreamResolver in order to avoid having to reload state - member __.CreateMemento(): Async = Flow.query(stream, log, fun syncState -> syncState.Memento) + member __.CreateMemento(): Async = Flow.query(stream, log, fun syncState -> syncState.Memento) /// Store-agnostic way to specify a target Stream to a Resolver [] diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index fabaf5de3..ee83af6a5 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -9,14 +9,13 @@ - - + - + diff --git a/src/Equinox/Flow.fs b/src/Equinox/Flow.fs index ebfabe6a1..2b408f884 100644 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Flow.fs @@ -1,75 +1,104 @@ -/// Internal implementation of the Store agnostic load + run/render. See Stream.fs for App-facing APIs. -module internal Equinox.Flow +/// Internal data structures/impl. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implementing a Store or a decorator thereof. +/// i.e., if you're seeking to understand the main usage flows of the Equinox library, that's in Equinox.fs, not here +namespace Equinox.Storage -open Equinox.Store open Serilog -/// Represents stream and folding state between the load and run/render phases -type SyncState<'event, 'state> - ( originState : StreamToken * 'state, - trySync : ILogger -> StreamToken * 'state -> 'event list -> Async>) = - let mutable tokenAndState = originState +/// Store-specific opaque token to be used for synchronization purposes +[] +type StreamToken = { value : obj; version: int64 } - member __.Memento = tokenAndState - member __.State = snd __.Memento - member __.Version = (fst __.Memento).version +/// Internal type used to represent the outcome of a TrySync operation +[] +type SyncResult<'state> = + /// The write succeeded (the supplied token and state can be used to efficiently continue the processing iff desired) + | Written of StreamToken * 'state + /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store + /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (iff required) needs an extra trip to obtain + | Conflict of Async - member __.TryOr(log, events, handleFailureResync : (Async -> Async)) : Async = async { - let! res = trySync log tokenAndState events - match res with - | SyncResult.Conflict resync -> - return! handleFailureResync resync - | SyncResult.Written (token', streamState') -> - tokenAndState <- token', streamState' - return true } - member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async = - let resyncInPreparationForRetry resync = async { - let! streamState' = runResync log attemptNumber resync - tokenAndState <- streamState' - return false } - __.TryOr(log, events, resyncInPreparationForRetry) +/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. +type IStream<'event, 'state> = + /// Obtain the state from the target stream + abstract Load: log: ILogger + -> Async + /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream + /// SyncResult.Written: implies the state is now the value represented by the Result's value + /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry + abstract TrySync: log: ILogger + -> token: StreamToken * originState: 'state + -> events: 'event list + -> Async> -/// Process a command, ensuring a consistent final state is established on the stream. -/// 1. make a decision predicated on the known state -/// 2a. if no changes required, exit with known state -/// 2b. if saved without conflict, exit with updated state -/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state -let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) - (syncState : SyncState<'event, 'state>) - (decide : 'state -> Async<'result * 'event list>) - : Async<'result> = - if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") - /// Run a decision cycle - decide what events should be appended given the presented state - let rec loop attempt: Async<'result> = async { - let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - let! result, events = decide syncState.State - if List.isEmpty events then - log.Debug "No events generated" - return result - elif attempt = maxSyncAttempts then - log.Debug "Max Sync Attempts exceeded" - let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false }) +/// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs. +module internal Flow = - if not comitted then - return raise (createMaxAttemptsExhaustedException attempt) - else + /// Represents stream and folding state between the load and run/render phases + type SyncState<'event, 'state> + ( originState : StreamToken * 'state, + trySync : ILogger -> StreamToken * 'state -> 'event list -> Async>) = + let mutable tokenAndState = originState + + member __.Memento = tokenAndState + member __.State = snd __.Memento + member __.Version = (fst __.Memento).version + + member __.TryOr(log, events, handleFailureResync : (Async -> Async)) : Async = async { + let! res = trySync log tokenAndState events + match res with + | SyncResult.Conflict resync -> + return! handleFailureResync resync + | SyncResult.Written (token', streamState') -> + tokenAndState <- token', streamState' + return true } + member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async = + let resyncInPreparationForRetry resync = async { + let! streamState' = runResync log attemptNumber resync + tokenAndState <- streamState' + return false } + __.TryOr(log, events, resyncInPreparationForRetry) + + /// Process a command, ensuring a consistent final state is established on the stream. + /// 1. make a decision predicated on the known state + /// 2a. if no changes required, exit with known state + /// 2b. if saved without conflict, exit with updated state + /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state + let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) + (syncState : SyncState<'event, 'state>) + (decide : 'state -> Async<'result * 'event list>) + : Async<'result> = + if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") + /// Run a decision cycle - decide what events should be appended given the presented state + let rec loop attempt: Async<'result> = async { + let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) + let! result, events = decide syncState.State + if List.isEmpty events then + log.Debug "No events generated" return result - else - let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) - if not committed then - log.Debug "Resyncing and retrying" - return! loop (attempt + 1) + elif attempt = maxSyncAttempts then + log.Debug "Max Sync Attempts exceeded" + let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false }) + + if not comitted then + return raise (createMaxAttemptsExhaustedException attempt) + else + return result else - return result } - /// Commence, processing based on the incoming state - loop 1 + let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) + if not committed then + log.Debug "Resyncing and retrying" + return! loop (attempt + 1) + else + return result } + /// Commence, processing based on the incoming state + loop 1 -let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async { - let! streamState = stream.Load log - let syncState = SyncState(streamState, stream.TrySync) - return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide } + let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async { + let! streamState = stream.Load log + let syncState = SyncState(streamState, stream.TrySync) + return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide } -let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async { - let! streamState = stream.Load log - let syncState = SyncState(streamState, stream.TrySync) - return project syncState } \ No newline at end of file + let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async { + let! streamState = stream.Load log + let syncState = SyncState(streamState, stream.TrySync) + return project syncState } \ No newline at end of file diff --git a/src/Equinox/Interfaces.fs b/src/Equinox/Interfaces.fs deleted file mode 100644 index dbbbcc829..000000000 --- a/src/Equinox/Interfaces.fs +++ /dev/null @@ -1,31 +0,0 @@ -/// Internal data stuctures. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implemening a Store or a decorator thereof. -/// i.e., if you're seeking to understand the core deliverables of the Equinox library, that's in Stream.fs, not here -namespace Equinox.Store - -open Serilog - -/// Store-specific opaque token to be used for synchronization purposes -[] -type StreamToken = { value : obj; version: int64 } - -/// Internal type used to represent the outcome of a TrySync operation -[] -type SyncResult<'state> = - /// The write succeeded (the supplied token and state can be used to efficiently continue the processing iff desired) - | Written of StreamToken * 'state - /// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store - /// The inner is Async as some stores (and/or states) are such that determining the conflicting state (iff required) needs an extra trip to obtain - | Conflict of Async - -/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. -type IStream<'event, 'state> = - /// Obtain the state from the target stream - abstract Load: log: ILogger - -> Async - /// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream - /// SyncResult.Written: implies the state is now the value represented by the Result's value - /// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry - abstract TrySync: log: ILogger - -> token: StreamToken * originState: 'state - -> events: 'event list - -> Async> \ No newline at end of file diff --git a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs index 60f674224..a29dad3ab 100644 --- a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs +++ b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs @@ -1,12 +1,12 @@ module Equinox.EventStore.Tests.EventStoreTokenTests -open Equinox open Equinox.EventStore +open Equinox.Storage open FsCheck.Xunit open Swensen.Unquote.Assertions open Xunit -let unpack (Token.Unpack token : Store.StreamToken) = +let unpack (Token.Unpack token : StreamToken) = token.pos.streamVersion, token.pos.compactionEventNumber, token.pos.batchCapacityLimit [