Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Perf

### 2026-01-21

- Improve snap sync logging with table format and visual progress bars [#5977](https://github.com/lambdaclass/ethrex/pull/5977)
Comment on lines +5 to +7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually affect performance?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's just a visual modification (it is displayed every 30 seconds).


### 2026-01-20

- Remove `ethrex-threadpool` crate and move `ThreadPool` to `ethrex-trie` [#5925](https://github.com/lambdaclass/ethrex/pull/5925)
Expand Down
211 changes: 62 additions & 149 deletions crates/networking/p2p/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::{
io,
net::SocketAddr,
sync::{Arc, atomic::Ordering},
time::{Duration, SystemTime},
time::Duration,
};
use tokio::net::{TcpListener, TcpSocket, UdpSocket};
use tokio_util::task::TaskTracker;
Expand Down Expand Up @@ -200,177 +200,92 @@ pub async fn periodically_show_peer_stats_during_syncing(
// We just clamp it to the max to avoid showing the user confusing data
let headers_downloaded =
u64::min(METRICS.downloaded_headers.get(), headers_to_download);
let headers_remaining = headers_to_download.saturating_sub(headers_downloaded);
let headers_download_progress = if headers_to_download == 0 {
"0%".to_string()
let headers_percentage = if headers_to_download == 0 {
0.0
} else {
(headers_downloaded as f64 / headers_to_download as f64) * 100.0
};
let elapsed_secs = start.elapsed().as_secs();
let headers_per_second = if elapsed_secs == 0 {
0
} else {
format!(
"{:.2}%",
(headers_downloaded as f64 / headers_to_download as f64) * 100.0
)
headers_downloaded / elapsed_secs
};

// Account leaves metrics
let account_leaves_downloaded =
METRICS.downloaded_account_tries.load(Ordering::Relaxed);
let account_leaves_inserted = METRICS.account_tries_inserted.load(Ordering::Relaxed);
let account_leaves_inserted_percentage = if account_leaves_downloaded != 0 {
(account_leaves_inserted as f64 / account_leaves_downloaded as f64) * 100.0
} else {
0.0
};
let account_leaves_pending =
account_leaves_downloaded.saturating_sub(account_leaves_inserted);
let account_leaves_time = format_duration({
let end_time = METRICS
.account_tries_download_end_time
.lock()
.await
.unwrap_or(SystemTime::now());

METRICS
.account_tries_download_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.unwrap_or(Duration::from_secs(0))
})
.unwrap_or(Duration::from_secs(0))
});
let account_leaves_inserted_time = format_duration({
let end_time = METRICS
.account_tries_insert_end_time
.lock()
.await
.unwrap_or(SystemTime::now());

METRICS
.account_tries_insert_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.unwrap_or(Duration::from_secs(0))
})
.unwrap_or(Duration::from_secs(0))
});
let accounts_per_second =
if let Some(start_time) = *METRICS.account_tries_download_start_time.lock().await {
let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0);
if elapsed_secs == 0 {
0
} else {
account_leaves_downloaded / elapsed_secs
}
} else {
0
};

// Storage leaves metrics
let storage_leaves_downloaded = METRICS.storage_leaves_downloaded.get();
let storage_leaves_inserted = METRICS.storage_leaves_inserted.get();
let storage_leaves_inserted_percentage = if storage_leaves_downloaded != 0 {
storage_leaves_inserted as f64 / storage_leaves_downloaded as f64 * 100.0
} else {
0.0
};
// We round up because of the accounts whose slots get downloaded and then not used
let storage_leaves_inserted_percentage =
(storage_leaves_inserted_percentage * 10.0).round() / 10.0;
let storage_leaves_time = format_duration({
let end_time = METRICS
.storage_tries_download_end_time
.lock()
.await
.unwrap_or(SystemTime::now());

METRICS
.storage_tries_download_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.unwrap_or(Duration::from_secs(0))
})
.unwrap_or(Duration::from_secs(0))
});
let storage_leaves_inserted_time = format_duration({
let end_time = METRICS
.storage_tries_insert_end_time
.lock()
.await
.unwrap_or(SystemTime::now());

METRICS
.storage_tries_insert_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.unwrap_or(Duration::from_secs(0))
})
.unwrap_or(Duration::from_secs(0))
});

