Skip to content

Commit 65e5ac4

Browse files
committed
Use processor
1 parent e2216c0 commit 65e5ac4

File tree

9 files changed

+185
-103
lines changed

9 files changed

+185
-103
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,15 @@ pub enum AvailabilityProcessingStatus {
181181
Imported(Hash256),
182182
}
183183

184+
pub enum ReconstructionOutcome<E: EthSpec> {
185+
Reconstructed {
186+
availability_processing_status: AvailabilityProcessingStatus,
187+
data_columns_to_publish: DataColumnSidecarList<E>,
188+
},
189+
Delay,
190+
NoReconstruction,
191+
}
192+
184193
impl TryInto<SignedBeaconBlockHash> for AvailabilityProcessingStatus {
185194
type Error = ();
186195

@@ -3240,13 +3249,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32403249
pub async fn reconstruct_data_columns(
32413250
self: &Arc<Self>,
32423251
block_root: Hash256,
3243-
) -> Result<
3244-
Option<(
3245-
AvailabilityProcessingStatus,
3246-
DataColumnSidecarList<T::EthSpec>,
3247-
)>,
3248-
BlockError,
3249-
> {
3252+
) -> Result<ReconstructionOutcome<T::EthSpec>, BlockError> {
32503253
// As of now we only reconstruct data columns on supernodes, so if the block is already
32513254
// available on a supernode, there's no need to reconstruct as the node must already have
32523255
// all columns.
@@ -3255,7 +3258,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32553258
.fork_choice_read_lock()
32563259
.contains_block(&block_root)
32573260
{
3258-
return Ok(None);
3261+
return Ok(ReconstructionOutcome::NoReconstruction);
32593262
}
32603263

32613264
let data_availability_checker = self.data_availability_checker.clone();
@@ -3274,25 +3277,29 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
32743277
DataColumnReconstructionResult::Success((availability, data_columns_to_publish)) => {
32753278
let Some(slot) = data_columns_to_publish.first().map(|d| d.slot()) else {
32763279
// This should be unreachable because an empty result would return `RecoveredColumnsNotImported` instead of `Success`.
3277-
return Ok(None);
3280+
return Err(BlockError::InternalError("should have columns".to_string()));
32783281
};
32793282

32803283
let r = self
32813284
.process_availability(slot, availability, || Ok(()))
32823285
.await;
32833286
self.remove_notified(&block_root, r)
3284-
.map(|availability_processing_status| {
3285-
Some((availability_processing_status, data_columns_to_publish))
3286-
})
3287+
.map(
3288+
|availability_processing_status| ReconstructionOutcome::Reconstructed {
3289+
availability_processing_status,
3290+
data_columns_to_publish,
3291+
},
3292+
)
32873293
}
3294+
DataColumnReconstructionResult::Reattempt => Ok(ReconstructionOutcome::Delay),
32883295
DataColumnReconstructionResult::NotStarted(reason)
32893296
| DataColumnReconstructionResult::RecoveredColumnsNotImported(reason) => {
32903297
// We use a metric here because logging this would be *very* noisy.
32913298
metrics::inc_counter_vec(
32923299
&metrics::KZG_DATA_COLUMN_RECONSTRUCTION_INCOMPLETE_TOTAL,
32933300
&[reason],
32943301
);
3295-
Ok(None)
3302+
Ok(ReconstructionOutcome::NoReconstruction)
32963303
}
32973304
}
32983305
}

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub type AvailabilityAndReconstructedColumns<E> = (Availability<E>, DataColumnSi
8282
#[derive(Debug)]
8383
pub enum DataColumnReconstructionResult<E: EthSpec> {
8484
Success(AvailabilityAndReconstructedColumns<E>),
85+
Reattempt,
8586
NotStarted(&'static str),
8687
RecoveredColumnsNotImported(&'static str),
8788
}
@@ -523,6 +524,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
523524
.check_and_set_reconstruction_started(block_root)
524525
{
525526
ReconstructColumnsDecision::Yes(verified_data_columns) => verified_data_columns,
527+
ReconstructColumnsDecision::Wait => {
528+
return Ok(DataColumnReconstructionResult::Reattempt)
529+
}
526530
ReconstructColumnsDecision::No(reason) => {
527531
return Ok(DataColumnReconstructionResult::NotStarted(reason));
528532
}

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

Lines changed: 33 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ use parking_lot::RwLock;
1313
use std::cmp::Ordering;
1414
use std::num::NonZeroUsize;
1515
use std::sync::Arc;
16-
use std::thread::sleep;
17-
use std::time::Duration;
1816
use tracing::debug;
1917
use types::blob_sidecar::BlobIdentifier;
2018
use types::{
@@ -31,7 +29,7 @@ pub struct PendingComponents<E: EthSpec> {
3129
pub verified_blobs: RuntimeFixedVector<Option<KzgVerifiedBlob<E>>>,
3230
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
3331
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
34-
pub reconstruction_started: bool,
32+
pub reconstruction_state: ReconstructionState,
3533
}
3634

3735
impl<E: EthSpec> PendingComponents<E> {
@@ -280,7 +278,7 @@ impl<E: EthSpec> PendingComponents<E> {
280278
verified_blobs: RuntimeFixedVector::new(vec![None; max_len]),
281279
verified_data_columns: vec![],
282280
executed_block: None,
283-
reconstruction_started: false,
281+
reconstruction_state: ReconstructionState::NotStarted,
284282
}
285283
}
286284

@@ -340,6 +338,12 @@ impl<E: EthSpec> PendingComponents<E> {
340338
}
341339
}
342340

341+
pub enum ReconstructionState {
342+
NotStarted,
343+
WaitingForColumns { num_last: usize },
344+
Started,
345+
}
346+
343347
/// This is the main struct for this module. Outside methods should
344348
/// interact with the cache through this.
345349
pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
@@ -357,6 +361,7 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
357361
#[allow(clippy::large_enum_variant)]
358362
pub(crate) enum ReconstructColumnsDecision<E: EthSpec> {
359363
Yes(Vec<KzgVerifiedCustodyDataColumn<E>>),
364+
Wait,
360365
No(&'static str),
361366
}
362367

@@ -562,51 +567,39 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
562567

563568
// If we're sampling all columns, it means we must be custodying all columns.
564569
let total_column_count = self.spec.number_of_columns as usize;
565-
let mut received_column_count = pending_components.verified_data_columns.len();
570+
let received_column_count = pending_components.verified_data_columns.len();
566571

567-
if pending_components.reconstruction_started {
568-
return ReconstructColumnsDecision::No("already started");
569-
}
570572
if received_column_count >= total_column_count {
571573
return ReconstructColumnsDecision::No("all columns received");
572574
}
573575
if received_column_count < total_column_count / 2 {
574576
return ReconstructColumnsDecision::No("not enough columns");
575577
}
576578

577-
pending_components.reconstruction_started = true;
578-
579-
debug!(received_column_count, "Starting wait for more cols...");
580-
581-
// Instead of starting to reconstruct immediately, wait for more columns to arrive
582-
drop(write_lock);
583-
let mut iter = 1;
584-
loop {
585-
sleep(Duration::from_millis(100));
586-
let mut write_lock = self.critical.write();
587-
let Some(pending_components) = write_lock.get(block_root) else {
588-
// Block may have been imported as it does not exist in availability cache.
589-
return ReconstructColumnsDecision::No("block already imported");
590-
};
591-
let new_received_column_count = pending_components.verified_data_columns.len();
592-
593-
// Check if there is still a need to reconstruct.
594-
if new_received_column_count >= total_column_count {
595-
debug!(iter, new_received_column_count, "Got all!");
596-
return ReconstructColumnsDecision::No("all columns received");
579+
match pending_components.reconstruction_state {
580+
ReconstructionState::NotStarted => {
581+
pending_components.reconstruction_state = ReconstructionState::WaitingForColumns {
582+
num_last: received_column_count,
583+
};
584+
ReconstructColumnsDecision::Wait
597585
}
598-
599-
// Check if no new column arrived.
600-
if new_received_column_count == received_column_count {
601-
debug!(iter, new_received_column_count, "No new cols :/");
602-
return ReconstructColumnsDecision::Yes(pending_components.verified_data_columns.clone());
586+
ReconstructionState::WaitingForColumns { num_last } => {
587+
if num_last < received_column_count {
588+
// More columns arrived since the last check, so keep waiting.
589+
pending_components.reconstruction_state =
590+
ReconstructionState::WaitingForColumns {
591+
num_last: received_column_count,
592+
};
593+
ReconstructColumnsDecision::Wait
594+
} else {
595+
// No new columns arrived since the last check, so start reconstruction.
596+
pending_components.reconstruction_state = ReconstructionState::Started;
597+
ReconstructColumnsDecision::Yes(
598+
pending_components.verified_data_columns.clone(),
599+
)
600+
}
603601
}
604-
605-
debug!(iter, new_received_column_count, "Waiting for more cols...");
606-
607-
// Update count for next check.
608-
received_column_count = new_received_column_count;
609-
iter += 1;
602+
ReconstructionState::Started => ReconstructColumnsDecision::No("already started"),
610603
}
611604
}
612605

@@ -616,7 +609,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
616609
pub fn handle_reconstruction_failure(&self, block_root: &Hash256) {
617610
if let Some(pending_components_mut) = self.critical.write().get_mut(block_root) {
618611
pending_components_mut.verified_data_columns = vec![];
619-
pending_components_mut.reconstruction_started = false;
612+
pending_components_mut.reconstruction_state = ReconstructionState::NotStarted;
620613
}
621614
}
622615

beacon_node/beacon_chain/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ pub use self::beacon_chain::{
6666
AttestationProcessingOutcome, AvailabilityProcessingStatus, BeaconBlockResponse,
6767
BeaconBlockResponseWrapper, BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus,
6868
ChainSegmentResult, ForkChoiceError, LightClientProducerEvent, OverrideForkchoiceUpdate,
69-
ProduceBlockVerification, StateSkipConfig, WhenSlotSkipped,
69+
ProduceBlockVerification, ReconstructionOutcome, StateSkipConfig, WhenSlotSkipped,
7070
INVALID_FINALIZED_MERGE_TRANSITION_BLOCK_SHUTDOWN_REASON,
7171
INVALID_JUSTIFIED_PAYLOAD_SHUTDOWN_REASON,
7272
};

beacon_node/beacon_processor/src/lib.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
//! task.
4040
4141
use crate::work_reprocessing_queue::{
42-
QueuedBackfillBatch, QueuedGossipBlock, ReprocessQueueMessage,
42+
QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
4343
};
4444
use futures::stream::{Stream, StreamExt};
4545
use futures::task::Poll;
@@ -117,6 +117,7 @@ pub struct BeaconProcessorQueueLengths {
117117
rpc_custody_column_queue: usize,
118118
rpc_verify_data_column_queue: usize,
119119
sampling_result_queue: usize,
120+
column_reconstruction_queue: usize,
120121
chain_segment_queue: usize,
121122
backfill_chain_segment: usize,
122123
gossip_block_queue: usize,
@@ -184,6 +185,7 @@ impl BeaconProcessorQueueLengths {
184185
rpc_verify_data_column_queue: 1000,
185186
unknown_block_sampling_request_queue: 16384,
186187
sampling_result_queue: 1000,
188+
column_reconstruction_queue: 64,
187189
chain_segment_queue: 64,
188190
backfill_chain_segment: 64,
189191
gossip_block_queue: 1024,
@@ -498,6 +500,10 @@ impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
498500
drop_during_sync: false,
499501
work: Work::ChainSegmentBackfill(process_fn),
500502
},
503+
ReadyWork::ColumnReconstruction(QueuedColumnReconstruction(process_fn)) => Self {
504+
drop_during_sync: true,
505+
work: Work::ColumnReconstruction(process_fn),
506+
},
501507
}
502508
}
503509
}
@@ -619,6 +625,7 @@ pub enum Work<E: EthSpec> {
619625
RpcCustodyColumn(AsyncFn),
620626
RpcVerifyDataColumn(AsyncFn),
621627
SamplingResult(AsyncFn),
628+
ColumnReconstruction(AsyncFn),
622629
IgnoredRpcBlock {
623630
process_fn: BlockingFn,
624631
},
@@ -674,6 +681,7 @@ pub enum WorkType {
674681
RpcCustodyColumn,
675682
RpcVerifyDataColumn,
676683
SamplingResult,
684+
ColumnReconstruction,
677685
IgnoredRpcBlock,
678686
ChainSegment,
679687
ChainSegmentBackfill,
@@ -725,6 +733,7 @@ impl<E: EthSpec> Work<E> {
725733
Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
726734
Work::RpcVerifyDataColumn { .. } => WorkType::RpcVerifyDataColumn,
727735
Work::SamplingResult { .. } => WorkType::SamplingResult,
736+
Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
728737
Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
729738
Work::ChainSegment { .. } => WorkType::ChainSegment,
730739
Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
@@ -891,6 +900,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
891900
FifoQueue::new(queue_lengths.rpc_verify_data_column_queue);
892901
// TODO(das): the sampling_request_queue is never read
893902
let mut sampling_result_queue = FifoQueue::new(queue_lengths.sampling_result_queue);
903+
let mut column_reconstruction_queue =
904+
FifoQueue::new(queue_lengths.column_reconstruction_queue);
894905
let mut unknown_block_sampling_request_queue =
895906
FifoQueue::new(queue_lengths.unknown_block_sampling_request_queue);
896907
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
@@ -1371,6 +1382,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
13711382
rpc_verify_data_column_queue.push(work, work_id)
13721383
}
13731384
Work::SamplingResult(_) => sampling_result_queue.push(work, work_id),
1385+
Work::ColumnReconstruction(_) => {
1386+
column_reconstruction_queue.push(work, work_id)
1387+
}
13741388
Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
13751389
Work::ChainSegmentBackfill { .. } => {
13761390
backfill_chain_segment.push(work, work_id)
@@ -1460,6 +1474,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
14601474
WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
14611475
WorkType::RpcVerifyDataColumn => rpc_verify_data_column_queue.len(),
14621476
WorkType::SamplingResult => sampling_result_queue.len(),
1477+
WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
14631478
WorkType::ChainSegment => chain_segment_queue.len(),
14641479
WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
14651480
WorkType::Status => status_queue.len(),
@@ -1602,7 +1617,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
16021617
| Work::RpcBlobs { process_fn }
16031618
| Work::RpcCustodyColumn(process_fn)
16041619
| Work::RpcVerifyDataColumn(process_fn)
1605-
| Work::SamplingResult(process_fn) => task_spawner.spawn_async(process_fn),
1620+
| Work::SamplingResult(process_fn)
1621+
| Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
16061622
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
16071623
Work::GossipBlock(work)
16081624
| Work::GossipBlobSidecar(work)

0 commit comments

Comments
 (0)