
Commit b2a6877

Fix data columns sampling (#14263)
* Fix the obvious...
* Data columns sampling: modify logging.
* `waitForChainStart`: make it thread-safe and only wait once.
* Sampling: wait for chain start before running the sampling. Reason: `newDataColumnSampler1D` needs `s.ctxMap`, and `s.ctxMap` is only set once the chain has started. Previously, `waitForChainStart` was only called in `s.registerHandlers`, itself called in a goroutine, so we had a race condition: sometimes `newDataColumnSampler1D` was called after `s.ctxMap` was set, sometimes not. (See the sketch after the commit metadata below.)
* Address Nishant's comments.
* Sampling: improve logging.
* `waitForChainStart`: remove the `chainIsStarted` check.
1 parent ea57ca7 commit b2a6877
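To make the race concrete: the sampler constructor reads `s.ctxMap`, which is only populated after chain start, while the old code built the sampler in `Start()` without waiting for it. The following is a minimal sketch of the corrected ordering under stated assumptions; the stub types, the `sync.Once` guard, and the channel-based chain-start signal are illustrative stand-ins, not the actual Prysm service wiring.

package main

import (
	"context"
	"fmt"
	"sync"
)

// sampler stands in for the real data column sampler, which needs ctxMap at construction time.
type sampler struct{ ctxMap map[[4]byte]int }

func (sp *sampler) Run(ctx context.Context) { <-ctx.Done() }

type service struct {
	ctx                 context.Context
	chainStarted        chan struct{} // closed when the chain starts
	initialSyncComplete chan struct{} // closed when initial sync finishes
	waitOnce            sync.Once     // assumption: one way to make waitForChainStart safe to call repeatedly
	ctxMap              map[[4]byte]int
	sampler             *sampler
}

// waitForChainStart blocks until the chain has started and sets ctxMap exactly once,
// no matter how many goroutines call it.
func (s *service) waitForChainStart() {
	s.waitOnce.Do(func() {
		<-s.chainStarted
		s.ctxMap = map[[4]byte]int{{0x01}: 1} // in Prysm this map is keyed by fork digests
	})
}

// startTasksPostInitialSync replaces the old registerHandlers: it waits for chain start
// first, so ctxMap is guaranteed to be set before the sampler that depends on it is built.
func (s *service) startTasksPostInitialSync() {
	s.waitForChainStart()

	select {
	case <-s.initialSyncComplete:
		s.sampler = &sampler{ctxMap: s.ctxMap} // no race: ctxMap is non-nil here
		go s.sampler.Run(s.ctx)
	case <-s.ctx.Done():
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	s := &service{
		ctx:                 ctx,
		chainStarted:        make(chan struct{}),
		initialSyncComplete: make(chan struct{}),
	}

	done := make(chan struct{})
	go func() {
		s.startTasksPostInitialSync()
		close(done)
	}()

	close(s.chainStarted)        // chain starts...
	close(s.initialSyncComplete) // ...then initial sync completes
	<-done
	fmt.Println("sampler built with ctxMap set:", s.sampler.ctxMap != nil)
}

The diff to beacon-chain/sync/service.go below performs the same move in the real service: sampler construction leaves `Start()` and lands in `startTasksPostInitialSync`, after `waitForChainStart` and the initial-sync gate.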

5 files changed: +46 −39 lines changed


beacon-chain/sync/data_columns_sampling.go

Lines changed: 20 additions & 20 deletions
@@ -170,8 +170,6 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
         }
     }
 
-    log.WithField("columnFromPeer", d.columnFromPeer).Debug("Peer info refreshed")
-
     columnWithNoPeers := make([]uint64, 0)
     for column, peers := range d.peerFromColumn {
         if len(peers) == 0 {
@@ -228,7 +226,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
         return
     }
 
-    if coreTime.PeerDASIsActive(data.Slot) {
+    if !coreTime.PeerDASIsActive(data.Slot) {
         // We do not trigger sampling if peerDAS is not active yet.
         return
     }
@@ -249,22 +247,12 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
     // Randomize columns for sample selection.
     randomizedColumns := randomizeColumns(d.nonCustodyColumns)
     samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
-    ok, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
+
+    // TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
+    _, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
     if err != nil {
         log.WithError(err).Error("Failed to run incremental DAS")
     }
-
-    if ok {
-        log.WithFields(logrus.Fields{
-            "root":    fmt.Sprintf("%#x", data.BlockRoot),
-            "columns": randomizedColumns,
-        }).Debug("Data column sampling successful")
-    } else {
-        log.WithFields(logrus.Fields{
-            "root":    fmt.Sprintf("%#x", data.BlockRoot),
-            "columns": randomizedColumns,
-        }).Warning("Data column sampling failed")
-    }
 }
 
 // incrementalDAS samples data columns from active peers using incremental DAS.
@@ -280,17 +268,28 @@ func (d *dataColumnSampler1D) incrementalDAS(
     firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
     roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
 
+    start := time.Now()
+
     for round := 1; ; /*No exit condition */ round++ {
         if extendedSampleCount > uint64(len(columns)) {
             // We already tried to sample all possible columns, this is the unhappy path.
-            log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
+            log.WithFields(logrus.Fields{
+                "root":  fmt.Sprintf("%#x", root),
+                "round": round - 1,
+            }).Warning("Some columns are still missing after trying to sample all possible columns")
             return false, roundSummaries, nil
         }
 
         // Get the columns to sample for this round.
         columnsToSample := columns[firstColumnToSample:extendedSampleCount]
         columnsToSampleCount := extendedSampleCount - firstColumnToSample
 
+        log.WithFields(logrus.Fields{
+            "root":    fmt.Sprintf("%#x", root),
+            "columns": columnsToSample,
+            "round":   round,
+        }).Debug("Start data columns sampling")
+
         // Sample data columns from peers in parallel.
         retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)
 
@@ -311,7 +310,8 @@ func (d *dataColumnSampler1D) incrementalDAS(
             // All columns were correctly sampled, this is the happy path.
             log.WithFields(logrus.Fields{
                 "root":         fmt.Sprintf("%#x", root),
-                "roundsNeeded": round,
+                "neededRounds": round,
+                "duration":     time.Since(start),
             }).Debug("All columns were successfully sampled")
             return true, roundSummaries, nil
         }
@@ -429,14 +429,14 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
             "peerID":           pid,
             "root":             fmt.Sprintf("%#x", root),
             "requestedColumns": sortedSliceFromMap(requestedColumns),
-        }).Debug("All requested columns were successfully sampled from peer")
+        }).Debug("Sampled columns from peer successfully")
     } else {
         log.WithFields(logrus.Fields{
             "peerID":           pid,
             "root":             fmt.Sprintf("%#x", root),
             "requestedColumns": sortedSliceFromMap(requestedColumns),
             "retrievedColumns": sortedSliceFromMap(retrievedColumns),
-        }).Debug("Some requested columns were not sampled from peer")
+        }).Debug("Sampled columns from peer with some errors")
     }
 
     return retrievedColumns
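For readers of these hunks: the surrounding loop implements incremental DAS, which samples a first batch of columns and, if some cannot be retrieved, widens the sample to tolerate the observed failures, stopping when a round fully succeeds or the candidate columns run out. Below is a minimal, self-contained sketch of that round structure; `extendedSampleCount`, the sampling callback, and the summary type are simplified assumptions for illustration, not the real `peerdas` API or formula.

package main

import "fmt"

// roundSummary mirrors the idea of the per-round bookkeeping in the diff (simplified).
type roundSummary struct {
	Round        int
	Requested    []uint64
	MissingCount int
}

// extendedSampleCount is a stand-in for peerdas.ExtendedSampleCount: given the base number
// of samples and the failures tolerated so far, return how many columns must have been
// queried in total. Here we simply add them, which is NOT the real formula.
func extendedSampleCount(sampleCount, allowedFailures uint64) uint64 {
	return sampleCount + allowedFailures
}

// incrementalDAS runs rounds of sampling over `columns` until one round fully
// succeeds (true) or all candidate columns have been tried (false).
func incrementalDAS(columns []uint64, sampleCount uint64, sample func([]uint64) map[uint64]bool) (bool, []roundSummary) {
	allowedFailures := uint64(0)
	first, extended := uint64(0), extendedSampleCount(sampleCount, allowedFailures)
	summaries := make([]roundSummary, 0, 1)

	for round := 1; ; round++ {
		if extended > uint64(len(columns)) {
			return false, summaries // unhappy path: nothing left to widen into
		}

		toSample := columns[first:extended]
		retrieved := sample(toSample)

		missing := 0
		for _, c := range toSample {
			if !retrieved[c] {
				missing++
			}
		}
		summaries = append(summaries, roundSummary{Round: round, Requested: toSample, MissingCount: missing})

		if missing == 0 {
			return true, summaries // happy path: this round fully succeeded
		}

		// Widen the sample for the next round to tolerate the new failures.
		allowedFailures += uint64(missing)
		first, extended = extended, extendedSampleCount(sampleCount, allowedFailures)
	}
}

func main() {
	columns := []uint64{3, 17, 42, 64, 90, 101, 120, 127}
	flaky := map[uint64]bool{42: true} // pretend column 42 is never retrievable
	ok, rounds := incrementalDAS(columns, 4, func(cols []uint64) map[uint64]bool {
		got := make(map[uint64]bool, len(cols))
		for _, c := range cols {
			if !flaky[c] {
				got[c] = true
			}
		}
		return got
	})
	fmt.Println("sampling succeeded:", ok, "rounds:", len(rounds))
}

With the hypothetical always-failing column 42, this sketch succeeds in two rounds: the first round misses one column, and the second, widened round retrieves everything it asks for.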

beacon-chain/sync/rpc_data_column_sidecars_by_root.go

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
         "requested":      requestedColumnsList,
         "custodiedCount": len(custodiedColumnsList),
         "requestedCount": len(requestedColumnsList),
-    }).Debug("Received data column sidecar by root request")
+    }).Debug("Data column sidecar by root request received")
 
     // Subscribe to the data column feed.
     rootIndexChan := make(chan filesystem.RootIndexPair)

beacon-chain/sync/rpc_send_request.go

Lines changed: 0 additions & 1 deletion
@@ -232,7 +232,6 @@ func SendDataColumnSidecarByRoot(
     }
 
     // Send the request to the peer.
-    log.WithField("topic", topic).Debug("Sending data column sidecar request")
     stream, err := p2pApi.Send(ctx, req, topic, pid)
     if err != nil {
         return nil, errors.Wrap(err, "send")

beacon-chain/sync/service.go

Lines changed: 22 additions & 14 deletions
@@ -239,7 +239,7 @@ func (s *Service) Start() {
     s.newColumnProposerVerifier = v.VerifyProposer
 
     go s.verifierRoutine()
-    go s.registerHandlers()
+    go s.startTasksPostInitialSync()
 
     s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye)
     s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error {
@@ -254,12 +254,6 @@ func (s *Service) Start() {
 
     // Update sync metrics.
     async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)
-
-    // Run data column sampling
-    if params.PeerDASEnabled() {
-        s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
-        go s.sampler.Run(s.ctx)
-    }
 }
 
 // Stop the regular sync service.
@@ -337,23 +331,37 @@ func (s *Service) waitForChainStart() {
     s.markForChainStart()
 }
 
-func (s *Service) registerHandlers() {
+func (s *Service) startTasksPostInitialSync() {
+    // Wait for the chain to start.
     s.waitForChainStart()
+
     select {
     case <-s.initialSyncComplete:
-        // Register respective pubsub handlers at state synced event.
-        digest, err := s.currentForkDigest()
+        // Compute the current epoch.
+        currentSlot := slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix()))
+        currentEpoch := slots.ToEpoch(currentSlot)
+
+        // Compute the current fork forkDigest.
+        forkDigest, err := s.currentForkDigest()
         if err != nil {
             log.WithError(err).Error("Could not retrieve current fork digest")
             return
         }
-        currentEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix())))
-        s.registerSubscribers(currentEpoch, digest)
+
+        // Register respective pubsub handlers at state synced event.
+        s.registerSubscribers(currentEpoch, forkDigest)
+
+        // Start the fork watcher.
         go s.forkWatcher()
-        return
+
+        // Start data columns sampling if peerDAS is enabled.
+        if params.PeerDASEnabled() {
+            s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
+            go s.sampler.Run(s.ctx)
+        }
+
     case <-s.ctx.Done():
         log.Debug("Context closed, exiting goroutine")
-        return
     }
 }
 

beacon-chain/sync/service_test.go

Lines changed: 3 additions & 3 deletions
@@ -62,7 +62,7 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
     }
 
     topic := "/eth2/%x/beacon_block"
-    go r.registerHandlers()
+    go r.startTasksPostInitialSync()
     time.Sleep(100 * time.Millisecond)
 
     var vr [32]byte
@@ -143,7 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
 
     syncCompleteCh := make(chan bool)
     go func() {
-        r.registerHandlers()
+        r.startTasksPostInitialSync()
         syncCompleteCh <- true
     }()
 
@@ -200,7 +200,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
         initialSyncComplete: make(chan struct{}),
     }
 
-    go r.registerHandlers()
+    go r.startTasksPostInitialSync()
     var vr [32]byte
     require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
     r.waitForChainStart()
