Skip to content

feat(starknet_integration_tests): struct to collect batched txs #4786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 19, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 101 additions & 125 deletions crates/starknet_integration_tests/src/flow_test_setup.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;

use alloy::node_bindings::AnvilInstance;
use blockifier::context::ChainInfo;
Expand All @@ -20,16 +21,9 @@ use papyrus_network::network_manager::test_utils::{
network_config_into_broadcast_channels,
};
use papyrus_network::network_manager::BroadcastTopicChannels;
use papyrus_protobuf::consensus::{
HeightAndRound,
ProposalFin,
ProposalInit,
ProposalPart,
StreamMessage,
StreamMessageBody,
};
use papyrus_protobuf::consensus::{HeightAndRound, ProposalPart, StreamMessage, StreamMessageBody};
use papyrus_storage::StorageConfig;
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block::BlockNumber;
use starknet_api::consensus_transaction::ConsensusTransaction;
use starknet_api::core::{ChainId, ContractAddress};
use starknet_api::execution_resources::GasAmount;
Expand All @@ -40,7 +34,6 @@ use starknet_api::transaction::{
TransactionHasher,
TransactionVersion,
};
use starknet_consensus::types::ValidatorId;
use starknet_consensus_manager::config::ConsensusManagerConfig;
use starknet_gateway_types::errors::GatewaySpecError;
use starknet_http_server::config::HttpServerConfig;
Expand All @@ -57,6 +50,7 @@ use starknet_sequencer_node::utils::create_node_modules;
use starknet_state_sync::config::StateSyncConfig;
use starknet_types_core::felt::Felt;
use tempfile::TempDir;
use tokio::sync::Mutex;
use tracing::{debug, instrument};
use url::Url;

Expand All @@ -69,7 +63,7 @@ use crate::utils::{
create_state_sync_configs,
send_message_to_l2,
spawn_local_success_recorder,
ExpectedContentId,
AccumulatedTransactions,
};

const SEQUENCER_0: usize = 0;
Expand Down Expand Up @@ -324,127 +318,109 @@ pub fn create_consensus_manager_configs_and_channels(
(consensus_manager_configs, broadcast_channels)
}

