Skip to content

Commit

Permalink
Reduce size of futures in HTTP API to prevent stack overflows (sigp#5104
Browse files Browse the repository at this point in the history
)

* Box::pin a few big futures

* Arc the blocks early in publication

* Fix more tests
  • Loading branch information
michaelsproul authored and danielrachi1 committed Feb 14, 2024
1 parent 8c066be commit 1e55af5
Show file tree
Hide file tree
Showing 17 changed files with 116 additions and 123 deletions.
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ impl<T: BeaconChainTypes> IntoGossipVerifiedBlockContents<T> for PublishBlockReq
Ok::<_, BlockContentsError<T::EthSpec>>(gossip_verified_blobs)
})
.transpose()?;
let gossip_verified_block = GossipVerifiedBlock::new(Arc::new(block), chain)?;
let gossip_verified_block = GossipVerifiedBlock::new(block, chain)?;

Ok((gossip_verified_block, gossip_verified_blobs))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -952,7 +952,7 @@ mod test {
};

let availability_pending_block = AvailabilityPendingExecutedBlock {
block: Arc::new(block),
block,
import_data,
payload_verification_outcome,
};
Expand Down
26 changes: 13 additions & 13 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,7 +872,7 @@ where
slot: Slot,
) -> (SignedBlindedBeaconBlock<E>, BeaconState<E>) {
let (unblinded, new_state) = self.make_block(state, slot).await;
(unblinded.0.into(), new_state)
((*unblinded.0).clone().into(), new_state)
}

/// Returns a newly created block, signed by the proposer for the given slot.
Expand Down Expand Up @@ -916,14 +916,14 @@ where
panic!("Should always be a full payload response");
};

let signed_block = block_response.block.sign(
let signed_block = Arc::new(block_response.block.sign(
&self.validator_keypairs[proposer_index].sk,
&block_response.state.fork(),
block_response.state.genesis_validators_root(),
&self.spec,
);
));

let block_contents: SignedBlockContentsTuple<E> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E> = match *signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
Expand Down Expand Up @@ -978,14 +978,14 @@ where
panic!("Should always be a full payload response");
};

let signed_block = block_response.block.sign(
let signed_block = Arc::new(block_response.block.sign(
&self.validator_keypairs[proposer_index].sk,
&block_response.state.fork(),
block_response.state.genesis_validators_root(),
&self.spec,
);
));

let block_contents: SignedBlockContentsTuple<E> = match &signed_block {
let block_contents: SignedBlockContentsTuple<E> = match *signed_block {
SignedBeaconBlock::Base(_)
| SignedBeaconBlock::Altair(_)
| SignedBeaconBlock::Merge(_)
Expand Down Expand Up @@ -1792,7 +1792,7 @@ where

let ((block, blobs), state) = self.make_block_return_pre_state(state, slot).await;

let (mut block, _) = block.deconstruct();
let (mut block, _) = (*block).clone().deconstruct();

block_modifier(&mut block);

Expand All @@ -1804,7 +1804,7 @@ where
state.genesis_validators_root(),
&self.spec,
);
((signed_block, blobs), state)
((Arc::new(signed_block), blobs), state)
}

pub async fn make_blob_with_modifier(
Expand All @@ -1818,7 +1818,7 @@ where

let ((block, mut blobs), state) = self.make_block_return_pre_state(state, slot).await;

let (block, _) = block.deconstruct();
let (block, _) = (*block).clone().deconstruct();

blob_modifier(&mut blobs.as_mut().unwrap().1);

Expand All @@ -1830,7 +1830,7 @@ where
state.genesis_validators_root(),
&self.spec,
);
((signed_block, blobs), state)
((Arc::new(signed_block), blobs), state)
}

