
Commit e92cde5

PeerDAS: Batch columns verifications (#14559)
* `ColumnAlignsWithBlock`: split lines.
* Data column verifications: batch them.
* Remove `DataColumnBatchVerifier` completely. Only `DataColumnsVerifier` (with `s`) on columns remains. It is the responsibility of the function that receives the data columns (whether by gossip, by range request, or by root request) to verify them against the corresponding checks.
* Fix Nishant's comment.
1 parent c8af1e3 commit e92cde5

24 files changed: 1359 additions & 893 deletions

beacon-chain/core/peerdas/helpers.go

Lines changed: 36 additions & 17 deletions
@@ -405,34 +405,53 @@ func DataColumnSidecarsForReconstruct(
     return sidecars, nil
 }
 
-// VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular
-// data column.
-func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) {
+// VerifyDataColumnsSidecarKZGProofs verifies the provided KZG Proofs of data columns.
+func VerifyDataColumnsSidecarKZGProofs(sidecars []blocks.RODataColumn) (bool, error) {
+    // Retrieve the number of columns.
     numberOfColumns := params.BeaconConfig().NumberOfColumns
 
-    if sc.ColumnIndex >= numberOfColumns {
-        return false, errIndexTooLarge
+    // Compute the total count.
+    count := 0
+    for _, sidecar := range sidecars {
+        count += len(sidecar.DataColumn)
     }
 
-    if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) {
-        return false, errMismatchLength
-    }
-
-    count := len(sc.DataColumn)
-
     commitments := make([]kzg.Bytes48, 0, count)
     indices := make([]uint64, 0, count)
     cells := make([]kzg.Cell, 0, count)
     proofs := make([]kzg.Bytes48, 0, count)
 
-    for i := range sc.DataColumn {
-        commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i]))
-        indices = append(indices, sc.ColumnIndex)
-        cells = append(cells, kzg.Cell(sc.DataColumn[i]))
-        proofs = append(proofs, kzg.Bytes48(sc.KzgProof[i]))
+    for _, sidecar := range sidecars {
+        // Check if the columns index is not too large
+        if sidecar.ColumnIndex >= numberOfColumns {
+            return false, errIndexTooLarge
+        }
+
+        // Check if the KZG commitments size and data column size match.
+        if len(sidecar.DataColumn) != len(sidecar.KzgCommitments) {
+            return false, errMismatchLength
+        }
+
+        // Check if the KZG proofs size and data column size match.
+        if len(sidecar.DataColumn) != len(sidecar.KzgProof) {
+            return false, errMismatchLength
+        }
+
+        for i := range sidecar.DataColumn {
+            commitments = append(commitments, kzg.Bytes48(sidecar.KzgCommitments[i]))
+            indices = append(indices, sidecar.ColumnIndex)
+            cells = append(cells, kzg.Cell(sidecar.DataColumn[i]))
+            proofs = append(proofs, kzg.Bytes48(sidecar.KzgProof[i]))
+        }
+    }
+
+    // Verify all the batch at once.
+    verified, err := kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
+    if err != nil {
+        return false, errors.Wrap(err, "verify cell KZG proof batch")
     }
 
-    return kzg.VerifyCellKZGProofBatch(commitments, indices, cells, proofs)
+    return verified, nil
 }
 
 // CustodySubnetCount returns the number of subnets the node should participate in for custody.
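
For orientation, here is a minimal caller-side sketch of the new batched helper. The signature comes from the diff above; the wrapper name and package are hypothetical, and where the sidecars come from (gossip, by-range, or by-root requests) is left open.

package example // hypothetical wrapper, not part of this commit

import (
    "github.com/pkg/errors"

    "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
    "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

// checkColumnsKZGProofs verifies all received column sidecars with a single
// batched KZG call instead of one call per column.
func checkColumnsKZGProofs(sidecars []blocks.RODataColumn) error {
    verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs(sidecars)
    if err != nil {
        return errors.Wrap(err, "verify data columns sidecar KZG proofs")
    }

    if !verified {
        return errors.New("at least one data column sidecar has an invalid KZG proof")
    }

    return nil
}

Note the two failure modes visible in the diff: a non-nil error for malformed batches (column index too large, mismatched lengths, or a failed batch call), and verified == false when the KZG batch check itself rejects the proofs.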

beacon-chain/core/peerdas/helpers_test.go

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ func TestVerifyDataColumnSidecarKZGProofs(t *testing.T) {
     for i, sidecar := range sCars {
         roCol, err := blocks.NewRODataColumn(sidecar)
         require.NoError(t, err)
-        verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roCol)
+        verified, err := peerdas.VerifyDataColumnsSidecarKZGProofs([]blocks.RODataColumn{roCol})
         require.NoError(t, err)
         require.Equal(t, true, verified, fmt.Sprintf("sidecar %d failed", i))
     }

beacon-chain/das/availability_columns.go

Lines changed: 15 additions & 34 deletions
@@ -8,7 +8,6 @@ import (
     errors "github.com/pkg/errors"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
-    "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
     "github.com/prysmaticlabs/prysm/v5/config/params"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
     "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
@@ -21,22 +20,14 @@ import (
 // This implementation will hold any blobs passed to Persist until the IsDataAvailable is called for their
 // block, at which time they will undergo full verification and be saved to the disk.
 type LazilyPersistentStoreColumn struct {
-    store    *filesystem.BlobStorage
-    cache    *cache
-    verifier ColumnBatchVerifier
-    nodeID   enode.ID
+    store *filesystem.BlobStorage
+    cache *cache
 }
 
-type ColumnBatchVerifier interface {
-    VerifiedRODataColumns(ctx context.Context, blk blocks.ROBlock, sc []blocks.RODataColumn) ([]blocks.VerifiedRODataColumn, error)
-}
-
-func NewLazilyPersistentStoreColumn(store *filesystem.BlobStorage, verifier ColumnBatchVerifier, id enode.ID) *LazilyPersistentStoreColumn {
+func NewLazilyPersistentStoreColumn(store *filesystem.BlobStorage) *LazilyPersistentStoreColumn {
     return &LazilyPersistentStoreColumn{
-        store:    store,
-        cache:    newCache(),
-        verifier: verifier,
-        nodeID:   id,
+        store: store,
+        cache: newCache(),
     }
 }
 
@@ -120,33 +111,23 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
     // Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent.
     // We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather
     // ignore their response and decrease their peer score.
-    sidecars, err := entry.filterColumns(blockRoot, blockCommitments)
+    roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
     if err != nil {
         return errors.Wrap(err, "incomplete BlobSidecar batch")
     }
 
-    // Do thorough verifications of each RODataColumns for the block.
-    // Same as above, we don't save DataColumnsSidecars if there are any problems with the batch.
-    vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars)
-    if err != nil {
-        var me verification.VerificationMultiError
-        ok := errors.As(err, &me)
-        if ok {
-            fails := me.Failures()
-            lf := make(log.Fields, len(fails))
-            for i := range fails {
-                lf[fmt.Sprintf("fail_%d", i)] = fails[i].Error()
-            }
-            log.WithFields(lf).
-                Debug("invalid ColumnSidecars received")
-        }
-        return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot)
+    // Create verified RO data columns from RO data columns.
+    verifiedRODataColumns := make([]blocks.VerifiedRODataColumn, 0, len(roDataColumns))
+
+    for _, roDataColumn := range roDataColumns {
+        verifiedRODataColumn := blocks.NewVerifiedRODataColumn(roDataColumn)
+        verifiedRODataColumns = append(verifiedRODataColumns, verifiedRODataColumn)
     }
 
     // Ensure that each column sidecar is written to disk.
-    for i := range vscs {
-        if err := s.store.SaveDataColumn(vscs[i]); err != nil {
-            return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot)
+    for _, verifiedRODataColumn := range verifiedRODataColumns {
+        if err := s.store.SaveDataColumn(verifiedRODataColumn); err != nil {
+            return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", verifiedRODataColumn.ColumnIndex, blockRoot)
         }
     }
 
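A minimal wiring sketch of the simplified constructor, assuming the package is imported as das (its directory name); the surrounding function is hypothetical. With the verifier and node ID removed, columns are expected to be fully verified before they reach the store, which is why IsDataAvailable can wrap them with blocks.NewVerifiedRODataColumn without re-checking.

package example // hypothetical wiring, not part of this commit

import (
    "github.com/prysmaticlabs/prysm/v5/beacon-chain/das"
    "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
)

// newColumnStore builds the column store with the reduced constructor:
// no ColumnBatchVerifier and no enode.ID are required anymore.
func newColumnStore(storage *filesystem.BlobStorage) *das.LazilyPersistentStoreColumn {
    return das.NewLazilyPersistentStoreColumn(storage)
}
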
beacon-chain/sync/BUILD.bazel

Lines changed: 1 addition & 0 deletions
@@ -211,6 +211,7 @@ go_test(
         "//beacon-chain/core/altair:go_default_library",
         "//beacon-chain/core/feed:go_default_library",
         "//beacon-chain/core/feed/operation:go_default_library",
+        "//beacon-chain/core/feed/state:go_default_library",
         "//beacon-chain/core/helpers:go_default_library",
         "//beacon-chain/core/peerdas:go_default_library",
         "//beacon-chain/core/signing:go_default_library",

beacon-chain/sync/data_columns_sampling.go

Lines changed: 38 additions & 38 deletions
@@ -10,6 +10,7 @@ import (
 
     "github.com/libp2p/go-libp2p/core/peer"
     "github.com/pkg/errors"
+    "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
     "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
     "github.com/sirupsen/logrus"
 
@@ -60,7 +61,7 @@ type dataColumnSampler1D struct {
     // peerFromColumn maps a column to the peer responsible for custody.
     peerFromColumn map[uint64]map[peer.ID]bool
     // columnVerifier verifies a column according to the specified requirements.
-    columnVerifier verification.NewColumnVerifier
+    columnVerifier verification.NewDataColumnsVerifier
 }
 
 // newDataColumnSampler1D creates a new 1D data column sampler.
@@ -69,7 +70,7 @@ func newDataColumnSampler1D(
     clock *startup.Clock,
     ctxMap ContextByteVersions,
     stateNotifier statefeed.Notifier,
-    colVerifier verification.NewColumnVerifier,
+    colVerifier verification.NewDataColumnsVerifier,
 ) *dataColumnSampler1D {
     numColumns := params.BeaconConfig().NumberOfColumns
     peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns)
@@ -265,7 +266,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
     samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
 
     // TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
-    _, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
+    _, _, err = d.incrementalDAS(ctx, data, randomizedColumns, samplesCount)
     if err != nil {
         log.WithError(err).Error("Failed to run incremental DAS")
     }
@@ -276,21 +277,22 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
 // According to https://github.com/ethereum/consensus-specs/issues/3825, we're going to select query samples exclusively from the non custody columns.
 func (d *dataColumnSampler1D) incrementalDAS(
     ctx context.Context,
-    root [fieldparams.RootLength]byte,
+    blockProcessedData *statefeed.BlockProcessedData,
     columns []uint64,
     sampleCount uint64,
 ) (bool, []roundSummary, error) {
     allowedFailures := uint64(0)
     firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
     roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
+    blockRoot := blockProcessedData.BlockRoot
 
     start := time.Now()
 
     for round := 1; ; /*No exit condition */ round++ {
         if extendedSampleCount > uint64(len(columns)) {
             // We already tried to sample all possible columns, this is the unhappy path.
             log.WithFields(logrus.Fields{
-                "root":  fmt.Sprintf("%#x", root),
+                "root":  fmt.Sprintf("%#x", blockRoot),
                 "round": round - 1,
             }).Warning("Some columns are still missing after trying to sample all possible columns")
             return false, roundSummaries, nil
@@ -301,13 +303,13 @@ func (d *dataColumnSampler1D) incrementalDAS(
         columnsToSampleCount := extendedSampleCount - firstColumnToSample
 
         log.WithFields(logrus.Fields{
-            "root":    fmt.Sprintf("%#x", root),
+            "root":    fmt.Sprintf("%#x", blockRoot),
             "columns": columnsToSample,
             "round":   round,
         }).Debug("Start data columns sampling")
 
         // Sample data columns from peers in parallel.
-        retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)
+        retrievedSamples := d.sampleDataColumns(ctx, blockProcessedData, columnsToSample)
 
         missingSamples := make(map[uint64]bool)
         for _, column := range columnsToSample {
@@ -325,7 +327,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
         if retrievedSampleCount == columnsToSampleCount {
             // All columns were correctly sampled, this is the happy path.
             log.WithFields(logrus.Fields{
-                "root":         fmt.Sprintf("%#x", root),
+                "root":         fmt.Sprintf("%#x", blockRoot),
                 "neededRounds": round,
                 "duration":     time.Since(start),
             }).Debug("All columns were successfully sampled")
@@ -344,7 +346,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
         extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
 
         log.WithFields(logrus.Fields{
-            "root":                fmt.Sprintf("%#x", root),
+            "root":                fmt.Sprintf("%#x", blockRoot),
             "round":               round,
             "missingColumnsCount": allowedFailures,
             "currentSampleIndex":  oldExtendedSampleCount,
@@ -355,7 +357,7 @@ func (d *dataColumnSampler1D) incrementalDAS(
 
 func (d *dataColumnSampler1D) sampleDataColumns(
     ctx context.Context,
-    root [fieldparams.RootLength]byte,
+    blockProcessedData *statefeed.BlockProcessedData,
     columns []uint64,
 ) map[uint64]bool {
     // distribute samples to peer
@@ -365,10 +367,12 @@ func (d *dataColumnSampler1D) sampleDataColumns(
         mu sync.Mutex
         wg sync.WaitGroup
     )
+
     res := make(map[uint64]bool)
+
     sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) {
         defer wg.Done()
-        retrieved := d.sampleDataColumnsFromPeer(ctx, pid, root, cols)
+        retrieved := d.sampleDataColumnsFromPeer(ctx, pid, blockProcessedData, cols)
 
         mu.Lock()
         for col := range retrieved {
@@ -414,15 +418,15 @@ func (d *dataColumnSampler1D) distributeSamplesToPeer(
 func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
     ctx context.Context,
     pid peer.ID,
-    root [fieldparams.RootLength]byte,
+    blockProcessedData *statefeed.BlockProcessedData,
     requestedColumns map[uint64]bool,
 ) map[uint64]bool {
     retrievedColumns := make(map[uint64]bool)
 
     req := make(types.DataColumnSidecarsByRootReq, 0)
     for col := range requestedColumns {
         req = append(req, &eth.DataColumnIdentifier{
-            BlockRoot:   root[:],
+            BlockRoot:   blockProcessedData.BlockRoot[:],
             ColumnIndex: col,
         })
     }
@@ -434,22 +438,23 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
         return nil
     }
 
+    // TODO: Once peer sampling is used, we should verify all sampled data columns in a single batch instead of looping over columns.
     for _, roDataColumn := range roDataColumns {
-        if verifyColumn(roDataColumn, root, pid, requestedColumns, d.columnVerifier) {
+        if verifyColumn(roDataColumn, blockProcessedData, pid, requestedColumns, d.columnVerifier) {
             retrievedColumns[roDataColumn.ColumnIndex] = true
         }
     }
 
     if len(retrievedColumns) == len(requestedColumns) {
         log.WithFields(logrus.Fields{
             "peerID":           pid,
-            "root":             fmt.Sprintf("%#x", root),
+            "root":             fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
             "requestedColumns": sortedSliceFromMap(requestedColumns),
         }).Debug("Sampled columns from peer successfully")
     } else {
         log.WithFields(logrus.Fields{
             "peerID":           pid,
-            "root":             fmt.Sprintf("%#x", root),
+            "root":             fmt.Sprintf("%#x", blockProcessedData.BlockRoot),
             "requestedColumns": sortedSliceFromMap(requestedColumns),
             "retrievedColumns": sortedSliceFromMap(retrievedColumns),
         }).Debug("Sampled columns from peer with some errors")
@@ -506,20 +511,22 @@ func selectRandomPeer(peers map[peer.ID]bool) peer.ID {
 // the KZG inclusion and the KZG proof.
 func verifyColumn(
     roDataColumn blocks.RODataColumn,
-    root [32]byte,
+    blockProcessedData *statefeed.BlockProcessedData,
     pid peer.ID,
     requestedColumns map[uint64]bool,
-    columnVerifier verification.NewColumnVerifier,
+    dataColumnsVerifier verification.NewDataColumnsVerifier,
 ) bool {
     retrievedColumn := roDataColumn.ColumnIndex
 
     // Filter out columns with incorrect root.
-    actualRoot := roDataColumn.BlockRoot()
-    if actualRoot != root {
+    columnRoot := roDataColumn.BlockRoot()
+    blockRoot := blockProcessedData.BlockRoot
+
+    if columnRoot != blockRoot {
         log.WithFields(logrus.Fields{
             "peerID":        pid,
-            "requestedRoot": fmt.Sprintf("%#x", root),
-            "actualRoot":    fmt.Sprintf("%#x", actualRoot),
+            "requestedRoot": fmt.Sprintf("%#x", blockRoot),
+            "columnRoot":    fmt.Sprintf("%#x", columnRoot),
         }).Debug("Retrieved root does not match requested root")
 
         return false
@@ -538,25 +545,18 @@ func verifyColumn(
         return false
     }
 
-    vf := columnVerifier(roDataColumn, verification.SamplingColumnSidecarRequirements)
-    // Filter out columns which did not pass the KZG inclusion proof verification.
-    if err := vf.SidecarInclusionProven(); err != nil {
-        log.WithFields(logrus.Fields{
-            "peerID": pid,
-            "root":   fmt.Sprintf("%#x", root),
-            "index":  retrievedColumn,
-        }).WithError(err).Debug("Failed to verify KZG inclusion proof for retrieved column")
-        return false
+    roBlock := blockProcessedData.SignedBlock.Block()
+
+    wrappedBlockDataColumns := []verify.WrappedBlockDataColumn{
+        {
+            ROBlock:      roBlock,
+            RODataColumn: roDataColumn,
+        },
     }
 
-    // Filter out columns which did not pass the KZG proof verification.
-    if err := vf.SidecarKzgProofVerified(); err != nil {
-        log.WithFields(logrus.Fields{
-            "peerID": pid,
-            "root":   fmt.Sprintf("%#x", root),
-            "index":  retrievedColumn,
-        }).WithError(err).Debug("Failed to verify KZG proof for retrieved column")
+    if err := verify.DataColumnsAlignWithBlock(wrappedBlockDataColumns, dataColumnsVerifier); err != nil {
         return false
     }
+
     return true
 }
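
The TODO added above points at batching all of a peer's sampled columns into one verification call. Here is a sketch of what that could look like, reusing verify.WrappedBlockDataColumn and verify.DataColumnsAlignWithBlock from this diff; the helper itself is hypothetical and not part of this commit, and the statefeed import alias is assumed.

package example // hypothetical follow-up, not part of this commit

import (
    statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state"
    "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync/verify"
    "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification"
    "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
)

// verifySampledColumns wraps every column retrieved from a peer with the block
// it belongs to and runs the alignment checks once for the whole batch,
// instead of calling verifyColumn in a loop.
func verifySampledColumns(
    blockProcessedData *statefeed.BlockProcessedData,
    roDataColumns []blocks.RODataColumn,
    dataColumnsVerifier verification.NewDataColumnsVerifier,
) error {
    roBlock := blockProcessedData.SignedBlock.Block()

    wrapped := make([]verify.WrappedBlockDataColumn, 0, len(roDataColumns))
    for _, roDataColumn := range roDataColumns {
        wrapped = append(wrapped, verify.WrappedBlockDataColumn{
            ROBlock:      roBlock,
            RODataColumn: roDataColumn,
        })
    }

    return verify.DataColumnsAlignWithBlock(wrapped, dataColumnsVerifier)
}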