// Copy of listen_to_broadcasted_messages from end_to_end_flow_test.rs - will be modified to
// aggregate streamed transactions.
async fn _listen_to_broadcasted_messages(
consensus_proposals_channels: &mut BroadcastTopicChannels<
StreamMessage<ProposalPart, HeightAndRound>,
>,
expected_batched_tx_hashes: &[TransactionHash],
expected_height: BlockNumber,
expected_content_id: ExpectedContentId,
expected_proposer_id: ValidatorId,
chain_id: &ChainId,
) {
let broadcasted_messages_receiver =
&mut consensus_proposals_channels.broadcasted_messages_receiver;
// Collect messages in a map so that validations will use the ordering defined by
// `message_id`, meaning we ignore network reordering, like the StreamHandler.
let mut messages_cache = HashMap::new();
let mut last_message_id = 0;

while let Some((Ok(message), _)) = broadcasted_messages_receiver.next().await {
if message.stream_id.0 == expected_height.0 {
messages_cache.insert(message.message_id, message.clone());
} else {
panic!(
"Expected height: {}. Received message from unexpected height: {}",
expected_height.0, message.stream_id.0
);
}
if message.message == papyrus_protobuf::consensus::StreamMessageBody::Fin {
last_message_id = message.message_id;
}
// Check that we got the Fin message and all previous messages.
if last_message_id > 0 && (0..=last_message_id).all(|id| messages_cache.contains_key(&id)) {
break;
// Collects batched transactions.
struct _TxCollector {
pub consensus_proposals_channels:
BroadcastTopicChannels<StreamMessage<ProposalPart, HeightAndRound>>,
pub accumulated_txs: Arc<Mutex<AccumulatedTransactions>>,
pub chain_id: ChainId,
}

impl _TxCollector {
#[instrument(skip(self))]
pub async fn collect_streamd_txs(mut self) {
loop {
self.listen_to_broadcasted_messages().await;
}
}
// TODO(Dan, Guy): retrieve / calculate the expected proposal init and fin.
let expected_proposal_init = ProposalInit {
height: expected_height,
proposer: expected_proposer_id,
..Default::default()
};
let expected_proposal_fin = ProposalFin { proposal_commitment: BlockHash(expected_content_id) };

let StreamMessage {
stream_id: first_stream_id,
message: init_message,
message_id: incoming_message_id,
} = messages_cache.remove(&0).expect("Stream is missing its first message");

assert_eq!(
incoming_message_id, 0,
"Expected the first message in the stream to have id 0, got {}",
incoming_message_id
);
let StreamMessageBody::Content(ProposalPart::Init(incoming_proposal_init)) = init_message
else {
panic!("Expected an init message. Got: {:?}", init_message)
};
assert_eq!(
incoming_proposal_init, expected_proposal_init,
"Unexpected init message: {:?}, expected: {:?}",
incoming_proposal_init, expected_proposal_init
);

let mut received_tx_hashes = Vec::new();
let mut got_proposal_fin = false;
let mut got_channel_fin = false;
for i in 1_u64..messages_cache.len().try_into().unwrap() {
let StreamMessage { message, stream_id, message_id: _ } =
messages_cache.remove(&i).expect("Stream should have all consecutive messages");
assert_eq!(stream_id, first_stream_id, "Expected the same stream id for all messages");
match message {
StreamMessageBody::Content(ProposalPart::Init(init)) => {
panic!("Unexpected init: {:?}", init)
}
StreamMessageBody::Content(ProposalPart::Fin(proposal_fin)) => {
assert_eq!(
proposal_fin, expected_proposal_fin,
"Unexpected fin message: {:?}, expected: {:?}",
proposal_fin, expected_proposal_fin
);
got_proposal_fin = true;
async fn listen_to_broadcasted_messages(&mut self) {
let broadcasted_messages_receiver =
&mut self.consensus_proposals_channels.broadcasted_messages_receiver;
// Collect messages in a map so that validations will use the ordering defined by
// `message_id`, meaning we ignore network reordering, like the StreamHandler.
let mut messages_cache = HashMap::new();
let mut last_message_id = 0;

while let Some((Ok(message), _)) = broadcasted_messages_receiver.next().await {
messages_cache.insert(message.message_id, message.clone());

if message.message == papyrus_protobuf::consensus::StreamMessageBody::Fin {
last_message_id = message.message_id;
}
StreamMessageBody::Content(ProposalPart::BlockInfo(_)) => {
// TODO(Asmaa): Add validation for block info.
// Check that we got the Fin message and all previous messages.
if last_message_id > 0
&& (0..=last_message_id).all(|id| messages_cache.contains_key(&id))
{
break;
}
StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => {
// TODO(Arni): add calculate_transaction_hash to consensus transaction and use
// it here.
received_tx_hashes.extend(transactions.transactions.iter().map(|tx| match tx {
ConsensusTransaction::RpcTransaction(tx) => {
let starknet_api_tx =
starknet_api::transaction::Transaction::from(tx.clone());
starknet_api_tx.calculate_transaction_hash(chain_id).unwrap()
}
ConsensusTransaction::L1Handler(tx) => {
tx.calculate_transaction_hash(chain_id, &TransactionVersion::ZERO).unwrap()
}
}));
}

let StreamMessage {
stream_id: first_stream_id,
message: init_message,
message_id: incoming_message_id,
} = messages_cache.remove(&0).expect("Stream is missing its first message");

assert_eq!(
incoming_message_id, 0,
"Expected the first message in the stream to have id 0, got {}",
incoming_message_id
);
let StreamMessageBody::Content(ProposalPart::Init(incoming_proposal_init)) = init_message
else {
panic!("Expected an init message. Got: {:?}", init_message)
};

let mut received_tx_hashes = Vec::new();
let mut got_proposal_fin = false;
let mut got_channel_fin = false;
for i in 1_u64..messages_cache.len().try_into().unwrap() {
let StreamMessage { message, stream_id, message_id: _ } =
messages_cache.remove(&i).expect("Stream should have all consecutive messages");
assert_eq!(stream_id, first_stream_id, "Expected the same stream id for all messages");
match message {
StreamMessageBody::Content(ProposalPart::Init(init)) => {
panic!("Unexpected init: {:?}", init)
}
StreamMessageBody::Content(ProposalPart::Fin(..)) => {
got_proposal_fin = true;
}
StreamMessageBody::Content(ProposalPart::BlockInfo(_)) => {
// TODO(Asmaa): Add validation for block info.
}
StreamMessageBody::Content(ProposalPart::Transactions(transactions)) => {
// TODO(Arni): add calculate_transaction_hash to consensus transaction and use
// it here.
received_tx_hashes.extend(transactions.transactions.iter().map(|tx| {
match tx {
ConsensusTransaction::RpcTransaction(tx) => {
let starknet_api_tx =
starknet_api::transaction::Transaction::from(tx.clone());
starknet_api_tx.calculate_transaction_hash(&self.chain_id).unwrap()
}
ConsensusTransaction::L1Handler(tx) => tx
.calculate_transaction_hash(
&self.chain_id,
&TransactionVersion::ZERO,
)
.unwrap(),
}
}));

self.accumulated_txs.lock().await.add_transactions(
incoming_proposal_init.height,
incoming_proposal_init.round,
&received_tx_hashes,
);
}
StreamMessageBody::Fin => {
got_channel_fin = true;
}
}
StreamMessageBody::Fin => {
got_channel_fin = true;
if got_proposal_fin && got_channel_fin {
break;
}
}
if got_proposal_fin && got_channel_fin {
assert!(
received_tx_hashes.len() == expected_batched_tx_hashes.len(),
"Expected {} transactions, got {}",
expected_batched_tx_hashes.len(),
received_tx_hashes.len()
);
break;
}
}

received_tx_hashes.sort();
let mut expected_batched_tx_hashes = expected_batched_tx_hashes.to_vec();
expected_batched_tx_hashes.sort();
assert_eq!(
received_tx_hashes, expected_batched_tx_hashes,
"Unexpected transactions in block number {expected_height}"
);
}
Loading