Skip to content
Merged
44 changes: 28 additions & 16 deletions beacon_node/beacon_processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::crit;
use logging::TimeLatch;
use parking_lot::Mutex;
pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
use std::cmp;
Expand All @@ -73,7 +74,7 @@ use work_reprocessing_queue::{
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};

mod metrics;
pub mod work_reprocessing_queue;
pub mod scheduler;

/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
Expand Down Expand Up @@ -264,22 +265,16 @@ impl Default for BeaconProcessorConfig {
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_work_event_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);

Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}
Expand Down Expand Up @@ -638,6 +633,7 @@ pub enum Work<E: EthSpec> {
LightClientUpdatesByRangeRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
Reprocess(ReprocessQueueMessage),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
Expand Down Expand Up @@ -692,6 +688,7 @@ pub enum WorkType {
LightClientUpdatesByRangeRequest,
ApiRequestP0,
ApiRequestP1,
Reprocess,
}

impl<E: EthSpec> Work<E> {
Expand Down Expand Up @@ -750,6 +747,7 @@ impl<E: EthSpec> Work<E> {
}
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
Work::Reprocess { .. } => WorkType::Reprocess,
}
}
}
Expand All @@ -774,7 +772,7 @@ struct InboundEvents<E: EthSpec> {
/// Used by upstream processes to send new work to the `BeaconProcessor`.
event_rx: mpsc::Receiver<WorkEvent<E>>,
/// Used internally for queuing work ready to be re-processed.
reprocess_work_rx: mpsc::Receiver<ReadyWork>,
ready_work_rx: mpsc::Receiver<ReadyWork>,
}

impl<E: EthSpec> Stream for InboundEvents<E> {
Expand All @@ -795,7 +793,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {

// Poll for delayed blocks before polling for new work. It might be the case that a delayed
// block is required to successfully process some new work.
match self.reprocess_work_rx.poll_recv(cx) {
match self.ready_work_rx.poll_recv(cx) {
Poll::Ready(Some(ready_work)) => {
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
}
Expand Down Expand Up @@ -846,8 +844,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
pub fn spawn_manager<S: SlotClock + 'static>(
mut self,
event_rx: mpsc::Receiver<WorkEvent<E>>,
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
work_journal_tx: Option<mpsc::Sender<&'static str>>,
slot_clock: S,
maximum_gossip_clock_disparity: Duration,
Expand Down Expand Up @@ -935,9 +931,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);

let (reprocess_work_tx, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);

spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
reprocess_work_rx,
&self.executor,
Arc::new(slot_clock),
maximum_gossip_clock_disparity,
Expand All @@ -951,7 +951,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
let mut inbound_events = InboundEvents {
idle_rx,
event_rx,
reprocess_work_rx: ready_work_rx,
ready_work_rx,
};

let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
Expand All @@ -965,7 +965,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
match QueuedBackfillBatch::try_from(event) {
Ok(backfill_batch) => {
match work_reprocessing_tx
match reprocess_work_tx
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
{
Err(e) => {
Expand Down Expand Up @@ -1027,8 +1027,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
.unwrap_or(WORKER_FREED);

// We don't care if this message was successfully sent, we only use the journal
// during testing.
let _ = work_journal_tx.try_send(id);
// during testing. We also ignore reprocess messages to ensure our test cases can pass.
if id != "reprocess" {
let _ = work_journal_tx.try_send(id);
}
}

let can_spawn = self.current_workers < self.config.max_workers;
Expand Down Expand Up @@ -1318,6 +1320,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
let work_type = work.to_type();

match work {
Work::Reprocess(work_event) => {
if let Err(e) = reprocess_work_tx.try_send(work_event) {
error!(
error = ?e,
"Failed to reprocess work event"
)
}
}
_ if can_spawn => self.spawn_worker(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
// Attestation batches are formed internally within the
Expand Down Expand Up @@ -1488,6 +1498,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
Expand Down Expand Up @@ -1639,6 +1650,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::LightClientUpdatesByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::Reprocess(_) => {}
};
}
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_processor/src/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod work_reprocessing_queue;
9 changes: 1 addition & 8 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,6 @@ where
network_senders: None,
network_globals: None,
beacon_processor_send: None,
beacon_processor_reprocess_send: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});
Expand Down Expand Up @@ -638,7 +637,6 @@ where
context.executor,
libp2p_registry.as_mut(),
beacon_processor_channels.beacon_processor_tx.clone(),
beacon_processor_channels.work_reprocessing_tx.clone(),
)
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;
Expand Down Expand Up @@ -777,9 +775,6 @@ where
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
beacon_processor_reprocess_send: Some(
beacon_processor_channels.work_reprocessing_tx.clone(),
),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});

Expand Down Expand Up @@ -843,8 +838,6 @@ where
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx.clone(),
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
beacon_chain.spec.maximum_gossip_clock_disparity(),
Expand Down Expand Up @@ -918,7 +911,7 @@ where
compute_light_client_updates(
&inner_chain,
light_client_server_rv,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.beacon_processor_tx,
)
.await
},
Expand Down
12 changes: 9 additions & 3 deletions beacon_node/client/src/compute_light_client_updates.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use beacon_processor::{BeaconProcessorSend, Work, WorkEvent};
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use tokio::sync::mpsc::Sender;
use tracing::error;

// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
Expand All @@ -14,7 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
reprocess_tx: Sender<ReprocessQueueMessage>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
) {
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
//
Expand All @@ -31,7 +31,13 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
});

let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
if reprocess_tx.try_send(msg).is_err() {
if beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: true,
work: Work::Reprocess(msg),
})
.is_err()
{
error!(%parent_root,"Failed to inform light client update")
};
}
Expand Down
14 changes: 3 additions & 11 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use beacon_chain::{
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
BeaconChainTypes, WhenSlotSkipped,
};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
use beacon_processor::BeaconProcessorSend;
pub use block_id::BlockId;
use builder_states::get_next_withdrawals;
use bytes::Bytes;
Expand Down Expand Up @@ -130,7 +130,6 @@ pub struct Context<T: BeaconChainTypes> {
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub beacon_processor_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
pub eth1_service: Option<eth1::Service>,
pub sse_logging_components: Option<SSELoggingComponents>,
}
Expand Down Expand Up @@ -554,11 +553,6 @@ pub fn serve<T: BeaconChainTypes>(
.filter(|_| config.enable_beacon_processor);
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
let beacon_processor_reprocess_send = ctx
.beacon_processor_reprocess_send
.clone()
.filter(|_| config.enable_beacon_processor);
let reprocess_send_filter = warp::any().map(move || beacon_processor_reprocess_send.clone());

let duplicate_block_status_code = ctx.config.duplicate_block_status_code;

Expand Down Expand Up @@ -1986,20 +1980,18 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp_utils::json::json::<Vec<SingleAttestation>>())
.and(optional_consensus_version_header_filter)
.and(network_tx_filter.clone())
.and(reprocess_send_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
_fork_name: Option<ForkName>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>| async move {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
let result = crate::publish_attestations::publish_attestations(
task_spawner,
chain,
attestations,
network_tx,
reprocess_tx,
true,
)
.await
.map(|()| warp::reply::json(&()));
Expand Down
19 changes: 12 additions & 7 deletions beacon_node/http_api/src/publish_attestations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,13 @@ use beacon_chain::{
BeaconChainTypes,
};
use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage};
use beacon_processor::{Work, WorkEvent};
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{debug, error, warn};
use types::SingleAttestation;

Expand Down Expand Up @@ -130,7 +128,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_send: Option<Sender<ReprocessQueueMessage>>,
allow_reprocess: bool,
) -> Result<(), warp::Rejection> {
// Collect metadata about attestations which we'll use to report failures. We need to
// move the `attestations` vec into the blocking task, so this small overhead is unavoidable.
Expand All @@ -142,6 +140,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
// Gossip validate and publish attestations that can be immediately processed.
let seen_timestamp = timestamp_now();
let mut prelim_results = task_spawner
.clone()
.blocking_task(Priority::P0, move || {
Ok(attestations
.into_iter()
Expand All @@ -156,7 +155,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
Err(Error::Validation(AttestationError::UnknownHeadBlock {
beacon_block_root,
})) => {
let Some(reprocess_tx) = &reprocess_send else {
if !allow_reprocess {
return PublishAttestationResult::Failure(Error::ReprocessDisabled);
};
// Re-process.
Expand All @@ -180,7 +179,13 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
beacon_block_root,
process_fn: Box::new(reprocess_fn),
});
if reprocess_tx.try_send(reprocess_msg).is_err() {
if task_spawner
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
PublishAttestationResult::Failure(Error::ReprocessFull)
} else {
PublishAttestationResult::Reprocessing(rx)
Expand Down
27 changes: 27 additions & 0 deletions beacon_node/http_api/src/task_spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl Priority {
}

/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
#[derive(Clone)]
pub struct TaskSpawner<E: EthSpec> {
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
/// used if this is `None`.
Expand Down Expand Up @@ -155,6 +156,32 @@ impl<E: EthSpec> TaskSpawner<E> {
.and_then(|x| x)
}
}

pub fn try_send(&self, work_event: WorkEvent<E>) -> Result<(), warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
let error_message = match beacon_processor_send.try_send(work_event) {
Ok(()) => None,
Err(TrySendError::Full(_)) => {
Some("The task was dropped. The server is overloaded.")
}
Err(TrySendError::Closed(_)) => {
Some("The task was dropped. The server is shutting down.")
}
};

if let Some(error_message) = error_message {
return Err(warp_utils::reject::custom_server_error(
error_message.to_string(),
));
};

Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"The beacon processor is unavailable".to_string(),
))
}
}
}

/// Send a task to the beacon processor and await execution.
Expand Down
Loading