Skip to content

Commit 53fd6f7

Browse files
committed
Experimental implementation of ObjectDiffusion V2 (Inbound side)
1 parent f462de1 commit 53fd6f7

File tree

7 files changed

+1917
-0
lines changed

7 files changed

+1917
-0
lines changed

ouroboros-consensus/ouroboros-consensus.cabal

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ library
193193
Ouroboros.Consensus.MiniProtocol.LocalTxSubmission.Server
194194
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1
195195
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V1.State
196+
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
197+
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Decision
198+
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
199+
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.State
200+
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types
196201
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.API
197202
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.ObjectPool.PerasCert
198203
Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Outbound
@@ -354,6 +359,8 @@ library
354359
primitive,
355360
psqueues ^>=0.2.3,
356361
quiet ^>=0.2,
362+
random,
363+
random-shuffle,
357364
rawlock ^>=0.1.1,
358365
resource-registry ^>=0.1,
359366
semialign >=1.1,
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
{-# LANGUAGE BlockArguments #-}
2+
{-# LANGUAGE DataKinds #-}
3+
{-# LANGUAGE GADTs #-}
4+
{-# LANGUAGE ImportQualifiedPost #-}
5+
{-# LANGUAGE KindSignatures #-}
6+
{-# LANGUAGE LambdaCase #-}
7+
{-# LANGUAGE NamedFieldPuns #-}
8+
{-# LANGUAGE ScopedTypeVariables #-}
9+
10+
module Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2
11+
( -- * ObjectDiffusion Inbound client
12+
objectDiffusionInbound
13+
14+
-- * PeerStateAPI
15+
, withObjectDiffusionInboundPeer
16+
, PeerStateAPI
17+
18+
-- * Supporting types
19+
, module V2
20+
, PeerDecisionChannelsVar
21+
, newPeerDecisionChannelsVar
22+
, DecisionPolicy (..)
23+
) where
24+
25+
import Control.Concurrent.Class.MonadSTM (MonadSTM, atomically)
26+
import Control.Monad.Class.MonadThrow
27+
import Control.Tracer (Tracer, traceWith)
28+
import Data.List.NonEmpty qualified as NonEmpty
29+
import Data.Set qualified as Set
30+
import Network.TypedProtocol
31+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Registry
32+
import Ouroboros.Consensus.MiniProtocol.ObjectDiffusion.Inbound.V2.Types as V2
33+
import Ouroboros.Network.ControlMessage (ControlMessage (..), ControlMessageSTM)
34+
import Ouroboros.Network.Protocol.ObjectDiffusion.Inbound
35+
36+
-- | A object-diffusion inbound side (client).
37+
--
38+
-- The steps are as follow
39+
-- 1. Block on next decision from the decision logic
40+
-- 2. Handle any available reply (`goCollect`)
41+
-- 3. Request new objects if possible (`goReqObjects`)
42+
-- 4. Request new ids (also responsible for ack) (`goReqIds`)
43+
-- 5. Signal psaOnDecisionCompleted (as part of `goReqIds{Blocking,NonBlocking}`)
44+
-- And loop again
45+
--
46+
-- The architecture/code org of this module should make sure we don't go again
47+
-- into `goIdle` until `psaOnDecisionCompleted` has been called
48+
--
49+
-- NOTE: each `go____` function is responsible for calling the next one in order
50+
-- to continue the protocol.
51+
-- E.g. `goReqObjects` will call `goReqIds` whatever the outcome of its logic is.
52+
objectDiffusionInbound ::
53+
forall objectId object m.
54+
( MonadThrow m
55+
, MonadSTM m
56+
) =>
57+
Tracer m (TraceObjectDiffusionInbound objectId object) ->
58+
ControlMessageSTM m ->
59+
PeerStateAPI m objectId object ->
60+
ObjectDiffusionInboundPipelined objectId object m ()
61+
objectDiffusionInbound
62+
tracer
63+
controlMessageSTM
64+
PeerStateAPI
65+
{ psaReadDecision
66+
, psaOnDecisionCompleted
67+
, psaOnRequestIds
68+
, psaOnRequestObjects
69+
, psaOnReceiveIds
70+
, psaOnReceiveObjects
71+
} =
72+
ObjectDiffusionInboundPipelined $ goIdle Zero
73+
where
74+
goIdle :: forall (n :: N). Nat n -> InboundStIdle n objectId object m ()
75+
goIdle n = WithEffect $ do
76+
ctrlMsg <- atomically controlMessageSTM
77+
traceWith tracer $ TraceObjectDiffusionInboundReceivedControlMessage ctrlMsg
78+
case ctrlMsg of
79+
-- The peer selection governor is asking us to terminate the connection.
80+
Terminate ->
81+
pure $ terminateAfterDrain n
82+
-- Otherwise, we can continue the protocol normally.
83+
_continue -> do
84+
-- Block on next decision.
85+
decision <- psaReadDecision
86+
traceWith tracer (TraceObjectDiffusionInboundReceivedDecision decision)
87+
pure $ goCollect n decision
88+
89+
-- \| Block until all replies of pipelined requests have been received, then
90+
-- sends `MsgDone` to terminate the protocol.
91+
terminateAfterDrain ::
92+
Nat n -> InboundStIdle n objectId object m ()
93+
terminateAfterDrain = \case
94+
Zero -> WithEffect $ do
95+
traceWith tracer TraceObjectDiffusionInboundTerminated
96+
pure $ SendMsgDone ()
97+
Succ n -> CollectPipelined Nothing $ \_ignoredMsg -> terminateAfterDrain n
98+
99+
-- \| Handle potential available replies before continuing with `goReqObjects`.
100+
--
101+
-- If there are no pipelined requests, this will directly call `goReqObjects`.
102+
-- If there are pipelined requests, it will collect as many replies as
103+
-- possible before continuing with `goReqObjects` once no more replies are
104+
-- immediately available.
105+
goCollect :: Nat n -> PeerDecision objectId object -> InboundStIdle n objectId object m ()
106+
goCollect Zero decision =
107+
goReqObjects Zero decision
108+
goCollect (Succ n) decision =
109+
CollectPipelined
110+
(Just $ goReqObjects (Succ n) decision)
111+
( \case
112+
CollectObjectIds numIdsRequested ids -> WithEffect $ do
113+
psaOnReceiveIds numIdsRequested ids
114+
pure $ goCollect n decision
115+
CollectObjects _objectIds objects -> WithEffect $ do
116+
-- TODO: We could try to validate objects here, i.e.
117+
-- as early as possible, instead of validating them when adding
118+
-- them to the ObjectPool, in order to pivot away from
119+
-- adversarial peers as soon as possible.
120+
psaOnReceiveObjects objects
121+
pure $ goCollect n decision
122+
)
123+
124+
-- \| Request objects, if the set of ids of objects to request in the
125+
-- decision is non-empty.
126+
-- Regardless, it will ultimately call `goReqIds`.
127+
goReqObjects ::
128+
Nat n ->
129+
PeerDecision objectId object ->
130+
InboundStIdle n objectId object m ()
131+
goReqObjects n decision = do
132+
let objectIds = rodObjectsToReqIds (pdReqObjects decision)
133+
if Set.null objectIds
134+
then
135+
goReqIds n decision
136+
else WithEffect $ do
137+
psaOnRequestObjects objectIds
138+
pure $
139+
SendMsgRequestObjectsPipelined
140+
(Set.toList objectIds)
141+
(goReqIds (Succ n) decision)
142+
143+
-- \| Request objectIds, either in a blocking or pipelined fashion depending
144+
-- on the decision's `ridCanPipelineIdsRequests` flag.
145+
-- In both cases, once done, we will ultimately call `psaOnDecisionCompleted`
146+
-- and return to `goIdle`.
147+
goReqIds ::
148+
forall (n :: N).
149+
Nat n ->
150+
PeerDecision objectId object ->
151+
InboundStIdle n objectId object m ()
152+
goReqIds n decision = do
153+
let canPipelineIdRequests = ridCanPipelineIdsRequests (pdReqIds decision)
154+
if canPipelineIdRequests
155+
then goReqIdsPipelined n decision
156+
else case n of
157+
Zero -> goReqIdsBlocking decision
158+
Succ{} -> error "Impossible to have pipelined requests when we have no known unacknowledged objectIds"
159+
160+
-- \| Request objectIds in a blocking fashion if the number to request in the
161+
-- decision is non-zero.
162+
-- Regardless, it will ultimately call `psaOnDecisionCompleted` and return to
163+
-- `goIdle`.
164+
goReqIdsBlocking ::
165+
PeerDecision objectId object ->
166+
InboundStIdle Z objectId object m ()
167+
goReqIdsBlocking decision = WithEffect $ do
168+
let numIdsToAck = ridNumIdsToAck (pdReqIds decision)
169+
let numIdsToReq = ridNumIdsToReq (pdReqIds decision)
170+
if numIdsToReq == 0
171+
then do
172+
psaOnDecisionCompleted
173+
pure $ goIdle Zero
174+
else do
175+
psaOnRequestIds numIdsToAck numIdsToReq
176+
psaOnDecisionCompleted
177+
pure $
178+
SendMsgRequestObjectIdsBlocking
179+
numIdsToAck
180+
numIdsToReq
181+
( \objectIds -> WithEffect $ do
182+
psaOnReceiveIds numIdsToReq (NonEmpty.toList objectIds)
183+
pure $ goIdle Zero
184+
)
185+
186+
-- \| Request objectIds in a pipelined fashion if the number to request in the
187+
-- decision is non-zero.
188+
-- Regardless, it will ultimately call `psaOnDecisionCompleted` and return to
189+
-- `goIdle`.
190+
goReqIdsPipelined ::
191+
forall (n :: N).
192+
Nat n ->
193+
PeerDecision objectId object ->
194+
InboundStIdle n objectId object m ()
195+
goReqIdsPipelined n decision = WithEffect $ do
196+
let numIdsToAck = ridNumIdsToAck (pdReqIds decision)
197+
let numIdsToReq = ridNumIdsToReq (pdReqIds decision)
198+
if numIdsToReq == 0
199+
then do
200+
psaOnDecisionCompleted
201+
pure $ goIdle n
202+
else do
203+
psaOnRequestIds numIdsToAck numIdsToReq
204+
psaOnDecisionCompleted
205+
pure $
206+
SendMsgRequestObjectIdsPipelined
207+
numIdsToAck
208+
numIdsToReq
209+
(goIdle (Succ n))

0 commit comments

Comments
 (0)