Skip to content

Commit

Permalink
Remove list from AcceptedFrontier message (#1578)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Jun 5, 2023
1 parent c2ff5ff commit ab20b7d
Show file tree
Hide file tree
Showing 24 changed files with 252 additions and 271 deletions.
10 changes: 4 additions & 6 deletions message/inbound_msg_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,18 +135,16 @@ func InboundGetAcceptedFrontier(
func InboundAcceptedFrontier(
chainID ids.ID,
requestID uint32,
containerIDs []ids.ID,
containerID ids.ID,
nodeID ids.NodeID,
) InboundMessage {
containerIDBytes := make([][]byte, len(containerIDs))
encodeIDs(containerIDs, containerIDBytes)
return &inboundMessage{
nodeID: nodeID,
op: AcceptedFrontierOp,
message: &p2p.AcceptedFrontier{
ChainId: chainID[:],
RequestId: requestID,
ContainerIds: containerIDBytes,
ChainId: chainID[:],
RequestId: requestID,
ContainerId: containerID[:],
},
expiration: mockable.MaxTime,
}
Expand Down
9 changes: 2 additions & 7 deletions message/inbound_msg_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func TestInboundMsgBuilder(t *testing.T) {
msg := InboundAcceptedFrontier(
chainID,
requestID,
containerIDs,
containerIDs[0],
nodeID,
)

Expand All @@ -196,12 +196,7 @@ func TestInboundMsgBuilder(t *testing.T) {
innerMsg := msg.Message().(*p2p.AcceptedFrontier)
require.Equal(chainID[:], innerMsg.ChainId)
require.Equal(requestID, innerMsg.RequestId)
containerIDsBytes := make([][]byte, len(containerIDs))
for i, id := range containerIDs {
id := id
containerIDsBytes[i] = id[:]
}
require.Equal(containerIDsBytes, innerMsg.ContainerIds)
require.Equal(containerIDs[0][:], innerMsg.ContainerId)
},
)

Expand Down
6 changes: 3 additions & 3 deletions message/messages_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ func TestMessage(t *testing.T) {
msg: &p2p.Message{
Message: &p2p.Message_AcceptedFrontier_{
AcceptedFrontier_: &p2p.AcceptedFrontier{
ChainId: testID[:],
RequestId: 1,
ContainerIds: [][]byte{testID[:], testID[:]},
ChainId: testID[:],
RequestId: 1,
ContainerId: testID[:],
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion message/mock_outbound_message_builder.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions message/outbound_msg_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type OutboundMsgBuilder interface {
AcceptedFrontier(
chainID ids.ID,
requestID uint32,
containerIDs []ids.ID,
containerID ids.ID,
) (OutboundMessage, error)

GetAccepted(
Expand Down Expand Up @@ -400,17 +400,15 @@ func (b *outMsgBuilder) GetAcceptedFrontier(
func (b *outMsgBuilder) AcceptedFrontier(
chainID ids.ID,
requestID uint32,
containerIDs []ids.ID,
containerID ids.ID,
) (OutboundMessage, error) {
containerIDBytes := make([][]byte, len(containerIDs))
encodeIDs(containerIDs, containerIDBytes)
return b.builder.createOutbound(
&p2p.Message{
Message: &p2p.Message_AcceptedFrontier_{
AcceptedFrontier_: &p2p.AcceptedFrontier{
ChainId: chainID[:],
RequestId: requestID,
ContainerIds: containerIDBytes,
ChainId: chainID[:],
RequestId: requestID,
ContainerId: containerID[:],
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion proto/p2p/p2p.proto
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ message AcceptedFrontier {

bytes chain_id = 1;
uint32 request_id = 2;
repeated bytes container_ids = 3;
bytes container_id = 3;
}

// Message to request for the accepted blocks/vertices of the "remote" peer.
Expand Down
234 changes: 117 additions & 117 deletions proto/pb/p2p/p2p.pb.go

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion snow/engine/avalanche/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ func (gh *getter) GetAcceptedStateSummary(_ context.Context, nodeID ids.NodeID,

func (gh *getter) GetAcceptedFrontier(ctx context.Context, validatorID ids.NodeID, requestID uint32) error {
acceptedFrontier := gh.storage.Edge(ctx)
gh.sender.SendAcceptedFrontier(ctx, validatorID, requestID, acceptedFrontier)
// Since all the DAGs are linearized, we only need to return the stop
// vertex.
if len(acceptedFrontier) > 0 {
gh.sender.SendAcceptedFrontier(ctx, validatorID, requestID, acceptedFrontier[0])
}
return nil
}

Expand Down
38 changes: 8 additions & 30 deletions snow/engine/avalanche/getter/getter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,47 +68,25 @@ func TestAcceptedFrontier(t *testing.T) {

manager, sender, config := testSetup(t)

vtxID0 := ids.GenerateTestID()
vtxID1 := ids.GenerateTestID()
vtxID2 := ids.GenerateTestID()
vtxID := ids.GenerateTestID()

bsIntf, err := New(manager, config)
if err != nil {
t.Fatal(err)
}
require.NoError(err)
require.IsType(&getter{}, bsIntf)
bs := bsIntf.(*getter)

manager.EdgeF = func(context.Context) []ids.ID {
return []ids.ID{
vtxID0,
vtxID1,
vtxID,
}
}

var accepted []ids.ID
sender.SendAcceptedFrontierF = func(_ context.Context, _ ids.NodeID, _ uint32, frontier []ids.ID) {
accepted = frontier
}

if err := bs.GetAcceptedFrontier(context.Background(), ids.EmptyNodeID, 0); err != nil {
t.Fatal(err)
}

acceptedSet := set.Set[ids.ID]{}
acceptedSet.Add(accepted...)

manager.EdgeF = nil

if !acceptedSet.Contains(vtxID0) {
t.Fatalf("Vtx should be accepted")
}
if !acceptedSet.Contains(vtxID1) {
t.Fatalf("Vtx should be accepted")
}
if acceptedSet.Contains(vtxID2) {
t.Fatalf("Vtx shouldn't be accepted")
var accepted ids.ID
sender.SendAcceptedFrontierF = func(_ context.Context, _ ids.NodeID, _ uint32, containerID ids.ID) {
accepted = containerID
}
require.NoError(bs.GetAcceptedFrontier(context.Background(), ids.EmptyNodeID, 0))
require.Equal(vtxID, accepted)
}

func TestFilterAccepted(t *testing.T) {
Expand Down
54 changes: 32 additions & 22 deletions snow/engine/common/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func NewCommonBootstrapper(config Config) Bootstrapper {
}
}

func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerIDs []ids.ID) error {
func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error {
// ignores any late responses
if requestID != b.Config.SharedCfg.RequestID {
b.Ctx.Log.Debug("received out-of-sync AcceptedFrontier message",
Expand All @@ -98,12 +98,39 @@ func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID,
return nil
}

// Mark that we received a response from [nodeID]
b.pendingReceiveAcceptedFrontier.Remove(nodeID)

// Union the reported accepted frontier from [nodeID] with the accepted
// frontier we got from others
b.acceptedFrontierSet.Add(containerIDs...)
b.acceptedFrontierSet.Add(containerID)
return b.markAcceptedFrontierReceived(ctx, nodeID)
}

func (b *bootstrapper) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
// ignores any late responses
if requestID != b.Config.SharedCfg.RequestID {
b.Ctx.Log.Debug("received out-of-sync GetAcceptedFrontierFailed message",
zap.Stringer("nodeID", nodeID),
zap.Uint32("expectedRequestID", b.Config.SharedCfg.RequestID),
zap.Uint32("requestID", requestID),
)
return nil
}

if !b.pendingReceiveAcceptedFrontier.Contains(nodeID) {
b.Ctx.Log.Debug("received unexpected GetAcceptedFrontierFailed message",
zap.Stringer("nodeID", nodeID),
)
return nil
}

// If we can't get a response from [nodeID], act as though they said their
// accepted frontier is empty and we add the validator to the failed list
b.failedAcceptedFrontier.Add(nodeID)
return b.markAcceptedFrontierReceived(ctx, nodeID)
}

func (b *bootstrapper) markAcceptedFrontierReceived(ctx context.Context, nodeID ids.NodeID) error {
// Mark that we received a response from [nodeID]
b.pendingReceiveAcceptedFrontier.Remove(nodeID)

b.sendGetAcceptedFrontiers(ctx)

Expand Down Expand Up @@ -150,23 +177,6 @@ func (b *bootstrapper) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID,
return nil
}

func (b *bootstrapper) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error {
// ignores any late responses
if requestID != b.Config.SharedCfg.RequestID {
b.Ctx.Log.Debug("received out-of-sync GetAcceptedFrontierFailed message",
zap.Stringer("nodeID", nodeID),
zap.Uint32("expectedRequestID", b.Config.SharedCfg.RequestID),
zap.Uint32("requestID", requestID),
)
return nil
}

// If we can't get a response from [nodeID], act as though they said their
// accepted frontier is empty and we add the validator to the failed list
b.failedAcceptedFrontier.Add(nodeID)
return b.AcceptedFrontier(ctx, nodeID, requestID, nil)
}

func (b *bootstrapper) Accepted(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerIDs []ids.ID) error {
// ignores any late responses
if requestID != b.Config.SharedCfg.RequestID {
Expand Down
4 changes: 2 additions & 2 deletions snow/engine/common/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,13 @@ type AcceptedFrontierHandler interface {
//
// This function can be called by any validator. It is not safe to assume
// this message is in response to a GetAcceptedFrontier message, is
// utilizing a unique requestID, or that the containerIDs from a valid
// utilizing a unique requestID, or that the containerID is a valid
// frontier.
AcceptedFrontier(
ctx context.Context,
validatorID ids.NodeID,
requestID uint32,
containerIDs []ids.ID,
containerID ids.ID,
) error

// Notify this engine that a get accepted frontier request it issued has
Expand Down
3 changes: 2 additions & 1 deletion snow/engine/common/no_ops_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,13 @@ func NewNoOpAcceptedFrontierHandler(log logging.Logger) AcceptedFrontierHandler
return &noOpAcceptedFrontierHandler{log: log}
}

func (nop *noOpAcceptedFrontierHandler) AcceptedFrontier(_ context.Context, nodeID ids.NodeID, requestID uint32, _ []ids.ID) error {
func (nop *noOpAcceptedFrontierHandler) AcceptedFrontier(_ context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error {
nop.log.Debug("dropping request",
zap.String("reason", "unhandled by this gear"),
zap.Stringer("messageOp", message.AcceptedFrontierOp),
zap.Stringer("nodeID", nodeID),
zap.Uint32("requestID", requestID),
zap.Stringer("containerID", containerID),
)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion snow/engine/common/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type FrontierSender interface {
ctx context.Context,
nodeID ids.NodeID,
requestID uint32,
containerIDs []ids.ID,
containerID ids.ID,
)
}

Expand Down
27 changes: 14 additions & 13 deletions snow/engine/common/test_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,17 +105,18 @@ type EngineTest struct {

CantGetVM bool

StartF func(ctx context.Context, startReqID uint32) error
IsBootstrappedF func() bool
ContextF func() *snow.ConsensusContext
HaltF func(context.Context)
TimeoutF, GossipF, ShutdownF func(context.Context) error
NotifyF func(context.Context, Message) error
GetF, GetAncestorsF, PullQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte) error
AncestorsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containers [][]byte) error
AcceptedFrontierF, GetAcceptedF, AcceptedF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredIDs []ids.ID) error
ChitsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredID ids.ID, acceptedID ids.ID) error
StartF func(ctx context.Context, startReqID uint32) error
IsBootstrappedF func() bool
ContextF func() *snow.ConsensusContext
HaltF func(context.Context)
TimeoutF, GossipF, ShutdownF func(context.Context) error
NotifyF func(context.Context, Message) error
GetF, GetAncestorsF, PullQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, container []byte) error
AncestorsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containers [][]byte) error
AcceptedFrontierF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error
GetAcceptedF, AcceptedF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredIDs []ids.ID) error
ChitsF func(ctx context.Context, nodeID ids.NodeID, requestID uint32, preferredID ids.ID, acceptedID ids.ID) error
GetStateSummaryFrontierF, GetStateSummaryFrontierFailedF, GetAcceptedStateSummaryFailedF,
GetAcceptedFrontierF, GetFailedF, GetAncestorsFailedF,
QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(ctx context.Context, nodeID ids.NodeID, requestID uint32) error
Expand Down Expand Up @@ -371,9 +372,9 @@ func (e *EngineTest) GetAcceptedFrontierFailed(ctx context.Context, nodeID ids.N
return errGetAcceptedFrontierFailed
}

func (e *EngineTest) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerIDs []ids.ID) error {
func (e *EngineTest) AcceptedFrontier(ctx context.Context, nodeID ids.NodeID, requestID uint32, containerID ids.ID) error {
if e.AcceptedFrontierF != nil {
return e.AcceptedFrontierF(ctx, nodeID, requestID, containerIDs)
return e.AcceptedFrontierF(ctx, nodeID, requestID, containerID)
}
if !e.CantAcceptedFrontier {
return nil
Expand Down
Loading

0 comments on commit ab20b7d

Please sign in to comment.