From 9e2a3d2ae6ef27606b7f298a109650cbbdc56537 Mon Sep 17 00:00:00 2001
From: Manu NALEPA
Date: Mon, 23 Sep 2024 11:08:58 +0200
Subject: [PATCH] PeerDAS: Multiple improvements (#14467)

* `scheduleReconstructedDataColumnsBroadcast`: Really minor refactor.

* `receivedDataColumnsFromRootLock` -> `dataColumnsFromRootLock`

* `reconstructDataColumns`: Stop looking into the DB to know if we have some columns.

Before this commit: each time we receive a column, we look into the filesystem for all the columns we store.
==> For 128 columns, that amounts to 1 + 2 + 3 + ... + 128 = 128(128+1)/2 = 8256 file lookups.

Also, as soon as a column is saved into the filesystem, we assume it will be visible in an immediately following lookup (strict consistency). This is not always true.
==> Because the filesystem is not strictly consistent, we sometimes reconstruct and reseed columns more than once.

After this commit: we use a (strictly consistent) cache to determine whether we have received a column.
==> No more consistency issues, and less stress on the filesystem.

* `dataColumnSidecarByRootRPCHandler`: Improve logging.

Before this commit, the logged values assumed that all requested columns corresponded to the same block root, which is not always the case.
After this commit, we know which columns are requested for which root.

* Add a log when broadcasting a data column. This is useful for debugging "lost data columns" on devnets.

* Address Nishant's comment
---
 beacon-chain/blockchain/setup_test.go         |   3 +-
 beacon-chain/p2p/broadcaster.go               |  25 ++-
 beacon-chain/p2p/broadcaster_test.go          |   5 +-
 beacon-chain/p2p/interfaces.go                |   3 +-
 beacon-chain/p2p/testing/BUILD.bazel          |   1 +
 beacon-chain/p2p/testing/fuzz_p2p.go          |   3 +-
 beacon-chain/p2p/testing/mock_broadcaster.go  |   3 +-
 beacon-chain/p2p/testing/p2p.go               |   3 +-
 .../rpc/prysm/v1alpha1/validator/proposer.go  |   4 +-
 beacon-chain/sync/BUILD.bazel                 |   1 +
 beacon-chain/sync/data_columns_reconstruct.go | 179 +++++++++++++-----
 .../sync/data_columns_reconstruct_test.go     |  87 +++++++++
 .../sync/rpc_data_column_sidecars_by_root.go  |  80 ++++----
 beacon-chain/sync/service.go                  |  17 +-
 .../sync/subscriber_data_column_sidecar.go    |  17 +-
 15 files changed, 332 insertions(+), 99 deletions(-)
 create mode 100644 beacon-chain/sync/data_columns_reconstruct_test.go

diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go
index 8728b8bdfc32..65af044040ba 100644
--- a/beacon-chain/blockchain/setup_test.go
+++ b/beacon-chain/blockchain/setup_test.go
@@ -22,6 +22,7 @@ import (
     p2pTesting "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/state/stategen"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
     "github.com/prysmaticlabs/prysm/v5/testing/require"
     "google.golang.org/protobuf/proto"
@@ -71,7 +72,7 @@ func (mb *mockBroadcaster) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.B
     return nil
 }

-func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error {
+func (mb *mockBroadcaster) BroadcastDataColumn(_ context.Context, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
     mb.broadcastCalled = true
     return nil
 }
diff --git a/beacon-chain/p2p/broadcaster.go b/beacon-chain/p2p/broadcaster.go
index 4dd90646dfe6..6780174d641a 100644
--- a/beacon-chain/p2p/broadcaster.go
+++ b/beacon-chain/p2p/broadcaster.go
@@ -282,7 +282,12 @@ func (s *Service) internalBroadcastBlob(
 // BroadcastDataColumn broadcasts a data column to the p2p network, the message is assumed to be
 // broadcasted to the current fork and to the input column subnet.
 // TODO: Add tests
-func (s *Service) BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error {
+func (s *Service) BroadcastDataColumn(
+    ctx context.Context,
+    root [fieldparams.RootLength]byte,
+    columnSubnet uint64,
+    dataColumnSidecar *ethpb.DataColumnSidecar,
+) error {
     // Add tracing to the function.
     ctx, span := trace.StartSpan(ctx, "p2p.BroadcastBlob")
     defer span.End()
@@ -301,13 +306,14 @@ func (s *Service) BroadcastDataColumn(ctx context.Context, columnSubnet uint64,
     }

     // Non-blocking broadcast, with attempts to discover a column subnet peer if none available.
-    go s.internalBroadcastDataColumn(ctx, columnSubnet, dataColumnSidecar, forkDigest)
+    go s.internalBroadcastDataColumn(ctx, root, columnSubnet, dataColumnSidecar, forkDigest)

     return nil
 }

 func (s *Service) internalBroadcastDataColumn(
     ctx context.Context,
+    root [fieldparams.RootLength]byte,
     columnSubnet uint64,
     dataColumnSidecar *ethpb.DataColumnSidecar,
     forkDigest [fieldparams.VersionLength]byte,
@@ -368,6 +374,21 @@ func (s *Service) internalBroadcastDataColumn(
         tracing.AnnotateError(span, err)
     }

+    header := dataColumnSidecar.SignedBlockHeader.GetHeader()
+    slot := header.GetSlot()
+
+    slotStartTime, err := slots.ToTime(uint64(s.genesisTime.Unix()), slot)
+    if err != nil {
+        log.WithError(err).Error("Failed to convert slot to time")
+    }
+
+    log.WithFields(logrus.Fields{
+        "slot":               slot,
+        "timeSinceSlotStart": time.Since(slotStartTime),
+        "root":               fmt.Sprintf("%#x", root),
+        "columnSubnet":       columnSubnet,
+    }).Debug("Broadcasted data column sidecar")
+
     // Increase the number of successful broadcasts.
     dataColumnSidecarBroadcasts.Inc()
 }
diff --git a/beacon-chain/p2p/broadcaster_test.go b/beacon-chain/p2p/broadcaster_test.go
index 80e94f044f32..99c64ddd71be 100644
--- a/beacon-chain/p2p/broadcaster_test.go
+++ b/beacon-chain/p2p/broadcaster_test.go
@@ -585,9 +585,10 @@ func TestService_BroadcastDataColumn(t *testing.T) {

     // Attempt to broadcast nil object should fail.
     ctx := context.Background()
-    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastDataColumn(ctx, subnet, nil))
+    var root [fieldparams.RootLength]byte
+    require.ErrorContains(t, "attempted to broadcast nil", p.BroadcastDataColumn(ctx, root, subnet, nil))

     // Broadcast to peers and wait.
-    require.NoError(t, p.BroadcastDataColumn(ctx, subnet, sidecar))
+    require.NoError(t, p.BroadcastDataColumn(ctx, root, subnet, sidecar))
     require.Equal(t, false, util.WaitTimeout(&wg, 1*time.Second), "Failed to receive pubsub within 1s")
 }
diff --git a/beacon-chain/p2p/interfaces.go b/beacon-chain/p2p/interfaces.go
index 458f6ef29229..d5dec5b21b65 100644
--- a/beacon-chain/p2p/interfaces.go
+++ b/beacon-chain/p2p/interfaces.go
@@ -13,6 +13,7 @@ import (
     "github.com/multiformats/go-multiaddr"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
     "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
     "google.golang.org/protobuf/proto"
@@ -43,7 +44,7 @@ type Broadcaster interface {
     BroadcastAttestation(ctx context.Context, subnet uint64, att ethpb.Att) error
     BroadcastSyncCommitteeMessage(ctx context.Context, subnet uint64, sMsg *ethpb.SyncCommitteeMessage) error
     BroadcastBlob(ctx context.Context, subnet uint64, blob *ethpb.BlobSidecar) error
-    BroadcastDataColumn(ctx context.Context, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
+    BroadcastDataColumn(ctx context.Context, root [fieldparams.RootLength]byte, columnSubnet uint64, dataColumnSidecar *ethpb.DataColumnSidecar) error
 }

 // SetStreamHandler configures p2p to handle streams of a certain topic ID.
diff --git a/beacon-chain/p2p/testing/BUILD.bazel b/beacon-chain/p2p/testing/BUILD.bazel
index 71e668119ba7..ed1410b7b94c 100644
--- a/beacon-chain/p2p/testing/BUILD.bazel
+++ b/beacon-chain/p2p/testing/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
         "//beacon-chain/p2p/encoder:go_default_library",
         "//beacon-chain/p2p/peers:go_default_library",
         "//beacon-chain/p2p/peers/scorers:go_default_library",
+        "//config/fieldparams:go_default_library",
         "//config/params:go_default_library",
         "//proto/prysm/v1alpha1:go_default_library",
         "//proto/prysm/v1alpha1/metadata:go_default_library",
diff --git a/beacon-chain/p2p/testing/fuzz_p2p.go b/beacon-chain/p2p/testing/fuzz_p2p.go
index 582623cfefa1..d2b99ce3cc73 100644
--- a/beacon-chain/p2p/testing/fuzz_p2p.go
+++ b/beacon-chain/p2p/testing/fuzz_p2p.go
@@ -13,6 +13,7 @@ import (
     "github.com/multiformats/go-multiaddr"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
     "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
     "google.golang.org/protobuf/proto"
@@ -155,7 +156,7 @@ func (*FakeP2P) BroadcastBlob(_ context.Context, _ uint64, _ *ethpb.BlobSidecar)
 }

 // BroadcastDataColumn -- fake.
-func (*FakeP2P) BroadcastDataColumn(_ context.Context, _ uint64, _ *ethpb.DataColumnSidecar) error {
+func (*FakeP2P) BroadcastDataColumn(_ context.Context, _ [fieldparams.RootLength]byte, _ uint64, _ *ethpb.DataColumnSidecar) error {
     return nil
 }

diff --git a/beacon-chain/p2p/testing/mock_broadcaster.go b/beacon-chain/p2p/testing/mock_broadcaster.go
index 75679bec8f11..5c7eda9dc48c 100644
--- a/beacon-chain/p2p/testing/mock_broadcaster.go
+++ b/beacon-chain/p2p/testing/mock_broadcaster.go
@@ -5,6 +5,7 @@ import (
     "sync"
     "sync/atomic"

+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
     "google.golang.org/protobuf/proto"
 )
@@ -49,7 +50,7 @@ func (m *MockBroadcaster) BroadcastBlob(context.Context, uint64, *ethpb.BlobSide
 }

 // BroadcastDataColumn broadcasts a data column for mock.
-func (m *MockBroadcaster) BroadcastDataColumn(context.Context, uint64, *ethpb.DataColumnSidecar) error {
+func (m *MockBroadcaster) BroadcastDataColumn(context.Context, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
     m.BroadcastCalled.Store(true)
     return nil
 }
diff --git a/beacon-chain/p2p/testing/p2p.go b/beacon-chain/p2p/testing/p2p.go
index 9c4ec5521c1f..0d68d0481efc 100644
--- a/beacon-chain/p2p/testing/p2p.go
+++ b/beacon-chain/p2p/testing/p2p.go
@@ -28,6 +28,7 @@ import (
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers/scorers"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
     "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/metadata"
@@ -207,7 +208,7 @@ func (p *TestP2P) BroadcastBlob(context.Context, uint64, *ethpb.BlobSidecar) err
 }

 // BroadcastDataColumn broadcasts a data column for mock.
-func (p *TestP2P) BroadcastDataColumn(context.Context, uint64, *ethpb.DataColumnSidecar) error {
+func (p *TestP2P) BroadcastDataColumn(context.Context, [fieldparams.RootLength]byte, uint64, *ethpb.DataColumnSidecar) error {
     p.BroadcastCalled.Store(true)
     return nil
 }
diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
index 278a781655d2..957c38955f33 100644
--- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
+++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer.go
@@ -471,7 +471,6 @@ func (vs *Server) broadcastAndReceiveDataColumns(
     slot primitives.Slot,
 ) error {
     eg, _ := errgroup.WithContext(ctx)
-
     dataColumnsWithholdCount := features.Get().DataColumnsWithholdCount

     for _, sd := range sidecars {
@@ -487,11 +486,10 @@ func (vs *Server) broadcastAndReceiveDataColumns(
                 log.WithFields(logrus.Fields{
                     "root":            fmt.Sprintf("%#x", root),
                     "slot":            slot,
-                    "subnet":          subnet,
                     "dataColumnIndex": sidecar.ColumnIndex,
                 }).Warning("Withholding data column")
             } else {
-                if err := vs.P2P.BroadcastDataColumn(ctx, subnet, sidecar); err != nil {
+                if err := vs.P2P.BroadcastDataColumn(ctx, root, subnet, sidecar); err != nil {
                     return errors.Wrap(err, "broadcast data column")
                 }
             }
diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel
index e4250b041017..b939a761a2e3 100644
--- a/beacon-chain/sync/BUILD.bazel
+++ b/beacon-chain/sync/BUILD.bazel
@@ -162,6 +162,7 @@ go_test(
         "block_batcher_test.go",
         "broadcast_bls_changes_test.go",
         "context_test.go",
+        "data_columns_reconstruct_test.go",
         "data_columns_sampling_test.go",
         "decode_pubsub_test.go",
         "error_test.go",
diff --git a/beacon-chain/sync/data_columns_reconstruct.go b/beacon-chain/sync/data_columns_reconstruct.go
index 32bd26febac2..e34a4c5dc368 100644
--- a/beacon-chain/sync/data_columns_reconstruct.go
+++ b/beacon-chain/sync/data_columns_reconstruct.go
@@ -3,9 +3,10 @@ package sync

 import (
     "context"
     "fmt"
-    "sort"
+    "slices"
     "time"

+    "github.com/patrickmn/go-cache"
     "github.com/pkg/errors"
     "github.com/sirupsen/logrus"

@@ -24,9 +25,9 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
     blockRoot := verifiedRODataColumn.BlockRoot()

     // Get the columns we store.
-    storedDataColumns, err := s.cfg.blobStorage.ColumnIndices(blockRoot)
+    storedDataColumns, err := s.storedDataColumns(blockRoot)
     if err != nil {
-        return errors.Wrap(err, "columns indices")
+        return errors.Wrap(err, "stored data columns")
     }

     storedColumnsCount := len(storedDataColumns)
@@ -50,10 +51,12 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu

     defer s.dataColumsnReconstructionLock.Unlock()

-    // Retrieve the custodied columns.
-    custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), peerdas.CustodySubnetCount())
+    // Retrieve the custody columns.
+    nodeID := s.cfg.p2p.NodeID()
+    custodySubnetCount := peerdas.CustodySubnetCount()
+    custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
     if err != nil {
-        return errors.Wrap(err, "custodied columns")
+        return errors.Wrap(err, "custody columns")
     }

     // Load the data columns sidecars.
@@ -67,7 +70,7 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
         dataColumnSideCars = append(dataColumnSideCars, dataColumnSidecar)
     }

-    // Recover cells and proofs
+    // Recover cells and proofs.
     recoveredCellsAndProofs, err := peerdas.RecoverCellsAndProofs(dataColumnSideCars, blockRoot)
     if err != nil {
         return errors.Wrap(err, "recover cells and proofs")
     }
@@ -86,7 +89,7 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
     // Save the data columns sidecars in the database.
     for _, dataColumnSidecar := range dataColumnSidecars {
-        shouldSave := custodiedColumns[dataColumnSidecar.ColumnIndex]
+        shouldSave := custodyColumns[dataColumnSidecar.ColumnIndex]
         if !shouldSave {
             // We do not custody this column, so we do not need to save it.
             continue
         }
@@ -101,6 +104,11 @@ func (s *Service) reconstructDataColumns(ctx context.Context, verifiedRODataColu
         if err := s.cfg.blobStorage.SaveDataColumn(verifiedRoDataColumn); err != nil {
             return errors.Wrap(err, "save column")
         }
+
+        // Mark the data column as stored (but not received).
+        if err := s.setStoredDataColumn(blockRoot, dataColumnSidecar.ColumnIndex); err != nil {
+            return errors.Wrap(err, "set stored data column")
+        }
     }

     log.WithField("root", fmt.Sprintf("%x", blockRoot)).Debug("Data columns reconstructed and saved successfully")
@@ -118,48 +126,58 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
     blockRoot [fieldparams.RootLength]byte,
     dataColumn blocks.VerifiedRODataColumn,
 ) error {
+    log := log.WithField("root", fmt.Sprintf("%x", blockRoot))
+
     // Retrieve the slot of the block.
     slot := dataColumn.Slot()

     // Get the time corresponding to the start of the slot.
-    slotStart, err := slots.ToTime(uint64(s.cfg.chain.GenesisTime().Unix()), slot)
+    genesisTime := uint64(s.cfg.chain.GenesisTime().Unix())
+    slotStartTime, err := slots.ToTime(genesisTime, slot)
     if err != nil {
         return errors.Wrap(err, "to time")
     }

     // Compute when to broadcast the missing data columns.
-    broadcastTime := slotStart.Add(broadCastMissingDataColumnsTimeIntoSlot)
+    broadcastTime := slotStartTime.Add(broadCastMissingDataColumnsTimeIntoSlot)

     // Compute the waiting time. This could be negative. In such a case, broadcast immediately.
     waitingTime := time.Until(broadcastTime)

     time.AfterFunc(waitingTime, func() {
         s.dataColumsnReconstructionLock.Lock()
-        defer s.deleteReceivedDataColumns(blockRoot)
         defer s.dataColumsnReconstructionLock.Unlock()

         // Get the data columns received via gossip.
-        receivedDataColumns := s.receivedDataColumns(blockRoot)
+        receivedDataColumns, err := s.receivedDataColumns(blockRoot)
+        if err != nil {
+            log.WithError(err).Error("Failed to get received data columns")
+            return
+        }
+
         if receivedDataColumns == nil {
-            log.WithField("root", fmt.Sprintf("%x", blockRoot)).Error("No received data columns")
+            log.Error("No received data columns")
+
+            return
         }

         // Get the data columns we should store.
-        custodiedDataColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), peerdas.CustodySubnetCount())
+        nodeID := s.cfg.p2p.NodeID()
+        custodySubnetCount := peerdas.CustodySubnetCount()
+        custodyDataColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
         if err != nil {
             log.WithError(err).Error("Custody columns")
         }

         // Get the data columns we actually store.
-        storedDataColumns, err := s.cfg.blobStorage.ColumnIndices(blockRoot)
+        storedDataColumns, err := s.storedDataColumns(blockRoot)
         if err != nil {
             log.WithField("root", fmt.Sprintf("%x", blockRoot)).WithError(err).Error("Stored data columns")
             return
         }

         // Compute the missing data columns (data columns we should custody but have not received via gossip).
-        missingColumns := make(map[uint64]bool, len(custodiedDataColumns))
-        for column := range custodiedDataColumns {
+        missingColumns := make(map[uint64]bool, len(custodyDataColumns))
+        for column := range custodyDataColumns {
             if ok := receivedDataColumns[column]; !ok {
                 missingColumns[column] = true
             }
@@ -178,7 +196,7 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
                     "root":   fmt.Sprintf("%x", blockRoot),
                     "slot":   slot,
                     "column": column,
-                }).Error("Data column not received nor reconstructed.")
+                }).Error("Data column not received nor reconstructed")

                 continue
             }
@@ -191,9 +209,8 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
             // Compute the subnet for this column.
             subnet := column % params.BeaconConfig().DataColumnSidecarSubnetCount

-            // Broadcast the missing data column.
-            if err := s.cfg.p2p.BroadcastDataColumn(ctx, subnet, dataColumnSidecar); err != nil {
+            if err := s.cfg.p2p.BroadcastDataColumn(ctx, blockRoot, subnet, dataColumnSidecar); err != nil {
                 log.WithError(err).Error("Broadcast data column")
             }
         }
@@ -205,62 +222,128 @@ func (s *Service) scheduleReconstructedDataColumnsBroadcast(
         }

         // Sort the missing data columns.
-        sort.Slice(missingColumnsList, func(i, j int) bool {
-            return missingColumnsList[i] < missingColumnsList[j]
-        })
+        slices.Sort(missingColumnsList)

         log.WithFields(logrus.Fields{
             "root":         fmt.Sprintf("%x", blockRoot),
             "slot":         slot,
             "timeIntoSlot": broadCastMissingDataColumnsTimeIntoSlot,
             "columns":      missingColumnsList,
-        }).Debug("Broadcasting not seen via gossip but reconstructed data columns")
+        }).Debug("Start broadcasting reconstructed data columns not seen via gossip")
     })

     return nil
 }

 // setReceivedDataColumn marks the data column for a given root as received.
-func (s *Service) setReceivedDataColumn(root [fieldparams.RootLength]byte, columnIndex uint64) {
+func (s *Service) setReceivedDataColumn(root [fieldparams.RootLength]byte, columnIndex uint64) error {
     s.receivedDataColumnsFromRootLock.Lock()
     defer s.receivedDataColumnsFromRootLock.Unlock()

-    // Get all the received data columns for this root.
-    receivedDataColumns, ok := s.receivedDataColumnsFromRoot[root]
-    if !ok {
-        // Create the map for this block root if needed.
-        receivedDataColumns = make(map[uint64]bool, params.BeaconConfig().NumberOfColumns)
-        s.receivedDataColumnsFromRoot[root] = receivedDataColumns
+    if err := setDataColumnCache(s.receivedDataColumnsFromRoot, root, columnIndex); err != nil {
+        return errors.Wrap(err, "set data column cache")
     }

-    // Mark the data column as received.
-    receivedDataColumns[columnIndex] = true
+    return nil
 }

 // receivedDataColumns returns the received data columns for a given root.
-func (s *Service) receivedDataColumns(root [fieldparams.RootLength]byte) map[uint64]bool {
-    s.receivedDataColumnsFromRootLock.RLock()
-    defer s.receivedDataColumnsFromRootLock.RUnlock()
+func (s *Service) receivedDataColumns(root [fieldparams.RootLength]byte) (map[uint64]bool, error) {
+    dataColumns, err := dataColumnsCache(s.receivedDataColumnsFromRoot, root)
+    if err != nil {
+        return nil, errors.Wrap(err, "data columns cache")
+    }

-    // Get all the received data columns for this root.
-    receivedDataColumns, ok := s.receivedDataColumnsFromRoot[root]
+    return dataColumns, nil
+}
+
+// setStoredDataColumn marks the data column for a given root as stored.
+func (s *Service) setStoredDataColumn(root [fieldparams.RootLength]byte, columnIndex uint64) error {
+    s.storedDataColumnsFromRootLock.Lock()
+    defer s.storedDataColumnsFromRootLock.Unlock()
+
+    if err := setDataColumnCache(s.storedDataColumnsFromRoot, root, columnIndex); err != nil {
+        return errors.Wrap(err, "set data column cache")
+    }
+
+    return nil
+}
+
+// storedDataColumns returns the stored data columns for a given root.
+func (s *Service) storedDataColumns(root [fieldparams.RootLength]byte) (map[uint64]bool, error) {
+    dataColumns, err := dataColumnsCache(s.storedDataColumnsFromRoot, root)
+    if err != nil {
+        return nil, errors.Wrap(err, "data columns cache")
+    }
+
+    return dataColumns, nil
+}
+
+// setDataColumnCache sets the data column for a given root in columnsCache.
+// The caller should hold the lock for the cache.
+func setDataColumnCache(columnsCache *cache.Cache, root [fieldparams.RootLength]byte, columnIndex uint64) error {
+    if columnIndex >= fieldparams.NumberOfColumns {
+        return errors.Errorf("column index out of bounds: got %d, expected < %d", columnIndex, fieldparams.NumberOfColumns)
+    }
+
+    rootString := fmt.Sprintf("%#x", root)
+
+    // Get all the data columns for this root.
+    items, ok := columnsCache.Get(rootString)
+    if !ok {
+        var columns [fieldparams.NumberOfColumns]bool
+        columns[columnIndex] = true
+        columnsCache.Set(rootString, columns, cache.DefaultExpiration)
+
+        return nil
+    }
+
+    // Cast the array.
+    columns, ok := items.([fieldparams.NumberOfColumns]bool)
+    if !ok {
+        return errors.New("cannot cast data columns from cache")
+    }
-    // Copy the received data columns.
-    copied := make(map[uint64]bool, len(receivedDataColumns))
-    for column, received := range receivedDataColumns {
-        copied[column] = received
+
+    // Mark the data column as present.
+    columns[columnIndex] = true
+
+    // Update the data columns in the cache.
+    columnsCache.Set(rootString, columns, cache.DefaultExpiration)
+
+    return nil
 }

-// deleteReceivedDataColumns deletes the received data columns for a given root.
-func (s *Service) deleteReceivedDataColumns(root [fieldparams.RootLength]byte) {
-    s.receivedDataColumnsFromRootLock.Lock()
-    defer s.receivedDataColumnsFromRootLock.Unlock()
+// dataColumnsCache returns the data columns for a given root in columnsCache.
+func dataColumnsCache(columnsCache *cache.Cache, root [fieldparams.RootLength]byte) (map[uint64]bool, error) {
+    rootString := fmt.Sprintf("%#x", root)
+
+    // Get all the data columns for this root.
+    items, ok := columnsCache.Get(rootString)
+    if !ok {
+        return nil, nil
+    }
+
+    // Cast the array.
+    dataColumns, ok := items.([fieldparams.NumberOfColumns]bool)
+    if !ok {
+        return nil, errors.New("cannot cast data columns from cache")
+    }
+
+    // Convert to map.
+    result := columnsArrayToMap(dataColumns)
+
+    return result, nil
+}
+
+// columnsArrayToMap converts an array of columns to a map of columns.
+func columnsArrayToMap(columnsArray [fieldparams.NumberOfColumns]bool) map[uint64]bool {
+    columnsMap := make(map[uint64]bool)
+
+    for i, v := range columnsArray {
+        if v {
+            columnsMap[uint64(i)] = v
+        }
+    }

-    delete(s.receivedDataColumnsFromRoot, root)
+    return columnsMap
 }
diff --git a/beacon-chain/sync/data_columns_reconstruct_test.go b/beacon-chain/sync/data_columns_reconstruct_test.go
new file mode 100644
index 000000000000..e8879b06edc8
--- /dev/null
+++ b/beacon-chain/sync/data_columns_reconstruct_test.go
@@ -0,0 +1,87 @@
+package sync
+
+import (
+    "testing"
+    "time"
+
+    "github.com/patrickmn/go-cache"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
+    "github.com/prysmaticlabs/prysm/v5/testing/require"
+)
+
+func TestDataColumnsCache(t *testing.T) {
+    var (
+        root1 [fieldparams.RootLength]byte
+        root2 [fieldparams.RootLength]byte
+    )
+
+    root1[0] = 1
+    root2[0] = 2
+
+    columnsCache := cache.New(1*time.Minute, 2*time.Minute)
+
+    // Retrieve a non-existent entry
+    res, err := dataColumnsCache(columnsCache, root1)
+    require.NoError(t, err)
+    require.Equal(t, 0, len(res))
+
+    res, err = dataColumnsCache(columnsCache, root2)
+    require.NoError(t, err)
+    require.Equal(t, 0, len(res))
+
+    // Set an entry in an empty cache for this root
+    err = setDataColumnCache(columnsCache, root1, 1)
+    require.NoError(t, err)
+
+    err = setDataColumnCache(columnsCache, root2, 2)
+    require.NoError(t, err)
+
+    // Retrieve the entry
+    res, err = dataColumnsCache(columnsCache, root1)
+    require.NoError(t, err)
+    require.Equal(t, 1, len(res))
+    require.Equal(t, true, res[1])
+
+    res, err = dataColumnsCache(columnsCache, root2)
+    require.NoError(t, err)
+    require.Equal(t, 1, len(res))
+    require.Equal(t, true, res[2])
+
+    // Set a new entry in the cache
+    err = setDataColumnCache(columnsCache, root1, 11)
+    require.NoError(t, err)
+
+    err = setDataColumnCache(columnsCache, root2, 22)
+    require.NoError(t, err)
+
+    // Retrieve the entries
+    res, err = dataColumnsCache(columnsCache, root1)
+    require.NoError(t, err)
+    require.Equal(t, 2, len(res))
+    require.Equal(t, true, res[1])
+    require.Equal(t, true, res[11])
+
+    res, err = dataColumnsCache(columnsCache, root2)
+    require.NoError(t, err)
+    require.Equal(t, 2, len(res))
+    require.Equal(t, true, res[2])
+    require.Equal(t, true, res[22])
+}
+
+func TestColumnsArrayToMap(t *testing.T) {
+    var input [fieldparams.NumberOfColumns]bool
+    input[0] = true
+    input[7] = true
+    input[14] = true
+    input[125] = true
+
+    expected := map[uint64]bool{0: true, 7: true, 14: true, 125: true}
+
+    actual := columnsArrayToMap(input)
+
+    require.Equal(t, len(expected), len(actual))
+
+    for k, v := range expected {
+        require.Equal(t, v, actual[k])
+    }
+}
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
index e9b62b4f0e90..ddc53d374342 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -14,9 +14,9 @@ import (
     coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
-    "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
     "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags"
+    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
     "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil"
@@ -45,7 +45,6 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
     defer cancel()
     SetRPCStreamDeadlines(stream)

-    log := log.WithField("handler", p2p.DataColumnSidecarsByRootName[1:]) // slice the leading slash off the name var
     // We use the same type as for blobs as they are the same data structure.
     // TODO: Make the type naming more generic to be extensible to data columns
@@ -55,7 +54,6 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
     }
     requestedColumnIdents := *ref
-    requestedColumnsCount := uint64(len(requestedColumnIdents))

     if err := validateDataColumnsByRootRequest(requestedColumnIdents); err != nil {
         s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
@@ -66,9 +64,29 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
     // Sort the identifiers so that requests for the same blob root will be adjacent, minimizing db lookups.
     sort.Sort(&requestedColumnIdents)

-    requestedColumnsList := make([]uint64, 0, len(requestedColumnIdents))
-    for _, ident := range requestedColumnIdents {
-        requestedColumnsList = append(requestedColumnsList, ident.ColumnIndex)
+    numberOfColumns := params.BeaconConfig().NumberOfColumns
+
+    requestedColumnsByRoot := make(map[[fieldparams.RootLength]byte]map[uint64]bool)
+    for _, columnIdent := range requestedColumnIdents {
+        var root [fieldparams.RootLength]byte
+        copy(root[:], columnIdent.BlockRoot)
+
+        columnIndex := columnIdent.ColumnIndex
+
+        if _, ok := requestedColumnsByRoot[root]; !ok {
+            requestedColumnsByRoot[root] = map[uint64]bool{columnIndex: true}
+            continue
+        }
+
+        requestedColumnsByRoot[root][columnIndex] = true
+    }
+
+    requestedColumnsByRootLog := make(map[[fieldparams.RootLength]byte]interface{})
+    for root, columns := range requestedColumnsByRoot {
+        requestedColumnsByRootLog[root] = "all"
+        if uint64(len(columns)) != numberOfColumns {
+            requestedColumnsByRootLog[root] = uint64MapToSortedSlice(columns)
+        }
     }

     batchSize := flags.Get().DataColumnBatchLimit
@@ -84,43 +102,41 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
         return errors.Wrapf(err, "unexpected error computing min valid blob request slot, current_slot=%d", cs)
     }

-    // Compute all custodied columns.
-    custodiedColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), peerdas.CustodySubnetCount())
+    // Compute all custody columns.
+    nodeID := s.cfg.p2p.NodeID()
+    custodySubnetCount := peerdas.CustodySubnetCount()
+    custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount)
+
     if err != nil {
-        log.WithError(err).Errorf("unexpected error retrieving the node id")
+        log.WithError(err).Error("unexpected error computing the custody columns")
         s.writeErrorResponseToStream(responseCodeServerError, types.ErrGeneric.Error(), stream)
         return errors.Wrap(err, "custody columns")
     }

+    custodyColumnsCount := uint64(len(custodyColumns))
+
-    numberOfColumns := params.BeaconConfig().NumberOfColumns
-
-    var (
-        custodied interface{} = "all"
-        requested interface{} = "all"
-    )
+    var custody interface{} = "all"

-    custodiedColumnsCount := uint64(len(custodiedColumns))
-
-    if custodiedColumnsCount != numberOfColumns {
-        custodied = uint64MapToSortedSlice(custodiedColumns)
+    if custodyColumnsCount != numberOfColumns {
+        custody = uint64MapToSortedSlice(custodyColumns)
     }

-    if requestedColumnsCount != numberOfColumns {
-        requested = requestedColumnsList
-    }
+    remotePeer := stream.Conn().RemotePeer()
+    log := log.WithFields(logrus.Fields{
+        "peer":    remotePeer,
+        "custody": custody,
+    })

-    custodiedColumnsList := make([]uint64, 0, len(custodiedColumns))
-    for column := range custodiedColumns {
-        custodiedColumnsList = append(custodiedColumnsList, column)
-    }
+    i := 0
+    for root, columns := range requestedColumnsByRootLog {
+        log = log.WithFields(logrus.Fields{
+            fmt.Sprintf("root%d", i):    fmt.Sprintf("%#x", root),
+            fmt.Sprintf("columns%d", i): columns,
+        })

-    // Sort the custodied columns by index.
-    slices.Sort[[]uint64](custodiedColumnsList)
+        i++
+    }

-    log.WithFields(logrus.Fields{
-        "custodied": custodied,
-        "requested": requested,
-    }).Debug("Data column sidecar by root request received")
+    log.Debug("Serving data column sidecar by root request")

     // Subscribe to the data column feed.
     rootIndexChan := make(chan filesystem.RootIndexPair)
@@ -150,7 +166,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
         requestedRoot, requestedIndex := bytesutil.ToBytes32(requestedColumnIdents[i].BlockRoot), requestedColumnIdents[i].ColumnIndex

         // Decrease the peer's score if it requests a column that is not custodied.
-        isCustodied := custodiedColumns[requestedIndex]
+        isCustodied := custodyColumns[requestedIndex]
         if !isCustodied {
             s.cfg.p2p.Peers().Scorers().BadResponsesScorer().Increment(stream.Conn().RemotePeer())
             s.writeErrorResponseToStream(responseCodeInvalidRequest, types.ErrInvalidColumnIndex.Error(), stream)
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index e50f29d0af49..41316a7142f8 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -38,7 +38,6 @@ import (
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/backfill/coverage"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
     lruwrpr "github.com/prysmaticlabs/prysm/v5/cache/lru"
-    fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
@@ -167,26 +166,34 @@ type Service struct {
     newColumnVerifier               verification.NewColumnVerifier
     availableBlocker                coverage.AvailableBlocker
     dataColumsnReconstructionLock   sync.Mutex
-    receivedDataColumnsFromRoot     map[[fieldparams.RootLength]byte]map[uint64]bool
+    receivedDataColumnsFromRoot     *gcache.Cache
     receivedDataColumnsFromRootLock sync.RWMutex
+    storedDataColumnsFromRoot       *gcache.Cache
+    storedDataColumnsFromRootLock   sync.RWMutex
     ctxMap                          ContextByteVersions
 }

 // NewService initializes new regular sync service.
 func NewService(ctx context.Context, opts ...Option) *Service {
-    c := gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */)
+    const (
+        dataColumnCacheExpiration      = 1 * time.Minute
+        dataColumnCacheCleanupInterval = 2 * time.Minute
+    )
+
     ctx, cancel := context.WithCancel(ctx)
     r := &Service{
         ctx:                  ctx,
         cancel:               cancel,
         chainStarted:         abool.New(),
         cfg:                  &config{clock: startup.NewClock(time.Unix(0, 0), [32]byte{})},
-        slotToPendingBlocks:  c,
+        slotToPendingBlocks:  gcache.New(pendingBlockExpTime /* exp time */, 0 /* disable janitor */),
         seenPendingBlocks:    make(map[[32]byte]bool),
         blkRootToPendingAtts: make(map[[32]byte][]ethpb.SignedAggregateAttAndProof),
         signatureChan:        make(chan *signatureVerifier, verifierLimit),
-        receivedDataColumnsFromRoot: make(map[[32]byte]map[uint64]bool),
+        receivedDataColumnsFromRoot: gcache.New(dataColumnCacheExpiration, dataColumnCacheCleanupInterval),
+        storedDataColumnsFromRoot:   gcache.New(dataColumnCacheExpiration, dataColumnCacheCleanupInterval),
     }
+
     for _, opt := range opts {
         if err := opt(r); err != nil {
             return nil
diff --git a/beacon-chain/sync/subscriber_data_column_sidecar.go b/beacon-chain/sync/subscriber_data_column_sidecar.go
index 3936d3fd14f5..0aa81605aff9 100644
--- a/beacon-chain/sync/subscriber_data_column_sidecar.go
+++ b/beacon-chain/sync/subscriber_data_column_sidecar.go
@@ -17,13 +17,26 @@ func (s *Service) dataColumnSubscriber(ctx context.Context, msg proto.Message) e
         return fmt.Errorf("message was not type blocks.VerifiedRODataColumn, type=%T", msg)
     }

-    s.setSeenDataColumnIndex(dc.SignedBlockHeader.Header.Slot, dc.SignedBlockHeader.Header.ProposerIndex, dc.ColumnIndex)
-    s.setReceivedDataColumn(dc.BlockRoot(), dc.ColumnIndex)
+    slot := dc.SignedBlockHeader.Header.Slot
+    proposerIndex := dc.SignedBlockHeader.Header.ProposerIndex
+    columnIndex := dc.ColumnIndex
+    blockRoot := dc.BlockRoot()
+
+    s.setSeenDataColumnIndex(slot, proposerIndex, columnIndex)

     if err := s.cfg.chain.ReceiveDataColumn(dc); err != nil {
         return errors.Wrap(err, "receive data column")
     }

+    // Mark the data column as both received and stored.
+    if err := s.setReceivedDataColumn(blockRoot, columnIndex); err != nil {
+        return errors.Wrap(err, "set received data column")
+    }
+
+    if err := s.setStoredDataColumn(blockRoot, columnIndex); err != nil {
+        return errors.Wrap(err, "set stored data column")
+    }
+
     s.cfg.operationNotifier.OperationFeed().Send(&feed.Event{
         Type: opfeed.DataColumnSidecarReceived,
         Data: &opfeed.DataColumnSidecarReceivedData{
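
For readers who want to experiment with the caching technique this patch adopts, here is a minimal, self-contained sketch of the same pattern: a fixed-size bool array per block root, stored in `github.com/patrickmn/go-cache`, mirroring the `setDataColumnCache`/`dataColumnsCache` pair above. The constants `rootLength` and `numberOfColumns` and the helpers `markColumn`/`columnsForRoot` are illustrative stand-ins, not Prysm's actual `fieldparams` values or API:

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/patrickmn/go-cache"
)

const (
	rootLength      = 32  // stand-in for fieldparams.RootLength
	numberOfColumns = 128 // stand-in for fieldparams.NumberOfColumns
)

// markColumn records that a column index was seen for the given block root.
func markColumn(c *cache.Cache, root [rootLength]byte, index uint64) error {
	if index >= numberOfColumns {
		return fmt.Errorf("column index out of bounds: got %d, expected < %d", index, numberOfColumns)
	}

	key := fmt.Sprintf("%#x", root)

	// Start from the cached array if present, else from the zero value.
	var columns [numberOfColumns]bool
	if items, found := c.Get(key); found {
		cast, ok := items.([numberOfColumns]bool)
		if !ok {
			return errors.New("cannot cast data columns from cache")
		}
		columns = cast
	}

	// Mark the column and write the updated array back to the cache.
	columns[index] = true
	c.Set(key, columns, cache.DefaultExpiration)

	return nil
}

// columnsForRoot returns the set of column indices seen for the given root.
func columnsForRoot(c *cache.Cache, root [rootLength]byte) (map[uint64]bool, error) {
	items, found := c.Get(fmt.Sprintf("%#x", root))
	if !found {
		return nil, nil
	}

	columns, ok := items.([numberOfColumns]bool)
	if !ok {
		return nil, errors.New("cannot cast data columns from cache")
	}

	seen := make(map[uint64]bool)
	for i, marked := range columns {
		if marked {
			seen[uint64(i)] = true
		}
	}

	return seen, nil
}

func main() {
	// Entries expire on their own, so no explicit per-root cleanup is needed.
	c := cache.New(1*time.Minute /* expiration */, 2*time.Minute /* cleanup interval */)

	var root [rootLength]byte
	root[0] = 1

	for _, index := range []uint64{3, 42, 97} {
		if err := markColumn(c, root, index); err != nil {
			log.Fatal(err)
		}
	}

	seen, err := columnsForRoot(c, root)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(seen) // map[3:true 42:true 97:true]
}
```

Storing a fixed-size value type per root keeps each update O(1) and strictly consistent with what the node has actually processed, and letting entries age out via the cache's expiration is what allows the patch to drop the explicit `deleteReceivedDataColumns` cleanup.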