pub fn make_deposits<'a>(
Expand Down Expand Up @@ -1923,7 +1923,7 @@ where
.chain
.process_block(
block_root,
RpcBlock::new(Some(block_root), Arc::new(block), sidecars).unwrap(),
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand All @@ -1949,7 +1949,7 @@ where
.chain
.process_block(
block_root,
RpcBlock::new(Some(block_root), Arc::new(block), sidecars).unwrap(),
RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down
20 changes: 3 additions & 17 deletions beacon_node/beacon_chain/tests/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,11 +1142,7 @@ async fn verify_block_for_gossip_slashing_detection() {
let ((block1, blobs1), _) = harness.make_block(state.clone(), Slot::new(1)).await;
let ((block2, _blobs2), _) = harness.make_block(state, Slot::new(1)).await;

let verified_block = harness
.chain
.verify_block_for_gossip(Arc::new(block1))
.await
.unwrap();
let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap();

if let Some((kzg_proofs, blobs)) = blobs1 {
let sidecars =
Expand Down Expand Up @@ -1174,12 +1170,7 @@ async fn verify_block_for_gossip_slashing_detection() {
)
.await
.unwrap();
unwrap_err(
harness
.chain
.verify_block_for_gossip(Arc::new(block2))
.await,
);
unwrap_err(harness.chain.verify_block_for_gossip(block2).await);

// Slasher should have been handed the two conflicting blocks and crafted a slashing.
slasher.process_queued(Epoch::new(0)).unwrap();
Expand All @@ -1198,11 +1189,7 @@ async fn verify_block_for_gossip_doppelganger_detection() {
let state = harness.get_current_state();
let ((block, _), _) = harness.make_block(state.clone(), Slot::new(1)).await;

let verified_block = harness
.chain
.verify_block_for_gossip(Arc::new(block))
.await
.unwrap();
let verified_block = harness.chain.verify_block_for_gossip(block).await.unwrap();
let attestations = verified_block.block.message().body().attestations().clone();
harness
.chain
Expand Down Expand Up @@ -1564,7 +1551,6 @@ async fn import_duplicate_block_unrealized_justification() {
let slot = harness.get_current_slot();
let (block_contents, _) = harness.make_block(state.clone(), slot).await;
let (block, _) = block_contents;
let block = Arc::new(block);
let block_root = block.canonical_root();

// Create two verified variants of the block, representing the same block being processed in
Expand Down
11 changes: 5 additions & 6 deletions beacon_node/beacon_chain/tests/payload_invalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ impl InvalidPayloadRig {
.get_full_block(&block_root)
.unwrap()
.unwrap(),
block,
*block,
"block from db must match block imported"
);
}
Expand Down Expand Up @@ -700,7 +700,7 @@ async fn invalidates_all_descendants() {
.chain
.process_block(
fork_block.canonical_root(),
Arc::new(fork_block),
fork_block,
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down Expand Up @@ -800,7 +800,7 @@ async fn switches_heads() {
.chain
.process_block(
fork_block.canonical_root(),
Arc::new(fork_block),
fork_block,
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down Expand Up @@ -1044,8 +1044,7 @@ async fn invalid_parent() {
// Produce another block atop the parent, but don't import yet.
let slot = parent_block.slot() + 1;
rig.harness.set_current_slot(slot);
let (block_tuple, state) = rig.harness.make_block(parent_state, slot).await;
let block = Arc::new(block_tuple.0);
let ((block, _), state) = rig.harness.make_block(parent_state, slot).await;
let block_root = block.canonical_root();
assert_eq!(block.parent_root(), parent_root);

Expand Down Expand Up @@ -1865,7 +1864,7 @@ impl InvalidHeadSetup {
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.unwrap();
let (fork_block_tuple, _) = rig.harness.make_block(parent_state, slot).await;
opt_fork_block = Some(Arc::new(fork_block_tuple.0));
opt_fork_block = Some(fork_block_tuple.0);
} else {
// Skipped slot.
};
Expand Down
10 changes: 5 additions & 5 deletions beacon_node/beacon_chain/tests/store_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2268,17 +2268,17 @@ async fn garbage_collect_temp_states_from_failed_block() {
let block_slot = Slot::new(2 * slots_per_epoch);
let ((signed_block, _), state) = harness.make_block(genesis_state, block_slot).await;

let (mut block, _) = signed_block.deconstruct();
let (mut block, _) = (*signed_block).clone().deconstruct();

// Mutate the block to make it invalid, and re-sign it.
*block.state_root_mut() = Hash256::repeat_byte(0xff);
let proposer_index = block.proposer_index() as usize;
let block = block.sign(
let block = Arc::new(block.sign(
&harness.validator_keypairs[proposer_index].sk,
&state.fork(),
state.genesis_validators_root(),
&harness.spec,
);
));

// The block should be rejected, but should store a bunch of temporary states.
harness.set_current_slot(block_slot);
Expand Down Expand Up @@ -2677,7 +2677,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
.chain
.process_block(
invalid_fork_block.canonical_root(),
Arc::new(invalid_fork_block.clone()),
invalid_fork_block.clone(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand All @@ -2690,7 +2690,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() {
.chain
.process_block(
valid_fork_block.canonical_root(),
Arc::new(valid_fork_block.clone()),
valid_fork_block.clone(),
NotifyExecutionLayer::Yes,
|| Ok(()),
)
Expand Down
6 changes: 4 additions & 2 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1410,7 +1410,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(network_tx_filter.clone())
.and(log_filter.clone())
.then(
move |block_contents: SignedBlindedBeaconBlock<T::EthSpec>,
move |block_contents: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand Down Expand Up @@ -1450,6 +1450,7 @@ pub fn serve<T: BeaconChainTypes>(
&block_bytes,
&chain.spec,
)
.map(Arc::new)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
Expand Down Expand Up @@ -1478,7 +1479,7 @@ pub fn serve<T: BeaconChainTypes>(
.and(log_filter.clone())
.then(
move |validation_level: api_types::BroadcastValidationQuery,
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
Expand Down Expand Up @@ -1519,6 +1520,7 @@ pub fn serve<T: BeaconChainTypes>(
&block_bytes,
&chain.spec,
)
.map(Arc::new)
.map_err(|e| {
warp_utils::reject::custom_bad_request(format!("invalid SSZ: {e:?}"))
})?;
Expand Down
25 changes: 14 additions & 11 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten

if let Some(gossip_verified_blobs) = gossip_verified_blobs {
for blob in gossip_verified_blobs {
if let Err(e) = chain.process_gossip_blob(blob).await {
if let Err(e) = Box::pin(chain.process_gossip_blob(blob)).await {
let msg = format!("Invalid blob: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
Expand All @@ -210,14 +210,13 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}
}

match chain
.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
publish_fn,
)
.await
match Box::pin(chain.process_block(
block_root,
gossip_verified_block,
NotifyExecutionLayer::Yes,
publish_fn,
))
.await
{
Ok(AvailabilityProcessingStatus::Imported(root)) => {
info!(
Expand Down Expand Up @@ -291,7 +290,7 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
/// Handles a request from the HTTP API for blinded blocks. This converts blinded blocks into full
/// blocks before publishing.
pub async fn publish_blinded_block<T: BeaconChainTypes>(
blinded_block: SignedBlindedBeaconBlock<T::EthSpec>,
blinded_block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
chain: Arc<BeaconChain<T>>,
network_tx: &UnboundedSender<NetworkMessage<T::EthSpec>>,
log: Logger,
Expand Down Expand Up @@ -319,7 +318,7 @@ pub async fn publish_blinded_block<T: BeaconChainTypes>(
pub async fn reconstruct_block<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
block: SignedBlindedBeaconBlock<T::EthSpec>,
block: Arc<SignedBlindedBeaconBlock<T::EthSpec>>,
log: Logger,
) -> Result<ProvenancedBlock<T, PublishBlockRequest<T::EthSpec>>, Rejection> {
let full_payload_opt = if let Ok(payload_header) = block.message().body().execution_payload() {
Expand Down Expand Up @@ -380,6 +379,10 @@ pub async fn reconstruct_block<T: BeaconChainTypes>(
None
};

// Perf: cloning the block here to unblind it is a little sub-optimal. This is considered an
// acceptable tradeoff to avoid passing blocks around on the stack (unarced), which blows up
// the size of futures.
let block = (*block).clone();
match full_payload_opt {
// A block without a payload is pre-merge and we consider it locally
// built.
Expand Down
Loading

0 comments on commit 1e55af5

Please sign in to comment.