Skip to content

Commit

Permalink
Add consistency check
Browse files Browse the repository at this point in the history
  • Loading branch information
mahdihasnat committed Sep 24, 2024
1 parent c951eaf commit 61f713b
Showing 1 changed file with 18 additions and 2 deletions.
20 changes: 18 additions & 2 deletions Kafka/Transition.fs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,20 @@ let rec refreshLogs (logKeys: List<LogKey>) node (f : Node -> TransitionResult)
| x :: xs ->
refreshLog x node (fun node -> refreshLogs xs node f)

let transition (node: Node) (action: Choice<Message<InputMessageBody>,unit>) : TransitionResult =
let assertConsistency (node: Node) : unit =
node.CachedMessages
|> Map.forall (fun logKey values ->
values
|> List.map fst
|> List.sort
|> fun offsets ->
offsets = List.init offsets.Length (fun i -> Offset i)
)
|> fun x ->
assert x

let transition (node: Node) (action: Choice<Message<InputMessageBody>,unit>) : TransitionResult =
assertConsistency node
match action with
| Choice2Of2 unit ->
(node, List.empty)
Expand Down Expand Up @@ -124,7 +136,7 @@ let transition (node: Node) (action: Choice<Message<InputMessageBody>,unit>) : T
offsets
|> Map.choosei (fun key offset ->
node.CachedMessages.TryFind key
|> Option.map (List.filter (fun (offset', _) -> offset <= offset'))
|> Option.map (List.filter (fun (offset', _) -> offset <= offset') >> List.sortBy fst)
)
let replyMessageBody: OutputMessageBody =
PollAck (messageId, messages)
Expand Down Expand Up @@ -194,3 +206,7 @@ let transition (node: Node) (action: Choice<Message<InputMessageBody>,unit>) : T
|> Option.get
|> fun f -> f node actualValue

let transitionOuter (node: Node) (action: Choice<Message<InputMessageBody>,unit>) : TransitionResult =
let oldNode = node
let node, messages = transition node action
node, messages

0 comments on commit 61f713b

Please sign in to comment.