Skip to content

[WIP][Immutable Models M2] messages.BlockResponse, messages.ClusterBlockResponse: message/internal split + validation #7740

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 9 commits into
base: andron/7696-define-UntrustedMessage-interface
Choose a base branch
from
Draft
41 changes: 6 additions & 35 deletions engine/collection/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/state/cluster"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
"github.com/onflow/flow-go/utils/rand"
)

Expand Down Expand Up @@ -150,40 +149,12 @@ func (e *Engine) setupResponseMessageHandler() error {
},
engine.Pattern{
Match: func(msg *engine.Message) bool {
_, ok := msg.Payload.(*messages.ClusterBlockResponse)
_, ok := msg.Payload.(*clustermodel.BlockResponse)
if ok {
e.metrics.MessageReceived(metrics.EngineClusterSynchronization, metrics.MessageBlockResponse)
}
return ok
},
Map: func(msg *engine.Message) (*engine.Message, bool) {
blockResponse, ok := msg.Payload.(*messages.ClusterBlockResponse)
if !ok {
// should never happen, unless there is a bug.
e.log.Fatal().
Hex("origin_id", logging.ID(msg.OriginID)).
Interface("payload", msg.Payload).
Msg("cannot match the payload to ClusterBlockResponse")
return nil, false
}
proposals, err := blockResponse.BlocksInternal()
if err != nil {
// TODO(BFT, #7620): Replace this log statement with a call to the protocol violation consumer.
e.log.Warn().
Hex("origin_id", logging.ID(msg.OriginID)).
Uint64("nonce", blockResponse.Nonce).
Int("block_count", len(blockResponse.Blocks)).
Err(err).
Msgf("cannot convert untrusted proposal to trusted proposal")
e.metrics.InboundMessageDropped(metrics.EngineClusterSynchronization, metrics.MessageBlockProposal)
return nil, false
}

return &engine.Message{
OriginID: msg.OriginID,
Payload: proposals,
}, true
},
Store: e.pendingBlockResponses,
},
)
Expand Down Expand Up @@ -260,7 +231,7 @@ func (e *Engine) process(originID flow.Identifier, event interface{}) error {
switch event.(type) {
case *messages.RangeRequest, *messages.BatchRequest, *messages.SyncRequest:
return e.requestHandler.process(originID, event)
case *messages.SyncResponse, *messages.ClusterBlockResponse:
case *messages.SyncResponse, *clustermodel.BlockResponse:
return e.responseMessageHandler.Process(originID, event)
default:
return fmt.Errorf("received input with type %T from %x: %w", event, originID[:], engine.IncompatibleInputTypeError)
Expand Down Expand Up @@ -298,7 +269,7 @@ func (e *Engine) processAvailableResponses() {

msg, ok = e.pendingBlockResponses.Get()
if ok {
e.onBlockResponse(msg.OriginID, msg.Payload.([]*clustermodel.Proposal))
e.onBlockResponse(msg.OriginID, msg.Payload.(*clustermodel.BlockResponse))
e.metrics.MessageHandled(metrics.EngineClusterSynchronization, metrics.MessageBlockResponse)
continue
}
Expand All @@ -321,15 +292,15 @@ func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResp

// onBlockResponse processes a slice of requested block proposals.
// Input proposals are structurally validated.
func (e *Engine) onBlockResponse(originID flow.Identifier, proposals []*clustermodel.Proposal) {
func (e *Engine) onBlockResponse(originID flow.Identifier, response *clustermodel.BlockResponse) {
// process the blocks one by one
for _, proposal := range proposals {
for _, proposal := range response.Blocks {
if !e.core.HandleBlock(proposal.Block.ToHeader()) {
continue
}
synced := flow.Slashable[*clustermodel.Proposal]{
OriginID: originID,
Message: proposal,
Message: &proposal,
}
// forward the block to the compliance engine for validation and processing
e.comp.OnSyncedClusterBlock(synced)
Expand Down
59 changes: 23 additions & 36 deletions engine/collection/synchronization/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,10 +258,9 @@ func (ss *SyncSuite) TestOnRangeRequest() {
req.ToHeight = ref - 1
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
expected := ss.heights[ref-1]
actual, err := clustermodel.NewProposal(res.Blocks[0])
require.NoError(t, err)
res := args.Get(0).(*clustermodel.BlockResponse)
expected := *ss.heights[ref-1]
actual := res.Blocks[0]
assert.Equal(ss.T(), expected.Block.ID(), actual.Block.ID(), "response should contain right block")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
Expand All @@ -280,11 +279,9 @@ func (ss *SyncSuite) TestOnRangeRequest() {
req.ToHeight = ref + 2
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
expected := []*clustermodel.Proposal{ss.heights[ref-2], ss.heights[ref-1], ss.heights[ref]}
actual, err := res.BlocksInternal()
require.NoError(t, err)
assert.ElementsMatch(ss.T(), expected, actual, "response should contain right blocks")
res := args.Get(0).(*clustermodel.BlockResponse)
expected := []clustermodel.Proposal{*ss.heights[ref-2], *ss.heights[ref-1], *ss.heights[ref]}
assert.ElementsMatch(ss.T(), expected, res.Blocks, "response should contain right blocks")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "should send response to original requester")
Expand All @@ -302,11 +299,9 @@ func (ss *SyncSuite) TestOnRangeRequest() {
req.ToHeight = ref
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
expected := []*clustermodel.Proposal{ss.heights[ref-2], ss.heights[ref-1], ss.heights[ref]}
actual, err := res.BlocksInternal()
require.NoError(t, err)
assert.ElementsMatch(ss.T(), expected, actual, "response should contain right blocks")
res := args.Get(0).(*clustermodel.BlockResponse)
expected := []clustermodel.Proposal{*ss.heights[ref-2], *ss.heights[ref-1], *ss.heights[ref]}
assert.ElementsMatch(ss.T(), expected, res.Blocks, "response should contain right blocks")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "should send response to original requester")
Expand All @@ -326,11 +321,9 @@ func (ss *SyncSuite) TestOnRangeRequest() {

ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
expected := []*clustermodel.Proposal{ss.heights[ref-4], ss.heights[ref-3], ss.heights[ref-2]}
actual, err := res.BlocksInternal()
require.NoError(t, err)
assert.ElementsMatch(ss.T(), expected, actual, "response should contain right blocks")
res := args.Get(0).(*clustermodel.BlockResponse)
expected := []clustermodel.Proposal{*ss.heights[ref-4], *ss.heights[ref-3], *ss.heights[ref-2]}
assert.ElementsMatch(ss.T(), expected, res.Blocks, "response should contain right blocks")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "should send response to original requester")
Expand Down Expand Up @@ -385,10 +378,9 @@ func (ss *SyncSuite) TestOnBatchRequest() {
ss.blockIDs[block.ID()] = proposal
ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
actual, err := clustermodel.NewProposal(res.Blocks[0])
require.NoError(t, err)
assert.Equal(ss.T(), proposal, actual, "response should contain right block")
res := args.Get(0).(*clustermodel.BlockResponse)
actual := res.Blocks[0]
assert.Equal(ss.T(), proposal, &actual, "response should contain right block")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "response should be send to original requester")
Expand All @@ -413,10 +405,8 @@ func (ss *SyncSuite) TestOnBatchRequest() {

ss.con.On("Unicast", mock.Anything, mock.Anything).Return(nil).Once().Run(
func(args mock.Arguments) {
res := args.Get(0).(*messages.ClusterBlockResponse)
proposals, err := res.BlocksInternal()
require.NoError(t, err)
assert.ElementsMatch(ss.T(), []*clustermodel.Proposal{ss.blockIDs[req.BlockIDs[0]], ss.blockIDs[req.BlockIDs[1]]}, proposals, "response should contain right block")
res := args.Get(0).(*clustermodel.BlockResponse)
assert.ElementsMatch(ss.T(), []clustermodel.Proposal{*ss.blockIDs[req.BlockIDs[0]], *ss.blockIDs[req.BlockIDs[1]]}, res.Blocks, "response should contain right block")
assert.Equal(ss.T(), req.Nonce, res.Nonce, "response should contain request nonce")
recipientID := args.Get(1).(flow.Identifier)
assert.Equal(ss.T(), originID, recipientID, "response should be send to original requester")
Expand All @@ -439,20 +429,17 @@ func (ss *SyncSuite) TestOnBatchRequest() {
}

func (ss *SyncSuite) TestOnBlockResponse() {

// generate origin and block response
originID := unittest.IdentifierFixture()
var res []*clustermodel.Proposal

response := unittest.ClusterBlockResponseFixture(2)
// add one block that should be processed
processable := unittest.ClusterProposalFixture()
processable := response.Blocks[0]
ss.core.On("HandleBlock", processable.Block.ToHeader()).Return(true)
res = append(res, processable)

// add one block that should not be processed
unprocessable := unittest.ClusterProposalFixture()
unprocessable := response.Blocks[1]
ss.core.On("HandleBlock", unprocessable.Block.ToHeader()).Return(false)
res = append(res, unprocessable)

ss.comp.On("OnSyncedClusterBlock", mock.Anything).Run(func(args mock.Arguments) {
res := args.Get(0).(flow.Slashable[*clustermodel.Proposal])
Expand All @@ -461,7 +448,7 @@ func (ss *SyncSuite) TestOnBlockResponse() {
ss.Assert().Equal(originID, res.OriginID)
}).Return(nil)

ss.e.onBlockResponse(originID, res)
ss.e.onBlockResponse(originID, response)
ss.comp.AssertExpectations(ss.T())
ss.core.AssertExpectations(ss.T())
}
Expand All @@ -475,9 +462,9 @@ func (ss *SyncSuite) TestOnInvalidBlockResponse() {
proposal := unittest.ClusterProposalFixture()
proposal.ProposerSigData = nil // invalid value

req := &messages.ClusterBlockResponse{
req := &clustermodel.BlockResponse{
Nonce: 0,
Blocks: []clustermodel.UntrustedProposal{clustermodel.UntrustedProposal(*proposal)},
Blocks: []clustermodel.Proposal{*proposal},
}

// Expect metrics to track message receipt and message drop for invalid block proposal
Expand Down
12 changes: 6 additions & 6 deletions engine/collection/synchronization/request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func (r *RequestHandlerEngine) onRangeRequest(originID flow.Identifier, req *mes
}

// get all of the blocks, one by one
proposals := make([]clustermodel.UntrustedProposal, 0, req.ToHeight-req.FromHeight+1)
proposals := make([]clustermodel.Proposal, 0, req.ToHeight-req.FromHeight+1)
for height := req.FromHeight; height <= req.ToHeight; height++ {
proposal, err := r.blocks.ProposalByHeight(height)
if errors.Is(err, storage.ErrNotFound) {
Expand All @@ -245,7 +245,7 @@ func (r *RequestHandlerEngine) onRangeRequest(originID flow.Identifier, req *mes
if err != nil {
return fmt.Errorf("could not get block for height (%d): %w", height, err)
}
proposals = append(proposals, clustermodel.UntrustedProposal(*proposal))
proposals = append(proposals, *proposal)
}

// if there are no blocks to send, skip network message
Expand All @@ -255,7 +255,7 @@ func (r *RequestHandlerEngine) onRangeRequest(originID flow.Identifier, req *mes
}

// send the response
res := &messages.ClusterBlockResponse{
res := &clustermodel.BlockResponse{
Nonce: req.Nonce,
Blocks: proposals,
}
Expand Down Expand Up @@ -304,7 +304,7 @@ func (r *RequestHandlerEngine) onBatchRequest(originID flow.Identifier, req *mes
}

// try to get all the blocks by ID
proposals := make([]clustermodel.UntrustedProposal, 0, len(blockIDs))
proposals := make([]clustermodel.Proposal, 0, len(blockIDs))
for blockID := range blockIDs {
proposal, err := r.blocks.ProposalByID(blockID)
if errors.Is(err, storage.ErrNotFound) {
Expand All @@ -314,7 +314,7 @@ func (r *RequestHandlerEngine) onBatchRequest(originID flow.Identifier, req *mes
if err != nil {
return fmt.Errorf("could not get block by ID (%s): %w", blockID, err)
}
proposals = append(proposals, clustermodel.UntrustedProposal(*proposal))
proposals = append(proposals, *proposal)
}

// if there are no blocks to send, skip network message
Expand All @@ -324,7 +324,7 @@ func (r *RequestHandlerEngine) onBatchRequest(originID flow.Identifier, req *mes
}

// send the response
res := &messages.ClusterBlockResponse{
res := &clustermodel.BlockResponse{
Nonce: req.Nonce,
Blocks: proposals,
}
Expand Down
53 changes: 13 additions & 40 deletions engine/common/synchronization/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,40 +172,12 @@ func (e *Engine) setupResponseMessageHandler() error {
},
engine.Pattern{
Match: func(msg *engine.Message) bool {
_, ok := msg.Payload.(*messages.BlockResponse)
_, ok := msg.Payload.(*flow.BlockResponse)
if ok {
e.metrics.MessageReceived(metrics.EngineSynchronization, metrics.MessageBlockResponse)
}
return ok
},
Map: func(msg *engine.Message) (*engine.Message, bool) {
blockResponse, ok := msg.Payload.(*messages.BlockResponse)
if !ok {
// should never happen, unless there is a bug.
e.log.Fatal().
Hex("origin_id", logging.ID(msg.OriginID)).
Interface("payload", msg.Payload).
Msg("cannot match the payload to BlockResponse")
return nil, false
}
proposals, err := blockResponse.BlocksInternal()
if err != nil {
// TODO(BFT, #7620): Replace this log statement with a call to the protocol violation consumer.
e.log.Warn().
Hex("origin_id", logging.ID(msg.OriginID)).
Uint64("nonce", blockResponse.Nonce).
Int("block_count", len(blockResponse.Blocks)).
Err(err).
Msgf("cannot convert untrusted proposal to trusted proposal")
e.metrics.InboundMessageDropped(metrics.EngineSynchronization, metrics.MessageBlockProposal)
return nil, false
}

return &engine.Message{
OriginID: msg.OriginID,
Payload: proposals,
}, true
},
Store: e.pendingBlockResponses,
},
)
Expand Down Expand Up @@ -253,7 +225,7 @@ func (e *Engine) process(channel channels.Channel, originID flow.Identifier, eve
}
return e.requestHandler.Process(channel, originID, event)

case *messages.BlockResponse:
case *flow.BlockResponse:
err := e.validateBlockResponseForALSP(channel, originID, message)
if err != nil {
irrecoverable.Throw(context.TODO(), fmt.Errorf("failed to validate block response from %x: %w", originID[:], err))
Expand Down Expand Up @@ -305,7 +277,7 @@ func (e *Engine) processAvailableResponses(ctx context.Context) {

msg, ok = e.pendingBlockResponses.Get()
if ok {
e.onBlockResponse(msg.OriginID, msg.Payload.([]*flow.Proposal))
e.onBlockResponse(msg.OriginID, msg.Payload.(*flow.BlockResponse))
e.metrics.MessageHandled(metrics.EngineSynchronization, metrics.MessageBlockResponse)
continue
}
Expand All @@ -323,26 +295,27 @@ func (e *Engine) onSyncResponse(originID flow.Identifier, res *messages.SyncResp
e.core.HandleHeight(final, res.Height)
}

// onBlockResponse processes a structurally validated block proposal containing a specifically requested block.
func (e *Engine) onBlockResponse(originID flow.Identifier, res []*flow.Proposal) {
// onBlockResponse processes a structurally validated block proposal containing a specifically requested block response.
func (e *Engine) onBlockResponse(originID flow.Identifier, res *flow.BlockResponse) {
// process the proposal one by one
if len(res) == 0 {
if len(res.Blocks) == 0 {
e.log.Debug().Msg("received empty proposals")
return
}

first := res[0].Block.Height
last := res[len(res)-1].Block.Height
proposals := res.Blocks
first := proposals[0].Block.Height
last := proposals[len(proposals)-1].Block.Height
e.log.Debug().Uint64("first", first).Uint64("last", last).Msg("received proposal")

filteredProposals := make([]*flow.Proposal, 0, len(res))
for _, proposal := range res {
filteredProposals := make([]*flow.Proposal, 0, len(proposals))
for _, proposal := range proposals {
header := proposal.Block.ToHeader()
if !e.core.HandleBlock(header) {
e.log.Debug().Uint64("height", header.Height).Msg("block handler rejected")
continue
}
filteredProposals = append(filteredProposals, proposal)
filteredProposals = append(filteredProposals, &proposal)
}

// forward the block to the compliance engine for validation and processing
Expand Down Expand Up @@ -556,7 +529,7 @@ func (e *Engine) validateBatchRequestForALSP(originID flow.Identifier, batchRequ
}

// TODO: implement spam reporting similar to validateSyncRequestForALSP
func (e *Engine) validateBlockResponseForALSP(channel channels.Channel, id flow.Identifier, blockResponse *messages.BlockResponse) error {
func (e *Engine) validateBlockResponseForALSP(channel channels.Channel, id flow.Identifier, blockResponse *flow.BlockResponse) error {
return nil
}

Expand Down
Loading