Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove hardwired top-level backoff #51

Merged
merged 1 commit into from
Nov 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 16 additions & 23 deletions src/Equinox/Equinox.fs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ type ICategory<'event, 'state> =
-> events: 'event list * state: 'state
-> Async<Storage.SyncResult<'state>>

/// Defines a hook enabling retry and backoff policies to be specified
type IRetryPolicy = abstract member Execute: log: ILogger * attemptNumber: int * call: Async<'T> -> Async<'T>

// Exception yielded by Handler.Decide after `count` attempts have yielded conflicts at the point of syncing with the Store
exception FlowAttemptsExceededException of count: int
exception MaxResyncsExhaustedException of count: int

/// Internal implementation of the Store agnostic load + run/render. See Handler for App-facing APIs.
module private Flow =
Expand All @@ -92,25 +95,14 @@ module private Flow =
member __.State = snd __.Memento
member __.CreateContext(): Context<'event, 'state> =
Context<'event, 'state>(fold, __.State)
member __.TryOrResync attempt (log : ILogger) eventsAndState =
member __.TryOrResync (retryPolicy : IRetryPolicy) (attemptNumber: int) (log : ILogger) eventsAndState =
let resyncInPreparationForRetry resync = async {
// According to https://github.com/EventStore/EventStore/issues/1652, backoffs should not be necessary for EventStore
// as the fact we use a Master connection to read Resync data should make it unnecessary
// However, empirically, the backoffs are needed in app code and hence need to live here for now
// TODO: make each store inject backoffs iff necessary
// See https://github.com/jet/equinox/issues/38
if attempt <> 1 then
match Backoff.defaultExponentialBoundedRandomized (attempt-2) with
| None -> ()
| Some ms ->
log.Information("Resync backoff for {Ms}", ms)
do! Async.Sleep ms
let! streamState' = resync
let! streamState' = retryPolicy.Execute(log, attemptNumber, resync)
tokenAndState <- streamState'
return false }
tryOr log eventsAndState resyncInPreparationForRetry
member __.TryOrThrow log eventsAndState attempt =
let throw _ = async { return raise <| FlowAttemptsExceededException attempt }
let throw _ = async { return raise <| MaxResyncsExhaustedException attempt }
tryOr log eventsAndState throw |> Async.Ignore

/// Obtain a representation of the current state and metadata from the underlying storage stream
Expand All @@ -124,7 +116,7 @@ module private Flow =
/// 2a. if no changes required, exit with known state
/// 2b. if saved without conflict, exit with updated state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
let run (sync : SyncState<'event, 'state>) (maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>)
let run (sync : SyncState<'event, 'state>) (retryPolicy: IRetryPolicy, maxSyncAttempts : int) (log : ILogger) (decide : Context<'event, 'state> -> Async<'result * 'event list>)
: Async<'result> =
if maxSyncAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxSyncAttempts", maxSyncAttempts, "should be >= 1")
/// Run a decision cycle - decide what events should be appended given the presented state
Expand All @@ -140,7 +132,7 @@ module private Flow =
do! sync.TryOrThrow log (events, ctx.State) attempt
return outcome
else
let! committed = sync.TryOrResync attempt log (events, ctx.State)
let! committed = sync.TryOrResync retryPolicy attempt log (events, ctx.State)
if not committed then
log.Debug "Resyncing and retrying"
return! loop (attempt + 1)
Expand All @@ -151,10 +143,10 @@ module private Flow =

/// Internal implementation providing a handler not associated with a specific log or stream
/// Not 'just' making it public; the plan is to have Stream.Handler represent the public interface until further significant patterns present
type HandlerImpl<'event, 'state>(fold, maxAttempts) =
type HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy) =
let execAsync stream log f = async { let! syncState = load fold log stream in return! f syncState }
let exec stream log f = execAsync stream log <| fun syncState -> async { return f syncState }
let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState maxAttempts log decideAsync }
let runFlow stream log decideAsync = execAsync stream log <| fun syncState -> async { return! run syncState (retryPolicy,maxAttempts) log decideAsync }

member __.Decide(stream : IStream<'event, 'state>, log : ILogger, flow: Context<'event, 'state> -> 'result) : Async<'result> =
runFlow stream log <| fun ctx -> async { let result = flow ctx in return result, ctx.Accumulated }
Expand Down Expand Up @@ -187,15 +179,16 @@ module Stream =
let ofMemento (memento : Storage.StreamToken * 'state) (x : IStream<_,_>) : IStream<'event, 'state> = InitializedStream(x, memento) :> _

/// Core Application-facing API. Wraps the handling of decision or query flow in a manner that is store agnostic
type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int) =
let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts)
type Handler<'event, 'state>(fold, log, stream : IStream<'event, 'state>, maxAttempts : int, ?retryPolicy) =
let retryPolicy = defaultArg retryPolicy ({ new IRetryPolicy with member __.Execute(_log, _attemptNumber, f) = async { return! f } })
let inner = Flow.HandlerImpl<'event, 'state>(fold, maxAttempts, retryPolicy)

/// 0. Invoke the supplied `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.Decide(flow : Context<'event, 'state> -> 'result) : Async<'result> =
inner.Decide(stream,log,flow)
/// 0. Invoke the supplied _Async_ `decide` function 1. attempt to sync the accumulated events to the stream 2. (contigent on success of 1) yield the outcome
/// Tries up to `maxAttempts` times in the case of a conflict, throwing FlowAttemptsExceededException` to signal failure.
/// Tries up to `maxAttempts` times in the case of a conflict, throwing MaxResyncsExhaustedException` to signal failure.
member __.DecideAsync(flowAsync : Context<'event, 'state> -> Async<'result>) : Async<'result> =
inner.DecideAsync(stream,log,flowAsync)
/// Low Level helper to allow one to obtain the complete state of a stream (including the position) in order to pass it within the application
Expand Down
1 change: 0 additions & 1 deletion src/Equinox/Equinox.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="Equinox.fs" />
</ItemGroup>

Expand Down
64 changes: 0 additions & 64 deletions src/Equinox/Infrastructure.fs

This file was deleted.