Skip to content

Commit

Permalink
Mempool API to Get All Addresses From Parking Lot (#14740)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariria authored Oct 9, 2024
1 parent 28530a7 commit 0cd5233
Show file tree
Hide file tree
Showing 15 changed files with 179 additions and 6 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ where
let config = OnChainJWKConsensusConfig::default_enabled();
println!("Flag `INITIALIZE_JWK_CONSENSUS` detected, will enable JWK Consensus for all default OIDC providers in genesis: {:?}", config);
Some(config)
},
}
_ => None,
};
})))
Expand Down Expand Up @@ -704,6 +704,7 @@ pub fn setup_environment_and_start_node(
indexer_runtime,
indexer_grpc_runtime,
internal_indexer_db_runtime,
mempool_client_sender,
) = services::bootstrap_api_and_indexer(
&node_config,
db_rw.clone(),
Expand All @@ -714,6 +715,9 @@ pub fn setup_environment_and_start_node(
indexer_grpc_port_tx,
)?;

// Set mempool client sender in order to enable the Mempool API in the admin service
admin_service.set_mempool_client_sender(mempool_client_sender);

// Create mempool and get the consensus to mempool sender
let (mempool_runtime, consensus_to_mempool_sender) =
services::start_mempool_runtime_and_get_consensus_sender(
Expand Down
8 changes: 6 additions & 2 deletions aptos-node/src/services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use aptos_indexer_grpc_table_info::runtime::{
bootstrap as bootstrap_indexer_table_info, bootstrap_internal_indexer_db,
};
use aptos_logger::{debug, telemetry_log_writer::TelemetryLog, LoggerFilterUpdater};
use aptos_mempool::{network::MempoolSyncMsg, MempoolClientRequest, QuorumStoreRequest};
use aptos_mempool::{
network::MempoolSyncMsg, MempoolClientRequest, MempoolClientSender, QuorumStoreRequest,
};
use aptos_mempool_notifications::MempoolNotificationListener;
use aptos_network::application::{interface::NetworkClientInterface, storage::PeersAndMetadata};
use aptos_network_benchmark::{run_netbench_service, NetbenchMessage};
Expand Down Expand Up @@ -59,6 +61,7 @@ pub fn bootstrap_api_and_indexer(
Option<Runtime>,
Option<Runtime>,
Option<Runtime>,
MempoolClientSender,
)> {
// Create the mempool client and sender
let (mempool_client_sender, mempool_client_receiver) =
Expand Down Expand Up @@ -120,7 +123,7 @@ pub fn bootstrap_api_and_indexer(
node_config,
chain_id,
db_rw.reader.clone(),
mempool_client_sender,
mempool_client_sender.clone(),
)?;

Ok((
Expand All @@ -130,6 +133,7 @@ pub fn bootstrap_api_and_indexer(
indexer_runtime,
indexer_grpc,
db_indexer_runtime,
mempool_client_sender,
))
}

Expand Down
2 changes: 2 additions & 0 deletions crates/aptos-admin-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ aptos-consensus = { workspace = true }
aptos-crypto = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-mempool = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-system-utils = { workspace = true }
aptos-types = { workspace = true }
bcs = { workspace = true }
futures-channel = { workspace = true }
http = { workspace = true }
hyper = { workspace = true }
sha256 = { workspace = true }
Expand Down
55 changes: 55 additions & 0 deletions crates/aptos-admin-service/src/server/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_logger::info;
use aptos_mempool::{MempoolClientRequest, MempoolClientSender};
use aptos_system_utils::utils::{reply_with, reply_with_status};
use aptos_types::account_address::AccountAddress;
use futures_channel::oneshot::Canceled;
use http::{Request, Response, StatusCode};
use hyper::Body;

pub async fn mempool_handle_parking_lot_address_request(
_req: Request<Body>,
mempool_client_sender: MempoolClientSender,
) -> hyper::Result<Response<Body>> {
match get_parking_lot_addresses(mempool_client_sender).await {
Ok(addresses) => {
info!("Finished getting parking lot addresses from mempool.");
match bcs::to_bytes(&addresses) {
Ok(addresses) => Ok(reply_with(vec![], addresses)),
Err(e) => {
info!("Failed to bcs serialize parking lot addresses from mempool: {e:?}");
Ok(reply_with_status(
StatusCode::INTERNAL_SERVER_ERROR,
e.to_string(),
))
},
}
},
Err(e) => {
info!("Failed to get parking lot addresses from mempool: {e:?}");
Ok(reply_with_status(
StatusCode::INTERNAL_SERVER_ERROR,
e.to_string(),
))
},
}
}

async fn get_parking_lot_addresses(
mempool_client_sender: MempoolClientSender,
) -> Result<Vec<(AccountAddress, u64)>, Canceled> {
let (sender, receiver) = futures_channel::oneshot::channel();

match mempool_client_sender
.clone()
.try_send(MempoolClientRequest::GetAddressesFromParkingLot(sender))
{
Ok(_) => receiver.await,
Err(e) => {
info!("Failed to send request for GetAddressesFromParkingLot: {e:?}");
Err(Canceled)
},
}
}
27 changes: 27 additions & 0 deletions crates/aptos-admin-service/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use aptos_consensus::{
};
use aptos_infallible::RwLock;
use aptos_logger::info;
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReaderWriter;
use aptos_system_utils::utils::reply_with_status;
#[cfg(target_os = "linux")]
Expand All @@ -26,6 +27,7 @@ use std::{
use tokio::runtime::Runtime;

mod consensus;
mod mempool;

#[derive(Default)]
pub struct Context {
Expand All @@ -34,6 +36,7 @@ pub struct Context {
aptos_db: RwLock<Option<Arc<DbReaderWriter>>>,
consensus_db: RwLock<Option<Arc<StorageWriteProxy>>>,
quorum_store_db: RwLock<Option<Arc<QuorumStoreDB>>>,
mempool_client_sender: RwLock<Option<MempoolClientSender>>,
}

impl Context {
Expand All @@ -49,6 +52,10 @@ impl Context {
*self.consensus_db.write() = Some(consensus_db);
*self.quorum_store_db.write() = Some(quorum_store_db);
}

fn set_mempool_client_sender(&self, mempool_client_sender: MempoolClientSender) {
*self.mempool_client_sender.write() = Some(mempool_client_sender);
}
}

pub struct AdminService {
Expand Down Expand Up @@ -107,6 +114,11 @@ impl AdminService {
.set_consensus_dbs(consensus_db, quorum_store_db)
}

pub fn set_mempool_client_sender(&self, mempool_client_sender: MempoolClientSender) {
self.context
.set_mempool_client_sender(mempool_client_sender)
}

fn start(&self, address: SocketAddr, enabled: bool) {
let context = self.context.clone();
self.runtime.spawn(async move {
Expand Down Expand Up @@ -210,6 +222,21 @@ impl AdminService {
))
}
},
(hyper::Method::GET, "/debug/mempool/parking-lot/addresses") => {
let mempool_client_sender = context.mempool_client_sender.read().clone();
if mempool_client_sender.is_some() {
mempool::mempool_handle_parking_lot_address_request(
req,
mempool_client_sender.unwrap(),
)
.await
} else {
Ok(reply_with_status(
StatusCode::NOT_FOUND,
"Mempool parking lot is not available.",
))
}
},
_ => Ok(reply_with_status(StatusCode::NOT_FOUND, "Not found.")),
}
}
Expand Down
7 changes: 7 additions & 0 deletions mempool/src/core_mempool/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,13 @@ impl ParkingLotIndex {
pub(crate) fn size(&self) -> usize {
self.size
}

pub(crate) fn get_addresses(&self) -> Vec<(AccountAddress, u64)> {
self.data
.iter()
.map(|(addr, txns)| (*addr, txns.len() as u64))
.collect::<Vec<(AccountAddress, u64)>>()
}
}

/// Logical pointer to `MempoolTransaction`.
Expand Down
4 changes: 4 additions & 0 deletions mempool/src/core_mempool/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,4 +597,8 @@ impl Mempool {
pub fn get_transaction_store(&self) -> &TransactionStore {
&self.transactions
}

pub fn get_parking_lot_addresses(&self) -> Vec<(AccountAddress, u64)> {
self.transactions.get_parking_lot_addresses()
}
}
4 changes: 4 additions & 0 deletions mempool/src/core_mempool/transaction_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -916,4 +916,8 @@ impl TransactionStore {
pub(crate) fn get_transactions(&self) -> &HashMap<AccountAddress, AccountTransactions> {
&self.transactions
}

pub(crate) fn get_parking_lot_addresses(&self) -> Vec<(AccountAddress, u64)> {
self.parking_lot_index.get_addresses()
}
}
5 changes: 3 additions & 2 deletions mempool/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ pub const SUCCESS_LABEL: &str = "success";
// Bounded executor task labels
pub const CLIENT_EVENT_LABEL: &str = "client_event";
pub const CLIENT_EVENT_GET_TXN_LABEL: &str = "client_event_get_txn";
pub const CLIENT_EVENT_GET_PARKING_LOT_ADDRESSES: &str = "client_event_get_parking_lot_addresses";
pub const RECONFIG_EVENT_LABEL: &str = "reconfig";
pub const PEER_BROADCAST_EVENT_LABEL: &str = "peer_broadcast";

Expand Down Expand Up @@ -284,7 +285,7 @@ pub static CORE_MEMPOOL_GC_EVENT_COUNT: Lazy<IntCounterVec> = Lazy::new(|| {
"aptos_core_mempool_gc_event_count",
"Number of times the periodic garbage-collection event occurs, regardless of how many txns were actually removed",
&["type"])
.unwrap()
.unwrap()
});

/// Counter for number of periodic client garbage-collection (=GC) events that happen with eager
Expand Down Expand Up @@ -362,7 +363,7 @@ static MEMPOOL_SERVICE_TXNS: Lazy<HistogramVec> = Lazy::new(|| {
&["type"],
TXN_COUNT_BUCKETS.clone()
)
.unwrap()
.unwrap()
});

pub fn mempool_service_transactions(label: &'static str, num: usize) {
Expand Down
5 changes: 5 additions & 0 deletions mempool/src/shared_mempool/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ async fn handle_client_request<NetworkClient, TransactionValidator>(
))
.await;
},
MempoolClientRequest::GetAddressesFromParkingLot(callback) => {
bounded_executor
.spawn(tasks::process_parking_lot_addresses(smp.clone(), callback))
.await;
},
}
}

