@@ -19,6 +19,7 @@ use itertools::Itertools;
1919use logging:: crit;
2020use logging:: TimeLatch ;
2121use slot_clock:: SlotClock ;
22+ use std:: collections:: hash_map:: Entry ;
2223use std:: collections:: { HashMap , HashSet } ;
2324use std:: future:: Future ;
2425use std:: pin:: Pin ;
@@ -182,7 +183,10 @@ pub struct IgnoredRpcBlock {
182183/// A backfill batch work that has been queued for processing later.
183184pub struct QueuedBackfillBatch ( pub AsyncFn ) ;
184185
185- pub struct QueuedColumnReconstruction ( pub AsyncFn ) ;
186+ pub struct QueuedColumnReconstruction {
187+ pub block_root : Hash256 ,
188+ pub process_fn : AsyncFn ,
189+ }
186190
187191impl < E : EthSpec > TryFrom < WorkEvent < E > > for QueuedBackfillBatch {
188192 type Error = WorkEvent < E > ;
@@ -264,6 +268,8 @@ struct ReprocessQueue<S> {
264268 queued_sampling_requests : FnvHashMap < usize , ( QueuedSamplingRequest , DelayKey ) > ,
265269 /// Sampling requests per block root.
266270 awaiting_sampling_requests_per_block_root : HashMap < Hash256 , Vec < QueuedSamplingRequestId > > ,
271+ /// Column reconstruction per block root.
272+ queued_column_reconstructions : HashMap < Hash256 , DelayKey > ,
267273 /// Queued backfill batches
268274 queued_backfill_batches : Vec < QueuedBackfillBatch > ,
269275
@@ -441,6 +447,7 @@ impl<S: SlotClock> ReprocessQueue<S> {
441447 awaiting_lc_updates_per_parent_root : HashMap :: new ( ) ,
442448 awaiting_sampling_requests_per_block_root : <_ >:: default ( ) ,
443449 queued_backfill_batches : Vec :: new ( ) ,
450+ queued_column_reconstructions : HashMap :: new ( ) ,
444451 next_attestation : 0 ,
445452 next_lc_update : 0 ,
446453 next_sampling_request_update : 0 ,
@@ -840,8 +847,19 @@ impl<S: SlotClock> ReprocessQueue<S> {
840847 }
841848 }
842849 InboundEvent :: Msg ( DelayColumnReconstruction ( request) ) => {
843- self . column_reconstructions_delay_queue
844- . insert ( request, QUEUED_RECONSTRUCTION_DELAY ) ;
850+ match self . queued_column_reconstructions . entry ( request. block_root ) {
851+ Entry :: Occupied ( key) => {
852+ // Push back the reattempted reconstruction
853+ self . column_reconstructions_delay_queue
854+ . reset ( key. get ( ) , QUEUED_RECONSTRUCTION_DELAY )
855+ }
856+ Entry :: Vacant ( vacant) => {
857+ let delay_key = self
858+ . column_reconstructions_delay_queue
859+ . insert ( request, QUEUED_RECONSTRUCTION_DELAY ) ;
860+ vacant. insert ( delay_key) ;
861+ }
862+ }
845863 }
846864 // A block that was queued for later processing is now ready to be processed.
847865 InboundEvent :: ReadyGossipBlock ( ready_block) => {
@@ -967,6 +985,8 @@ impl<S: SlotClock> ReprocessQueue<S> {
967985 }
968986 }
969987 InboundEvent :: ReadyColumnReconstruction ( column_reconstruction) => {
988+ self . queued_column_reconstructions
989+ . remove ( & column_reconstruction. block_root ) ;
970990 if self
971991 . ready_work_tx
972992 . try_send ( ReadyWork :: ColumnReconstruction ( column_reconstruction) )
0 commit comments