-
Notifications
You must be signed in to change notification settings - Fork 68
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
12 changed files
with
108 additions
and
117 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
2 changes: 0 additions & 2 deletions
2
src/Equinox.Storage/StorageStream.fs → src/Equinox.Storage/Stream.fs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
namespace Equinox.Storage | ||
|
||
open Equinox.Store | ||
open Serilog | ||
open System | ||
open System.Diagnostics | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,75 +1,104 @@ | ||
/// Internal implementation of the Store agnostic load + run/render. See Stream.fs for App-facing APIs. | ||
module internal Equinox.Flow | ||
/// Internal data structures/impl. While these are intended to be legible, understanding the abstractions involved is only necessary if you are implementing a Store or a decorator thereof. | ||
/// i.e., if you're seeking to understand the main usage flows of the Equinox library, that's in Equinox.fs, not here | ||
namespace Equinox.Storage | ||
|
||
open Equinox.Store | ||
open Serilog | ||
|
||
/// Represents stream and folding state between the load and run/render phases | ||
type SyncState<'event, 'state> | ||
( originState : StreamToken * 'state, | ||
trySync : ILogger -> StreamToken * 'state -> 'event list -> Async<SyncResult<'state>>) = | ||
let mutable tokenAndState = originState | ||
/// Store-specific opaque token to be used for synchronization purposes | ||
[<NoComparison>] | ||
type StreamToken = { value : obj; version: int64 } | ||
|
||
member __.Memento = tokenAndState | ||
member __.State = snd __.Memento | ||
member __.Version = (fst __.Memento).version | ||
/// Internal type used to represent the outcome of a TrySync operation | ||
[<NoEquality; NoComparison; RequireQualifiedAccess>] | ||
type SyncResult<'state> = | ||
/// The write succeeded (the supplied token and state can be used to efficiently continue the processing iff desired) | ||
| Written of StreamToken * 'state | ||
/// The set of changes supplied to TrySync conflict with the present state of the underlying stream based on the configured policy for that store | ||
/// The inner is Async as some stores (and/or states) are such that determining the conflicting state (iff required) needs an extra trip to obtain | ||
| Conflict of Async<StreamToken * 'state> | ||
|
||
member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async { | ||
let! res = trySync log tokenAndState events | ||
match res with | ||
| SyncResult.Conflict resync -> | ||
return! handleFailureResync resync | ||
| SyncResult.Written (token', streamState') -> | ||
tokenAndState <- token', streamState' | ||
return true } | ||
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> = | ||
let resyncInPreparationForRetry resync = async { | ||
let! streamState' = runResync log attemptNumber resync | ||
tokenAndState <- streamState' | ||
return false } | ||
__.TryOr(log, events, resyncInPreparationForRetry) | ||
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code. | ||
type IStream<'event, 'state> = | ||
/// Obtain the state from the target stream | ||
abstract Load: log: ILogger | ||
-> Async<StreamToken * 'state> | ||
/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream | ||
/// SyncResult.Written: implies the state is now the value represented by the Result's value | ||
/// SyncResult.Conflict: implies the `events` were not synced; if desired the consumer can use the included resync workflow in order to retry | ||
abstract TrySync: log: ILogger | ||
-> token: StreamToken * originState: 'state | ||
-> events: 'event list | ||
-> Async<SyncResult<'state>> | ||
|
||
/// Process a command, ensuring a consistent final state is established on the stream. | ||
/// 1. make a decision predicated on the known state | ||
/// 2a. if no changes required, exit with known state | ||
/// 2b. if saved without conflict, exit with updated state | ||
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state | ||
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) | ||
(syncState : SyncState<'event, 'state>) | ||
(decide : 'state -> Async<'result * 'event list>) | ||
: Async<'result> = | ||
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") | ||
/// Run a decision cycle - decide what events should be appended given the presented state | ||
let rec loop attempt: Async<'result> = async { | ||
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) | ||
let! result, events = decide syncState.State | ||
if List.isEmpty events then | ||
log.Debug "No events generated" | ||
return result | ||
elif attempt = maxSyncAttempts then | ||
log.Debug "Max Sync Attempts exceeded" | ||
let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false }) | ||
/// Internal implementation of the Store agnostic load + run/render. See Equinox.fs for App-facing APIs. | ||
module internal Flow = | ||
|
||
if not comitted then | ||
return raise (createMaxAttemptsExhaustedException attempt) | ||
else | ||
/// Represents stream and folding state between the load and run/render phases | ||
type SyncState<'event, 'state> | ||
( originState : StreamToken * 'state, | ||
trySync : ILogger -> StreamToken * 'state -> 'event list -> Async<SyncResult<'state>>) = | ||
let mutable tokenAndState = originState | ||
|
||
member __.Memento = tokenAndState | ||
member __.State = snd __.Memento | ||
member __.Version = (fst __.Memento).version | ||
|
||
member __.TryOr(log, events, handleFailureResync : (Async<StreamToken*'state> -> Async<bool>)) : Async<bool> = async { | ||
let! res = trySync log tokenAndState events | ||
match res with | ||
| SyncResult.Conflict resync -> | ||
return! handleFailureResync resync | ||
| SyncResult.Written (token', streamState') -> | ||
tokenAndState <- token', streamState' | ||
return true } | ||
member __.TryOrResync(runResync, attemptNumber: int, log : ILogger, events) : Async<bool> = | ||
let resyncInPreparationForRetry resync = async { | ||
let! streamState' = runResync log attemptNumber resync | ||
tokenAndState <- streamState' | ||
return false } | ||
__.TryOr(log, events, resyncInPreparationForRetry) | ||
|
||
/// Process a command, ensuring a consistent final state is established on the stream. | ||
/// 1. make a decision predicated on the known state | ||
/// 2a. if no changes required, exit with known state | ||
/// 2b. if saved without conflict, exit with updated state | ||
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state | ||
let run (log : ILogger) (maxSyncAttempts : int, resyncRetryPolicy, createMaxAttemptsExhaustedException) | ||
(syncState : SyncState<'event, 'state>) | ||
(decide : 'state -> Async<'result * 'event list>) | ||
: Async<'result> = | ||
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1") | ||
/// Run a decision cycle - decide what events should be appended given the presented state | ||
let rec loop attempt: Async<'result> = async { | ||
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) | ||
let! result, events = decide syncState.State | ||
if List.isEmpty events then | ||
log.Debug "No events generated" | ||
return result | ||
else | ||
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) | ||
if not committed then | ||
log.Debug "Resyncing and retrying" | ||
return! loop (attempt + 1) | ||
elif attempt = maxSyncAttempts then | ||
log.Debug "Max Sync Attempts exceeded" | ||
let! comitted = syncState.TryOr(log, events, fun _resync -> async { return false }) | ||
|
||
if not comitted then | ||
return raise (createMaxAttemptsExhaustedException attempt) | ||
else | ||
return result | ||
else | ||
return result } | ||
/// Commence, processing based on the incoming state | ||
loop 1 | ||
let! committed = syncState.TryOrResync(resyncRetryPolicy, attempt, log, events) | ||
if not committed then | ||
log.Debug "Resyncing and retrying" | ||
return! loop (attempt + 1) | ||
else | ||
return result } | ||
/// Commence, processing based on the incoming state | ||
loop 1 | ||
|
||
let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async { | ||
let! streamState = stream.Load log | ||
let syncState = SyncState(streamState, stream.TrySync) | ||
return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide } | ||
let transact (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) (stream : IStream<_,_>, log) decide : Async<'result> = async { | ||
let! streamState = stream.Load log | ||
let syncState = SyncState(streamState, stream.TrySync) | ||
return! run log (maxAttempts,resyncRetryPolicy,createMaxAttemptsExhaustedException) syncState decide } | ||
|
||
let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async { | ||
let! streamState = stream.Load log | ||
let syncState = SyncState(streamState, stream.TrySync) | ||
return project syncState } | ||
let query (stream : IStream<'event, 'state>, log : ILogger, project: SyncState<'event,'state> -> 'result) : Async<'result> = async { | ||
let! streamState = stream.Load log | ||
let syncState = SyncState(streamState, stream.TrySync) | ||
return project syncState } |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters