Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Propulsion/Sinks.fs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ module Events =
let next: Event[] -> int64 = Streams.StreamSpan.next
/// The Index of the first event as supplied to this handler
let index: Event[] -> int64 = Streams.StreamSpan.index
/// Mark stream completed
let complete: Event[] -> int64 = Streams.StreamSpan.complete

/// Internal helpers used to compute buffer sizes for stats
module Event =
Expand Down
3 changes: 3 additions & 0 deletions src/Propulsion/Streams.fs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ module StreamSpan =

let inline index (span: FsCodec.ITimelineEvent<'F>[]) = span[0].Index
let inline next (span: FsCodec.ITimelineEvent<'F>[]) = let l = span[span.Length - 1] in if l.IsUnfold then l.Index else l.Index + 1L
let inline complete (span: FsCodec.ITimelineEvent<'F>[]) = -2L // Sentinel value for completed stream
let inline dropBefore i = function
| [||] as xs -> xs
| xs when next xs < i -> Array.empty
Expand Down Expand Up @@ -339,6 +340,8 @@ module Scheduling =
| Error malformed ->
// Flag that the data at the head of the stream is triggering a non-transient error condition from the handler, preventing any further handler dispatches for `stream`
merge stream (StreamState<'Format>.Create(ValueNone, null, Revision.initial, malformed = malformed)) |> ignore
| Ok (-2L, _dispatchedRevision as up: HandlerProgress) ->
states.Remove stream |> ignore // Explicitly drop state for stream; user indicated it has been fully processed.
| Ok (updatedPos, _dispatchedRevision as up: HandlerProgress) ->
// Ensure we have a position (in case it got purged); Drop any events or unfolds implied by updatedPos
merge stream (StreamState<'Format>.Create(ValueSome updatedPos, null, Revision.initial))
Expand Down
Loading