Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PeerDAS: Multiple improvements #14467

Merged
merged 6 commits into from
Sep 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Address Nishant's comment
  • Loading branch information
nalepae committed Sep 23, 2024
commit 1dc8225324f4b96fc199c364889333bad998efbc
4 changes: 1 addition & 3 deletions beacon-chain/blockchain/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/prysmaticlabs/prysm/v5/async/event"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
Expand All @@ -24,7 +23,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/testing/require"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -74,7 +72,7 @@ func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.B
return nil
}

func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ primitives.Slot, _ time.Time, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
mb.broadcastCalled = true
return nil
}
Expand Down
15 changes: 9 additions & 6 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/helpers"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing"
"github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace"
Expand Down Expand Up @@ -285,8 +284,6 @@ func (s *Service) internalBroadcastBlob(
// TODO: Add tests
func (s *Service) BroadcastDataColumn(
ctx context.Context,
slot primitives.Slot,
slotStartTime time.Time,
root [fieldparams.RootLength]byte,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
Expand All @@ -309,15 +306,13 @@ func (s *Service) BroadcastDataColumn(
}

// Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
go s.internalBroadcastDataColumn(ctx, slot, slotStartTime, root, columnSubnet, dataColumnSidecar, forkDigest)
go s.internalBroadcastDataColumn(ctx, root, columnSubnet, dataColumnSidecar, forkDigest)

return nil
}

func (s *Service) internalBroadcastDataColumn(
ctx context.Context,
slot primitives.Slot,
slotStartTime time.Time,
root [fieldparams.RootLength]byte,
columnSubnet uint64,
dataColumnSidecar *ethpb.DataColumnSidecar,
Expand Down Expand Up @@ -379,6 +374,14 @@ func (s *Service) internalBroadcastDataColumn(
tracing.AnnotateError(span, err)
}

header := dataColumnSidecar.SignedBlockHeader.GetHeader()
slot := header.GetSlot()

slotStartTime, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot)
if err != nil {
log.WithError(err).Error("Failed to convert slot to time")
}

log.WithFields(logrus.Fields{
"slot": slot,
"timeSinceSlotStart": time.Since(slotStartTime),
Expand Down
7 changes: 2 additions & 5 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v5/consensus-types/wrapper"
"github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
Expand Down Expand Up @@ -586,12 +585,10 @@ func TestService_BroadcastDataColumn(t *testing.T) {

// Attempt to broadcast nil object should fail.
ctx := context.Background()
slot := primitives.Slot(42)
slotStartTime := time.Now()
var root [fieldparams.RootLength]byte
require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastDataColumn(ctx, slot, slotStartTime, root, subnet, nil))
require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastDataColumn(ctx, root, subnet, nil))

// Broadcast to peers and wait.
require.NoError(t, p.BroadcastDataColumn(ctx, slot, slotStartTime, root, subnet, sidecar))
require.NoError(t, p.BroadcastDataColumn(ctx, root, subnet, sidecar))
require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s")
}
4 changes: 1 addition & 3 deletions beacon-chain/p2p/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package p2p

import (
"context"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -46,7 +44,7 @@ type Broadcaster interface {
BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
BroadcastDataColumn(ctx context.Context, slot primitives.Slot, slotStartTime time.Time, root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
BroadcastDataColumn(ctx context.Context, root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
}

// SetStreamHandler configures p2p to handle streams of a certain topic ID.
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/testing/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//beacon-chain/p2p/peers/scorers:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/primitives:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//proto/prysm/v1alpha1/metadata:go_default_library",
"@com_github_ethereum_go_ethereum//crypto:go_default_library",
Expand Down
4 changes: 1 addition & 3 deletions beacon-chain/p2p/testing/fuzz_p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package testing

import (
"context"
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
Expand All @@ -15,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -158,7 +156,7 @@ func (*FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar)
}

// BroadcastDataColumn -- fake.
func (*FakeP2P) BroadcastDataColumn(_ context.Context, _ primitives.Slot, _ time.Time, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
func (*FakeP2P) BroadcastDataColumn(_ context.Context, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
return nil
}

Expand Down
4 changes: 1 addition & 3 deletions beacon-chain/p2p/testing/mock_broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@ import (
"context"
"sync"
"sync/atomic"
"time"

fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -52,7 +50,7 @@ func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.BlobSide
}

// BroadcastDataColumn broadcasts a data column for mock.
func (m *MockBroadcaster) BroadcastDataColumn(context.Context, primitives.Slot, time.Time, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (m *MockBroadcaster) BroadcastDataColumn(context.Context, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
m.BroadcastCalled.Store(true)
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/p2p/testing/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -194,7 +193,7 @@ func (p *TestP2P) BroadcastBlob(context.Context, uint64, *ethpb.BlobSidecar) err
}

// BroadcastDataColumn broadcasts a data column for mock.
func (p *TestP2P) BroadcastDataColumn(context.Context, primitives.Slot, time.Time, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
func (p *TestP2P) BroadcastDataColumn(context.Context, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
p.BroadcastCalled.Store(true)
return nil
}
Expand Down
9 changes: 1 addition & 8 deletions beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,13 +473,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
eg, _ := errgroup.WithContext(ctx)
dataColumnsWithholdCount := features.Get().DataColumnsWithholdCount

// Get the time corresponding to the start of the slot.
genesisTime := uint64(vs.TimeFetcher.GenesisTime().Unix())
slotStartTime, err := slots.ToTime(genesisTime, slot)
if err != nil {
return errors.Wrap(err, "to time")
}

for _, sd := range sidecars {
// Copy the iteration instance to a local variable to give each go-routine its own copy to play with.
// See https://golang.org/doc/faq#closures_and_goroutines for more details.
Expand All @@ -496,7 +489,7 @@ func (vs *Server) broadcastAndReceiveDataColumns(
"dataColumnIndex": sidecar.ColumnIndex,
}).Warning("Withholding data column")
} else {
if err := vs.P2P.BroadcastDataColumn(ctx, slot, slotStartTime, root, subnet, sidecar); err != nil {
if err := vs.P2P.BroadcastDataColumn(ctx, root, subnet, sidecar); err != nil {
return errors.Wrap(err, "broadcast data column")
}
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/data_columns_reconstruct.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
// Compute the subnet for this column.
subnet := column % params.BeaconConfig().DataColumnSidecarSubnetCount
// Broadcast the missing data column.
if err := s.cfg.p2p.BroadcastDataColumn(ctx, slot, slotStartTime, blockRoot, subnet, dataColumnSidecar); err != nil {
if err := s.cfg.p2p.BroadcastDataColumn(ctx, blockRoot, subnet, dataColumnSidecar); err != nil {
log.WithError(err).Error("Broadcast data column")
}
}
Expand Down
Loading