Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 27 additions & 7 deletions aptos-core/consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ pub fn process_validator_transactions_util(
if let Some(validator_txns) = validator_txns {
extra_data = validator_txns
.iter()
.map(|txn| process_single_validator_transaction_util(txn, block))
.filter_map(|txn| process_single_validator_transaction_util(txn, block))
.collect();
}

Expand All @@ -358,7 +358,7 @@ pub fn process_validator_transactions_util(
pub fn process_single_validator_transaction_util(
txn: &ValidatorTransaction,
block: &Block,
) -> ExtraDataType {
) -> Option<ExtraDataType> {
match txn {
ValidatorTransaction::DKGResult(transcript) => {
let transcript = gaptos::api_types::on_chain_config::dkg::DKGTranscript {
Expand All @@ -368,17 +368,26 @@ pub fn process_single_validator_transaction_util(
},
transcript_bytes: transcript.transcript_bytes.clone(),
};
ExtraDataType::DKG(bcs::to_bytes(&transcript).unwrap())
bcs::to_bytes(&transcript)
.map(ExtraDataType::DKG)
.map_err(|err| {
warn!(
"failed to serialize DKG validator transaction for block {}: {}",
block.id(),
err
)
})
.ok()
}
ValidatorTransaction::ObservedJWKUpdate(jwks::QuorumCertifiedUpdate {
update,
multi_sig,
}) => ExtraDataType::JWK(process_jwk_update_util(&update, block)),
}) => process_jwk_update_util(&update, block).map(ExtraDataType::JWK),
}
}