Expand Down
20 changes: 20 additions & 0 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use aptos_metrics_core::HistogramTimer;
use aptos_network::application::interface::NetworkClientInterface;
use aptos_storage_interface::state_view::LatestDbStateCheckpointView;
use aptos_types::{
account_address::AccountAddress,
mempool_status::{MempoolStatus, MempoolStatusCode},
on_chain_config::{OnChainConfigPayload, OnChainConfigProvider, OnChainConsensusConfig},
transaction::SignedTransaction,
Expand Down Expand Up @@ -151,6 +152,25 @@ pub(crate) async fn process_client_transaction_submission<NetworkClient, Transac
}
}

/// Processes request for all addresses in parking lot
pub(crate) async fn process_parking_lot_addresses<NetworkClient, TransactionValidator>(
smp: SharedMempool<NetworkClient, TransactionValidator>,
callback: oneshot::Sender<Vec<(AccountAddress, u64)>>,
) where
NetworkClient: NetworkClientInterface<MempoolSyncMsg>,
TransactionValidator: TransactionValidation + 'static,
{
let addresses = smp.mempool.lock().get_parking_lot_addresses();

if callback.send(addresses).is_err() {
warn!(LogSchema::event_log(
LogEntry::JsonRpc,
LogEvent::CallbackFail
));
counters::CLIENT_CALLBACK_FAIL.inc();
}
}