// Healing stuff
let heal_time = format_duration({
let end_time = METRICS
.heal_end_time
.lock()
.await
.unwrap_or(SystemTime::now());
let storage_per_second =
if let Some(start_time) = *METRICS.storage_tries_download_start_time.lock().await {
let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0);
if elapsed_secs == 0 {
0
} else {
storage_leaves_downloaded / elapsed_secs
}
} else {
0
};

METRICS
.heal_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.expect("Failed to get storage tries download time")
})
.unwrap_or(Duration::from_secs(0))
});
// Healing metrics
let healed_accounts = METRICS
.global_state_trie_leafs_healed
.load(Ordering::Relaxed);
let healed_storages = METRICS
.global_storage_tries_leafs_healed
.load(Ordering::Relaxed);
let heal_current_throttle =
if METRICS.healing_empty_try_recv.load(Ordering::Relaxed) == 0 {
"Database"
} else {
"Peers"
};

// Bytecode metrics
let bytecodes_download_time = format_duration({
let end_time = METRICS
.bytecode_download_end_time
.lock()
.await
.unwrap_or(SystemTime::now());

METRICS
.bytecode_download_start_time
.lock()
.await
.map(|start_time| {
end_time
.duration_since(start_time)
.expect("Failed to get storage tries download time")
})
.unwrap_or(Duration::from_secs(0))
});

let bytecodes_downloaded = METRICS.downloaded_bytecodes.load(Ordering::Relaxed);
let bytecodes_per_second =
if let Some(start_time) = *METRICS.bytecode_download_start_time.lock().await {
let elapsed_secs = start_time.elapsed().map(|d| d.as_secs()).unwrap_or(0);
if elapsed_secs == 0 {
0
} else {
bytecodes_downloaded / elapsed_secs
}
} else {
0
};

// Truncate hash to first 6 hex chars
let head_short = format!("{:x}", current_header_hash);
let head_short = &head_short[..6.min(head_short.len())];

info!(
r#"
P2P Snap Sync | elapsed {elapsed} | peers {peer_number} | step {current_step} | head {current_header_hash:x}
headers : {headers_downloaded}/{headers_to_download} ({headers_download_progress}), remaining {headers_remaining}
accounts: downloaded {account_leaves_downloaded} @ {account_leaves_time} | inserted {account_leaves_inserted} ({account_leaves_inserted_percentage:.1}%) in {account_leaves_inserted_time} | pending {account_leaves_pending}
storage : downloaded {storage_leaves_downloaded} @ {storage_leaves_time} | inserted {storage_leaves_inserted} ({storage_leaves_inserted_percentage:.1}%) in {storage_leaves_inserted_time}
healing : accounts {healed_accounts}, storages {healed_storages}, elapsed {heal_time}, throttle {heal_current_throttle}
bytecodes: downloaded {bytecodes_downloaded} in {bytecodes_download_time}"#
───────────────────────────────────────────────────────────────────────
SNAP SYNC │ {elapsed} │ {peer_number} peers │ {current_step} │ {head_short}
───────────────────────────────────────────────────────────────────────
1. Headers Downloaded {headers_downloaded:>13} {headers_percentage:>5.1}% {headers_per_second} headers/s
2. Accounts Downloaded {account_leaves_downloaded:>13} {accounts_per_second} accounts/s
3. Accounts Inserted {account_leaves_inserted:>13}
4. Storage Downloaded {storage_leaves_downloaded:>13} {storage_per_second} storage slots/s
5. Storage Inserted {storage_leaves_inserted:>13}
6. Healing: {healed_accounts} accounts
7. Healing: {healed_storages} storages
8. Bytecodes Downloaded {bytecodes_downloaded:>13} {bytecodes_per_second} bytecodes/s
Comment on lines +277 to +284
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a nit, but with this formatting we get:

1. Headers Downloaded      1791878       85.9%       9685 headers/s
2. Accounts Downloaded           0                   0 accounts/s
...

Could we make it look like this (aligning the unit names)?

1. Headers Downloaded      1791878       85.9%       9685 headers/s
2. Accounts Downloaded           0                   0    accounts/s
...

───────────────────────────────────────────────────────────────────────"#
);
}
tokio::time::sleep(Duration::from_secs(10)).await;
tokio::time::sleep(Duration::from_secs(30)).await;
}
}