/// Public utility function to process JWK update
pub fn process_jwk_update_util(update: &ProviderJWKs, block: &Block) -> Vec<u8> {
pub fn process_jwk_update_util(update: &ProviderJWKs, block: &Block) -> Option<Vec<u8>> {
use gaptos::api_types::on_chain_config::jwks::{JWKStruct, ProviderJWKs};
// TODO(Gravity): Check the signature here instead of execution layer
info!(
Expand All @@ -396,7 +405,14 @@ pub fn process_jwk_update_util(update: &ProviderJWKs, block: &Block) -> Vec<u8>
.jwks
.iter()
.filter_map(|jwk| {
let aptos_jwk = JWK::try_from(jwk).unwrap();
let aptos_jwk = match JWK::try_from(jwk) {
Ok(jwk) => jwk,
Err(err) => {
warn!("failed to parse JWK update for block {}: {}", block.id(), err);
return None;
}
};

match aptos_jwk {
JWK::RSA(rsa_jwk) => {
error!(
Expand All @@ -417,7 +433,11 @@ pub fn process_jwk_update_util(update: &ProviderJWKs, block: &Block) -> Vec<u8>
.collect(),
};

bcs::to_bytes(&gaptos_provider_jwk).unwrap()
bcs::to_bytes(&gaptos_provider_jwk)
.map_err(|err| {
warn!("failed to serialize provider JWK update for block {}: {}", block.id(), err)
})
.ok()
}

#[async_trait::async_trait]
Expand Down
5 changes: 4 additions & 1 deletion bin/gravity_node/src/consensus/mock_consensus/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ impl MockConsensus {
// Genesis block is at epoch 0
block_number_to_block_id.insert(0u64, (0, genesis_block_id));
// Initialize with epoch 1 to match the mock consensus epoch
get_block_buffer_manager().init(0, block_number_to_block_id, 1).await;
get_block_buffer_manager()
.init(0, block_number_to_block_id, 1)
.await
.expect("failed to initialize BlockBufferManager in mock consensus");

Self {
pool: Arc::new(tokio::sync::Mutex::new(Mempool::new(pool))),
Expand Down
80 changes: 63 additions & 17 deletions bin/gravity_node/src/reth_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use greth::reth_transaction_pool::{EthPooledTransaction, ValidPoolTransaction};
use proposer_reth_map::get_reth_address_by_index;

use alloy_rpc_types_eth::TransactionRequest;
use gaptos::aptos_metrics_core::{register_int_counter_vec, IntCounterVec};
use greth::{
gravity_storage::block_view_storage::BlockViewStorage,
reth::rpc::builder::auth::AuthServerHandle,
Expand All @@ -27,6 +28,7 @@ use greth::{
reth_provider::{providers::BlockchainProvider, BlockNumReader, ChainSpecProvider},
reth_rpc_api::eth::{helpers::EthCall, RpcTypes},
};
use once_cell::sync::Lazy;
use rayon::iter::{IndexedParallelIterator, IntoParallelRefMutIterator, ParallelIterator};
use std::{
sync::{
Expand All @@ -39,6 +41,31 @@ use std::{
use tokio::sync::{broadcast, Mutex};
use tracing::*;

const FILTER_REASON_DECODE_FAILED: &str = "decode_failed";
const FILTER_REASON_RECOVER_SIGNER_FAILED: &str = "recover_signer_failed";
const FILTER_REASON_MISSING_SENDER_OR_BODY: &str = "missing_sender_or_body";
const COINBASE_FALLBACK_NO_PROPOSER_INDEX: &str = "no_proposer_index";
const COINBASE_FALLBACK_PROPOSER_NOT_IN_MAP: &str = "proposer_not_in_map";
const COINBASE_FALLBACK_INVALID_ADDRESS_LENGTH: &str = "invalid_address_length";

static GCEI_FILTERED_TX_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"gcei_filtered_tx_total",
"Number of transactions filtered while converting consensus blocks into GCEI ordered blocks",
&["reason"]
)
.unwrap()
});

static GCEI_COINBASE_FALLBACK_TOTAL: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"gcei_coinbase_fallback_total",
"Number of times GCEI block coinbase fell back to the zero address",
&["reason"]
)
.unwrap()
});

pub(crate) type RethBlockChainProvider =
BlockchainProvider<NodeTypesWithDBAdapter<EthereumNode, Arc<DatabaseEnv>>>;

Expand Down Expand Up @@ -124,13 +151,20 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
self.chain_id
}

fn txn_to_signed(bytes: &[u8], _chain_id: u64) -> Result<(Address, TransactionSigned), String> {
fn txn_to_signed(
bytes: &[u8],
_chain_id: u64,
) -> Result<(Address, TransactionSigned), (&'static str, String)> {
let mut slice = bytes;
let txn = TransactionSigned::decode_2718(&mut slice)
.map_err(|e| format!("Failed to decode transaction: {e}"))?;
let signer = txn
.recover_signer()
.map_err(|e| format!("Failed to recover signer from transaction: {e}"))?;
let txn = TransactionSigned::decode_2718(&mut slice).map_err(|e| {
(FILTER_REASON_DECODE_FAILED, format!("Failed to decode transaction: {e}"))
})?;
let signer = txn.recover_signer().map_err(|e| {
(
FILTER_REASON_RECOVER_SIGNER_FAILED,
format!("Failed to recover signer from transaction: {e}"),
)
})?;
Ok((signer, txn))
}

Expand All @@ -140,6 +174,9 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
let index = match proposer_index {
Some(idx) => idx,
None => {
GCEI_COINBASE_FALLBACK_TOTAL
.with_label_values(&[COINBASE_FALLBACK_NO_PROPOSER_INDEX])
.inc();
// Log when proposer_index is absent — this means the block
// metadata from consensus did not include a proposer.
warn!(
Expand All @@ -157,6 +194,9 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
if reth_addr_bytes.len() == 20 {
Address::from_slice(&reth_addr_bytes)
} else {
GCEI_COINBASE_FALLBACK_TOTAL
.with_label_values(&[COINBASE_FALLBACK_INVALID_ADDRESS_LENGTH])
.inc();
warn!(
"Reth address length {} is not 20 bytes for proposer index {}, using ZERO. \
Metric: coinbase_zero_address_fallback{{reason=invalid_address_length}}",
Expand All @@ -167,6 +207,9 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
}
}
None => {
GCEI_COINBASE_FALLBACK_TOTAL
.with_label_values(&[COINBASE_FALLBACK_PROPOSER_NOT_IN_MAP])
.inc();
warn!(
"Failed to get reth coinbase for proposer index {}, using ZERO. \
Metric: coinbase_zero_address_fallback{{reason=proposer_not_in_map}}",
Expand All @@ -188,6 +231,7 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {

let mut senders = vec![None; block.txns.len()];
let mut transactions = vec![None; block.txns.len()];
let mut filtered_reasons = vec![None; block.txns.len()];

{
for (idx, txn) in block.txns.iter().enumerate() {
Expand All @@ -204,19 +248,18 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
.par_iter_mut()
.enumerate()
.filter(|(idx, _)| senders[*idx].is_none())
.map(|(idx, txn)| match Self::txn_to_signed(txn.bytes().as_slice(), self.chain_id) {
Ok((sender, transaction)) => Some((idx, sender, transaction)),
Err(e) => {
.map(|(idx, txn)| (idx, Self::txn_to_signed(txn.bytes().as_slice(), self.chain_id)))
.collect::<Vec<(usize, Result<(Address, TransactionSigned), (&'static str, String)>)>>()
.into_iter()
.for_each(|(idx, result)| match result {
Ok((sender, transaction)) => {
senders[idx] = Some(sender);
transactions[idx] = Some(transaction);
}
Err((reason, e)) => {
warn!("Skipping malformed transaction at index {}: {}", idx, e);
None
filtered_reasons[idx] = Some(reason);
}
})
.collect::<Vec<Option<(usize, Address, TransactionSigned)>>>()
.into_iter()
.flatten()
.for_each(|(idx, sender, transaction)| {
senders[idx] = Some(sender);
transactions[idx] = Some(transaction);
});

// Filter out transactions that failed decoding/signer recovery
Expand All @@ -231,6 +274,9 @@ impl<EthApi: RethEthCall> RethCli<EthApi> {
valid_transactions.push(t);
}
_ => {
let reason =
filtered_reasons[idx].unwrap_or(FILTER_REASON_MISSING_SENDER_OR_BODY);
GCEI_FILTERED_TX_TOTAL.with_label_values(&[reason]).inc();
warn!("Filtering out transaction at index {} with missing sender or body", idx);
}
}
Expand Down
10 changes: 8 additions & 2 deletions crates/api/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,10 @@ pub fn init_peers_and_metadata(
PeersAndMetadata::new(&network_ids)
}

pub async fn init_block_buffer_manager(consensus_db: &Arc<ConsensusDB>, latest_block_number: u64) {
pub async fn init_block_buffer_manager(
consensus_db: &Arc<ConsensusDB>,
latest_block_number: u64,
) -> anyhow::Result<()> {
info!("init_block_buffer_manager start");
let start_block_number = latest_block_number.saturating_sub(RECENT_BLOCKS_RANGE);

Expand Down Expand Up @@ -305,5 +308,8 @@ pub async fn init_block_buffer_manager(consensus_db: &Arc<ConsensusDB>, latest_b
block_number_to_block_id
.insert(0u64, (0, BlockId::from_bytes(GENESIS_BLOCK_ID.as_slice())));
}
get_block_buffer_manager().init(latest_block_number, block_number_to_block_id, max_epoch).await;
get_block_buffer_manager()
.init(latest_block_number, block_number_to_block_id, max_epoch)
.await?;
Ok(())
}
4 changes: 3 additions & 1 deletion crates/api/src/consensus_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,9 @@ impl ConsensusEngine {
);
runtimes.push(jwk_consensus_runtime);
}
init_block_buffer_manager(&consensus_db, latest_block_number).await;
init_block_buffer_manager(&consensus_db, latest_block_number)
.await
.expect("failed to initialize BlockBufferManager");
let mut args = ConsensusAdapterArgs::new(consensus_db.clone());
let (consensus_runtime, _, _) = start_consensus(
&node_config,
Expand Down
Loading
Loading