/// Processes get transaction by hash request by client.
pub(crate) async fn process_client_get_transaction<NetworkClient, TransactionValidator>(
smp: SharedMempool<NetworkClient, TransactionValidator>,
Expand Down
8 changes: 7 additions & 1 deletion mempool/src/shared_mempool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use aptos_infallible::{Mutex, RwLock};
use aptos_network::application::interface::NetworkClientInterface;
use aptos_storage_interface::DbReader;
use aptos_types::{
mempool_status::MempoolStatus, transaction::SignedTransaction, vm_status::DiscardedVMStatus,
account_address::AccountAddress, mempool_status::MempoolStatus, transaction::SignedTransaction,
vm_status::DiscardedVMStatus,
};
use aptos_vm_validator::vm_validator::TransactionValidation;
use futures::{
Expand Down Expand Up @@ -235,8 +236,13 @@ pub type SubmissionStatus = (MempoolStatus, Option<DiscardedVMStatus>);
pub type SubmissionStatusBundle = (SignedTransaction, SubmissionStatus);

pub enum MempoolClientRequest {
/// Submits a transaction to the mempool and returns its submission status
SubmitTransaction(SignedTransaction, oneshot::Sender<Result<SubmissionStatus>>),
/// Retrieves a signed transaction from the mempool using its hash
GetTransactionByHash(HashValue, oneshot::Sender<Option<SignedTransaction>>),
/// Retrieves all addresses with transactions in the mempool's parking lot and
/// the number of transactions for each address
GetAddressesFromParkingLot(oneshot::Sender<Vec<(AccountAddress, u64)>>),
}

pub type MempoolClientSender = mpsc::Sender<MempoolClientRequest>;
Expand Down
23 changes: 23 additions & 0 deletions mempool/src/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,29 @@ async fn test_rebroadcast_retry_is_empty() {
.await;
}

#[tokio::test]
async fn test_get_all_addresses_from_parking_lot() {
let mut node = MempoolTestFrameworkBuilder::single_validator();

// Add second txn. Using TXN_2 here because sequence number needs to be higher than expected
// to be put in parking lot
node.add_txns_via_client(&TXN_2).await;

// Check to make sure transaction is in parking lot
let addresses = node.get_parking_lot_txns_via_client().await;
let address = addresses.first().unwrap().0;
let txn_size = addresses.first().unwrap().1;

// Assert that the address matches
assert_eq!(
address.to_string(),
TXN_2.first().unwrap().address.to_string()
);

// Assert there is only one transaction for that address
assert_eq!(txn_size, 1);
}

// -- Multi node tests below here --

/// Tests if the node is a VFN, and it's getting forwarded messages from a PFN. It should forward
Expand Down
9 changes: 9 additions & 0 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ impl MempoolNode {
}
}

pub async fn get_parking_lot_txns_via_client(&mut self) -> Vec<(AccountAddress, u64)> {
let (sender, receiver) = oneshot::channel();
self.mempool_client_sender
.send(MempoolClientRequest::GetAddressesFromParkingLot(sender))
.await
.unwrap();
receiver.await.unwrap()
}

/// Asynchronously waits for up to 1 second for txns to appear in mempool
pub async fn wait_on_txns_in_mempool(&self, txns: &[TestTransaction]) {
for _ in 0..10 {
Expand Down

0 comments on commit 0cd5233

Please sign in to comment.