Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ use crate::{
use bls::{PublicKey, PublicKeyBytes, Signature};
use eth2::beacon_response::ForkVersionedResponse;
use eth2::types::{
EventKind, SseBlobSidecar, SseBlock, SseDataColumnSidecar, SseExtendedPayloadAttributes,
EventKind, PtcDuty, SseBlobSidecar, SseBlock, SseDataColumnSidecar,
SseExtendedPayloadAttributes,
};
use execution_layer::{
BlockProposalContents, BlockProposalContentsType, BuilderParams, ChainHealth, ExecutionLayer,
Expand Down Expand Up @@ -1673,6 +1674,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok((duties, dependent_root, execution_status))
}

/// Get PTC duties for validators at a given epoch.
/// MVP implementation that:
/// 1. Takes atomic snapshot of canonical head
/// 2. Uses committee caches already built for the epoch
///
/// Supports current and next epoch only (API enforces this constraint).
/// Re-org resistant: All computations use consistent cloned state.
///
/// Note: This currently uses the vanilla `get_ptc` which recomputes PTC for each slot.
/// TODO(EIP-7732): When ptc cache PR is merged, this should be updated to use the optimized cache.
/// https://github.com/shane-moore/lighthouse/pull/10
/// TODO(EIP-7732): we should consider building out something similar to the `shuffling_cache` so that way we can avoid recreating ptc caches on each reorg.
pub fn validator_ptc_duties(
&self,
validator_indices: &[u64],
epoch: Epoch,
) -> Result<(Vec<Option<PtcDuty>>, Hash256, ExecutionStatus), Error> {
// Get head state snapshot atomically
let (mut state, head_block_root) = {
let head = self.canonical_head.cached_head();
let head_state = head.snapshot.beacon_state.clone();
let head_block_root = head.head_block_root();
(head_state, head_block_root)
};

let execution_status = self
.canonical_head
.fork_choice_read_lock()
.get_block_execution_status(&head_block_root)
.ok_or(Error::PtcHeadNotInForkChoice(head_block_root))?;

// build_committee_cache is idempotent (no-op if already built), so we call it as a safety check
let relative_epoch = RelativeEpoch::from_epoch(state.current_epoch(), epoch)
.map_err(Error::IncorrectStateForAttestation)?;
state.build_committee_cache(relative_epoch, &self.spec)?;

// Compute the dependent root (the block that decided the shuffling for this epoch).
let dependent_root =
state.attester_shuffling_decision_root(head_block_root, relative_epoch)?;

// Get pubkeys for all requested validators (invalid indices will be missing from the map)
let usize_indices = validator_indices
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>();
let index_to_pubkey_map = self.validator_pubkey_bytes_many(&usize_indices)?;

// Map validator indices to duties by checking each slot in the epoch for PTC membership.
let duties: Vec<Option<PtcDuty>> = validator_indices
.iter()
.map(|&validator_index| -> Result<Option<PtcDuty>, Error> {
// Get pubkey; if validator doesn't exist, return None
let pubkey = match index_to_pubkey_map.get(&(validator_index as usize)) {
Some(pk) => *pk,
None => return Ok(None),
};

let slot_opt =
state.get_ptc_assignment(validator_index as usize, epoch, &self.spec)?;

Ok(slot_opt.map(|slot| PtcDuty {
validator_index,
slot,
pubkey,
}))
})
.collect::<Result<Vec<_>, _>>()?;

Ok((duties, dependent_root, execution_status))
}

pub fn get_aggregated_attestation(
&self,
attestation: AttestationRef<T::EthSpec>,
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ pub enum BeaconChainError {
execution_status: ExecutionStatus,
},
AttestationHeadNotInForkChoice(Hash256),
PtcHeadNotInForkChoice(Hash256),
MissingPersistedForkChoice,
CommitteePromiseFailed(oneshot_broadcast::Error),
MaxCommitteePromises(usize),
Expand Down
10 changes: 10 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod metrics;
mod peer;
mod produce_block;
mod proposer_duties;
mod ptc_duties;
mod publish_attestations;
mod publish_blocks;
mod standard_block_rewards;
Expand Down Expand Up @@ -2472,6 +2473,14 @@ pub fn serve<T: BeaconChainTypes>(
task_spawner_filter.clone(),
);

// POST validator/duties/ptc/{epoch}
let post_validator_duties_ptc = post_validator_duties_ptc(
eth_v1.clone().clone(),
chain_filter.clone(),
not_while_syncing_filter.clone(),
task_spawner_filter.clone(),
);

// POST validator/duties/sync/{epoch}
let post_validator_duties_sync = post_validator_duties_sync(
eth_v1.clone().clone(),
Expand Down Expand Up @@ -3362,6 +3371,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(post_beacon_rewards_attestations)
.uor(post_beacon_rewards_sync_committee)
.uor(post_validator_duties_attester)
.uor(post_validator_duties_ptc)
.uor(post_validator_duties_sync)
.uor(post_validator_aggregate_and_proofs)
.uor(post_validator_contribution_and_proofs)
Expand Down
241 changes: 241 additions & 0 deletions beacon_node/http_api/src/ptc_duties.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
//! Contains the handler for the `POST validator/duties/ptc/{epoch}` endpoint.

use crate::state_id::StateId;
use beacon_chain::{BeaconChain, BeaconChainError, BeaconChainTypes};
use eth2::types::{self as api_types, PtcDuty};
use slot_clock::SlotClock;
use state_processing::state_advance::partial_state_advance;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, RelativeEpoch};

/// The struct that is returned to the requesting HTTP client.
type ApiDuties = api_types::DutiesResponse<Vec<PtcDuty>>;

/// Handles a request from the HTTP API for PTC duties.
pub fn ptc_duties<T: BeaconChainTypes>(
request_epoch: Epoch,
request_indices: &[u64],
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
let current_epoch = chain
.slot_clock
.now_or_genesis()
.map(|slot| slot.epoch(T::EthSpec::slots_per_epoch()))
.ok_or(BeaconChainError::UnableToReadSlot)
.map_err(warp_utils::reject::unhandled_error)?;

// Determine what the current epoch would be if we fast-forward our system clock by
// `MAXIMUM_GOSSIP_CLOCK_DISPARITY`.
//
// Most of the time, `tolerant_current_epoch` will be equal to `current_epoch`. However, during
// the first `MAXIMUM_GOSSIP_CLOCK_DISPARITY` duration of the epoch `tolerant_current_epoch`
// will equal `current_epoch + 1`
let tolerant_current_epoch = if chain.slot_clock.is_prior_to_genesis().unwrap_or(true) {
current_epoch
} else {
chain
.slot_clock
.now_with_future_tolerance(chain.spec.maximum_gossip_clock_disparity())
.ok_or_else(|| {
warp_utils::reject::custom_server_error("unable to read slot clock".into())
})?
.epoch(T::EthSpec::slots_per_epoch())
};

// Check if the request is within acceptable clock tolerance
let is_within_clock_tolerance = request_epoch == current_epoch
|| request_epoch == current_epoch + 1
|| request_epoch == tolerant_current_epoch + 1;

if is_within_clock_tolerance {
// Get the head state epoch
let head_epoch = chain
.canonical_head
.cached_head()
.snapshot
.beacon_state
.current_epoch();

// Check if the head state can compute duties for this epoch (current or next epoch only)
let head_can_serve_request = request_epoch == head_epoch || request_epoch == head_epoch + 1;

if head_can_serve_request {
compute_ptc_duties_from_cached_head(request_epoch, request_indices, chain)
} else {
// Within tolerance but head is lagging
compute_ptc_duties_from_state(request_epoch, request_indices, chain)
}
} else if request_epoch > current_epoch + 1 {
Err(warp_utils::reject::custom_bad_request(format!(
"request epoch {} is more than one epoch past the current epoch {}",
request_epoch, current_epoch
)))
} else {
// request_epoch < current_epoch (historic request)
compute_ptc_duties_from_state(request_epoch, request_indices, chain)
}
}

fn compute_ptc_duties_from_cached_head<T: BeaconChainTypes>(
request_epoch: Epoch,
request_indices: &[u64],
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
let (duties, dependent_root, execution_status) = chain
.validator_ptc_duties(request_indices, request_epoch)
.map_err(warp_utils::reject::unhandled_error)?;

convert_to_api_response(
duties,
dependent_root,
execution_status.is_optimistic_or_invalid(),
)
}

/// Compute PTC duties by reading a `BeaconState` from disk, building the committee cache
fn compute_ptc_duties_from_state<T: BeaconChainTypes>(
request_epoch: Epoch,
request_indices: &[u64],
chain: &BeaconChain<T>,
) -> Result<ApiDuties, warp::reject::Rejection> {
// If the head is quite old then it might still be relevant for a historical request.
//
// Avoid holding the `cached_head` longer than necessary.
let state_opt = {
let (cached_head, execution_status) = chain
.canonical_head
.head_and_execution_status()
.map_err(warp_utils::reject::unhandled_error)?;
let head = &cached_head.snapshot;

if head.beacon_state.current_epoch() <= request_epoch {
Some((
head.beacon_state_root(),
head.beacon_state.clone(),
execution_status.is_optimistic_or_invalid(),
))
} else {
None
}
};

let (mut state, execution_optimistic) =
if let Some((state_root, mut state, execution_optimistic)) = state_opt {
// If we've loaded the head state it might be from a previous epoch, ensure it's in a
// suitable epoch.
ensure_state_knows_ptc_duties_for_epoch(
&mut state,
state_root,
request_epoch,
&chain.spec,
)?;
(state, execution_optimistic)
} else {
let (state, execution_optimistic, _finalized) =
StateId::from_slot(request_epoch.start_slot(T::EthSpec::slots_per_epoch()))
.state(chain)?;
(state, execution_optimistic)
};

// Sanity-check the state lookup.
if !(state.current_epoch() == request_epoch || state.current_epoch() + 1 == request_epoch) {
return Err(warp_utils::reject::custom_server_error(format!(
"state epoch {} not suitable for request epoch {}",
state.current_epoch(),
request_epoch
)));
}

let relative_epoch =
RelativeEpoch::from_epoch(state.current_epoch(), request_epoch).map_err(|e| {
warp_utils::reject::custom_server_error(format!("invalid epoch for state: {:?}", e))
})?;

state
.build_committee_cache(relative_epoch, &chain.spec)
.map_err(BeaconChainError::from)
.map_err(warp_utils::reject::unhandled_error)?;

let dependent_root = state
.attester_shuffling_decision_root(chain.genesis_block_root, relative_epoch)
.map_err(BeaconChainError::from)
.map_err(warp_utils::reject::unhandled_error)?;

// Get pubkeys for all requested validators (invalid indices will be missing from the map)
let usize_indices = request_indices
.iter()
.map(|i| *i as usize)
.collect::<Vec<_>>();
let index_to_pubkey_map = chain
.validator_pubkey_bytes_many(&usize_indices)
.map_err(warp_utils::reject::unhandled_error)?;

// Map validator indices to duties by checking each slot in the epoch for PTC membership.
let duties: Vec<Option<PtcDuty>> = request_indices
.iter()
.map(
|&validator_index| -> Result<Option<PtcDuty>, warp::reject::Rejection> {
// Get pubkey; if validator doesn't exist, return None
let pubkey = match index_to_pubkey_map.get(&(validator_index as usize)) {
Some(pk) => *pk,
None => return Ok(None),
};

let slot_opt = state
.get_ptc_assignment(validator_index as usize, request_epoch, &chain.spec)
.map_err(warp_utils::reject::unhandled_error)?;

Ok(slot_opt.map(|slot| PtcDuty {
validator_index,
slot,
pubkey,
}))
},
)
.collect::<Result<Vec<_>, _>>()?;

convert_to_api_response(duties, dependent_root, execution_optimistic)
}

fn ensure_state_knows_ptc_duties_for_epoch<E: EthSpec>(
state: &mut BeaconState<E>,
state_root: Hash256,
target_epoch: Epoch,
spec: &ChainSpec,
) -> Result<(), warp::reject::Rejection> {
// Protect against an inconsistent slot clock.
if state.current_epoch() > target_epoch {
return Err(warp_utils::reject::custom_server_error(format!(
"state epoch {} is later than target epoch {}",
state.current_epoch(),
target_epoch
)));
} else if state.current_epoch() + 1 < target_epoch {
// Since there's a one-epoch look-ahead on PTC duties, it suffices to only advance to
// the prior epoch.
let target_slot = target_epoch
.saturating_sub(1_u64)
.start_slot(E::slots_per_epoch());

// A "partial" state advance is adequate since PTC duties don't rely on state roots.
partial_state_advance(state, Some(state_root), target_slot, spec)
.map_err(BeaconChainError::from)
.map_err(warp_utils::reject::unhandled_error)?;
}

Ok(())
}

/// Convert internal PTC duties to API response format
fn convert_to_api_response(
duties: Vec<Option<PtcDuty>>,
dependent_root: Hash256,
execution_optimistic: bool,
) -> Result<ApiDuties, warp::reject::Rejection> {
let data = duties.into_iter().flatten().collect::<Vec<_>>();

Ok(api_types::DutiesResponse {
dependent_root,
execution_optimistic: Some(execution_optimistic),
data,
})
}
Loading