Expand Down Expand Up @@ -404,7 +319,5 @@ fn format_duration(duration: Duration) -> String {
let hours = total_seconds / 3600;
let minutes = (total_seconds % 3600) / 60;
let seconds = total_seconds % 60;
let milliseconds = total_seconds / 1000;

format!("{hours:02}h {minutes:02}m {seconds:02}s {milliseconds:02}ms")
format!("{hours:02}:{minutes:02}:{seconds:02}")
}
30 changes: 15 additions & 15 deletions crates/networking/p2p/peer_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl PeerHandler {

let sync_head_number_retrieval_start = SystemTime::now();

info!("Retrieving sync head block number from peers");
debug!("Retrieving sync head block number from peers");

let mut retries = 1;

Expand Down Expand Up @@ -242,7 +242,7 @@ impl PeerHandler {
.elapsed()
.unwrap_or_default();

info!("Sync head block number retrieved");
debug!("Sync head block number retrieved");

*METRICS.time_to_retrieve_sync_head_block.lock().await =
Some(sync_head_number_retrieval_elapsed);
Expand Down Expand Up @@ -277,7 +277,7 @@ impl PeerHandler {

// 3) create tasks that will request a chunk of headers from a peer

info!("Starting to download block headers from peers");
debug!("Starting to download block headers from peers");

*METRICS.headers_download_start_time.lock().await = Some(SystemTime::now());

Expand Down Expand Up @@ -352,7 +352,7 @@ impl PeerHandler {

let Some((startblock, chunk_limit)) = tasks_queue_not_started.pop_front() else {
if downloaded_count >= block_count {
info!("All headers downloaded successfully");
debug!("All headers downloaded successfully");
break;
}

Expand Down Expand Up @@ -395,7 +395,7 @@ impl PeerHandler {
let elapsed = start_time.elapsed().unwrap_or_default();

debug!(
"Downloaded {} headers in {} seconds",
"Downloaded all headers ({}) in {} seconds",
ret.len(),
format_duration(elapsed)
);
Expand All @@ -413,7 +413,7 @@ impl PeerHandler {

match downloaded_headers.cmp(&unique_headers.len()) {
std::cmp::Ordering::Equal => {
info!("All downloaded headers are unique");
debug!("All downloaded headers are unique");
}
std::cmp::Ordering::Greater => {
warn!(
Expand Down Expand Up @@ -655,7 +655,7 @@ impl PeerHandler {
let (task_sender, mut task_receiver) =
tokio::sync::mpsc::channel::<(Vec<AccountRangeUnit>, H256, Option<(H256, H256)>)>(1000);

info!("Starting to download account ranges from peers");
debug!("Starting to download account ranges from peers");

*METRICS.account_tries_download_start_time.lock().await = Some(SystemTime::now());

Expand Down Expand Up @@ -755,7 +755,7 @@ impl PeerHandler {

let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
if completed_tasks >= chunk_count {
info!("All account ranges downloaded successfully");
debug!("All account ranges downloaded successfully");
break;
}
continue;
Expand All @@ -764,7 +764,7 @@ impl PeerHandler {
let tx = task_sender.clone();

if block_is_stale(pivot_header) {
info!("request_account_range became stale, updating pivot");
debug!("request_account_range became stale, updating pivot");
*pivot_header = update_pivot(
pivot_header.number,
pivot_header.timestamp,
Expand Down Expand Up @@ -979,7 +979,7 @@ impl PeerHandler {
}
let (task_sender, mut task_receiver) = tokio::sync::mpsc::channel::<TaskResult>(1000);

info!("Starting to download bytecodes from peers");
debug!("Starting to download bytecodes from peers");

METRICS
.bytecodes_to_download
Expand Down Expand Up @@ -1042,7 +1042,7 @@ impl PeerHandler {

let Some((chunk_start, chunk_end)) = tasks_queue_not_started.pop_front() else {
if completed_tasks >= chunk_count {
info!("All bytecodes downloaded successfully");
debug!("All bytecodes downloaded successfully");
break;
}
continue;
Expand Down Expand Up @@ -1117,7 +1117,7 @@ impl PeerHandler {
METRICS
.downloaded_bytecodes
.fetch_add(downloaded_count, Ordering::Relaxed);
info!(
debug!(
"Finished downloading bytecodes, total bytecodes: {}",
all_bytecode_hashes.len()
);
Expand Down Expand Up @@ -1927,7 +1927,7 @@ impl PeerHandler {
skip: 0,
reverse: false,
});
info!("get_block_header: requesting header with number {block_number}");
debug!("get_block_header: requesting header with number {block_number}");
match PeerHandler::make_request(
&mut self.peer_table,
peer_id,
Expand All @@ -1951,10 +1951,10 @@ impl PeerHandler {
}
}
Ok(_other_msgs) => {
info!("Received unexpected message from peer");
debug!("Received unexpected message from peer");
}
Err(PeerConnectionError::Timeout) => {
info!("Timeout while waiting for sync head from peer");
debug!("Timeout while waiting for sync head from peer");
}
// TODO: we need to check, this seems a scenario where the peer channel does teardown
// after we sent the backend message
Expand Down
Loading