Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 58 additions & 48 deletions validator_client/validator_services/src/duties_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,54 +1660,8 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
// Only download duties and push out additional block production events if we have some
// validators.
if !local_pubkeys.is_empty() {
let download_result = duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node
.get_validator_duties_proposer(current_epoch)
.await
})
.await;

match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;

let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();

debug!(
%dependent_root,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);

if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(current_epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
// Don't return early here, we still want to try and produce blocks using the cached values.
Err(e) => error!(
err = %e,
"Failed to download proposer duties"
),
for epoch in [current_epoch, current_epoch + 1] {
fetch_and_store_proposer_duties(duties_service, epoch, &local_pubkeys).await;
}

// Compute the block proposers for this slot again, now that we've received an update from
Expand Down Expand Up @@ -1750,6 +1704,62 @@ async fn poll_beacon_proposers<S: ValidatorStore, T: SlotClock + 'static>(
Ok(())
}

async fn fetch_and_store_proposer_duties<S: ValidatorStore, T: SlotClock + 'static>(
duties_service: &DutiesService<S, T>,
epoch: Epoch,
local_pubkeys: &HashSet<PublicKeyBytes>,
) {
let download_result = duties_service
.beacon_nodes
.first_success(|beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::DUTIES_SERVICE_TIMES,
&[validator_metrics::PROPOSER_DUTIES_HTTP_GET],
);
beacon_node.get_validator_duties_proposer(epoch).await
})
.await;

match download_result {
Ok(response) => {
let dependent_root = response.dependent_root;

let relevant_duties = response
.data
.into_iter()
.filter(|proposer_duty| local_pubkeys.contains(&proposer_duty.pubkey))
.collect::<Vec<_>>();

debug!(
%dependent_root,
%epoch,
num_relevant_duties = relevant_duties.len(),
"Downloaded proposer duties"
);

if let Some((prior_dependent_root, _)) = duties_service
.proposers
.write()
.insert(epoch, (dependent_root, relevant_duties))
&& dependent_root != prior_dependent_root
{
warn!(
%prior_dependent_root,
%dependent_root,
%epoch,
msg = "this may happen from time to time",
"Proposer duties re-org"
)
}
}
Err(e) => error!(
err = %e,
%epoch,
"Failed to download proposer duties"
),
}
}

/// Query the beacon node for ptc duties for any known validators.
async fn poll_beacon_ptc_attesters<S: ValidatorStore + 'static, T: SlotClock + 'static>(
duties_service: &Arc<DutiesService<S, T>>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use crate::duties_service::DutiesService;
use beacon_node_fallback::BeaconNodeFallback;
use eth2::types::ProposerData;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use task_executor::TaskExecutor;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
use types::{ChainSpec, Epoch, EthSpec, ForkName, ProposerPreferences};
use types::{ChainSpec, Epoch, EthSpec, ForkName, Hash256, ProposerPreferences};
use validator_store::ValidatorStore;

pub struct Inner<S, T> {
Expand Down Expand Up @@ -66,52 +68,83 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
let executor = self.executor.clone();

let interval_fut = async move {
let mut published_preferences: HashMap<Epoch, Hash256> = HashMap::new();

loop {
let Some(current_slot) = self.slot_clock.now() else {
error!("Failed to read slot clock");
sleep(slot_duration).await;
continue;
};

if !self
.chain_spec
.fork_name_at_slot::<S::E>(current_slot)
.gloas_enabled()
{
let duration_to_next_epoch = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32);
sleep(duration_to_next_epoch).await;
continue;
}

let current_epoch = current_slot.epoch(S::E::slots_per_epoch());
let fork_name = self.chain_spec.fork_name_at_slot::<S::E>(current_slot);
self.publish_proposer_preferences(current_epoch, fork_name)

self.poll_and_publish_preferences(current_epoch, &mut published_preferences)
.await;

let duration_to_next_epoch = self
let duration_to_next_slot = self
.slot_clock
.duration_to_next_epoch(S::E::slots_per_epoch())
.unwrap_or_else(|| slot_duration * S::E::slots_per_epoch() as u32);
sleep(duration_to_next_epoch).await;
.duration_to_next_slot()
.unwrap_or(slot_duration);
sleep(duration_to_next_slot).await;
}
};

executor.spawn(interval_fut, "proposer_preferences_service");
Ok(())
}

async fn publish_proposer_preferences(&self, current_epoch: Epoch, fork_name: ForkName) {
let (dependent_root, duties) = {
let proposers = self.duties_service.proposers.read();
match proposers.get(&current_epoch) {
Some((root, duties)) => (*root, duties.clone()),
None => return,
/// Publish proposer preferences for `current_epoch` and `current_epoch + 1`.
/// Will only publish preferences for a given epoch once per dependent root.
async fn poll_and_publish_preferences(
&self,
current_epoch: Epoch,
published_preferences: &mut HashMap<Epoch, Hash256>,
) {
for (epoch, fork_name) in [
(
current_epoch,
self.chain_spec.fork_name_at_epoch(current_epoch),
),
(
current_epoch + 1,
self.chain_spec.fork_name_at_epoch(current_epoch + 1),
),
] {
if !fork_name.gloas_enabled() {
continue;
}
};

let (dependent_root, duties) = {
let proposers = self.duties_service.proposers.read();
match proposers.get(&epoch) {
Some((root, duties)) => (*root, duties.clone()),
None => continue,
}
};

if published_preferences.get(&epoch) == Some(&dependent_root) {
continue;
}

if self
.publish_proposer_preferences(epoch, fork_name, dependent_root, duties)
.await
{
published_preferences.insert(epoch, dependent_root);
}
}

published_preferences.retain(|epoch, _| *epoch >= current_epoch);
}

async fn publish_proposer_preferences(
&self,
epoch: Epoch,
fork_name: ForkName,
dependent_root: Hash256,
duties: Vec<ProposerData>,
) -> bool {
let preferences_to_sign: Vec<_> = {
let mut result = vec![];
for duty in &duties {
Expand Down Expand Up @@ -144,11 +177,11 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
};

if preferences_to_sign.is_empty() {
return;
return false;
}

debug!(
%current_epoch,
%epoch,
count = preferences_to_sign.len(),
"Signing proposer preferences"
);
Expand All @@ -172,7 +205,7 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
}

if signed.is_empty() {
return;
return false;
}

let count = signed.len();
Expand Down Expand Up @@ -204,17 +237,19 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> ProposerPreferencesSer
match result {
Ok(()) => {
info!(
%current_epoch,
%epoch,
%count,
"Successfully published proposer preferences"
);
true
}
Err(e) => {
error!(
error = %e,
%current_epoch,
%epoch,
"Failed to publish proposer preferences"
);
false
}
}
}
Expand Down
Loading