Skip to content

Commit

Permalink
feat: dump state to s3 by multiple nodes (#9049)
Browse files Browse the repository at this point in the history
This change enables multiple nodes to dump state simultaneously. The nodes do not need to communicate with each other; they use S3 to check what has been dumped and what has not, and each node periodically picks parts from among the un-dumped parts, then generates and uploads those parts to S3.
A node will keep dumping state parts for a shard of an epoch until it finds that all parts for that shard and epoch have been dumped. Then, the node will move on to the latest locally complete epoch for that shard. If a shard took more than an epoch's time to finish dumping, we would expect to see some epochs of state dump being skipped for that shard.
  • Loading branch information
ppca authored May 18, 2023
1 parent 16ce6b6 commit d331000
Show file tree
Hide file tree
Showing 9 changed files with 290 additions and 133 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Protocol Changes

### Non-protocol Changes
* Dump state by multiple nodes, each node will refer to s3 for which parts need to be dumped. [#9049](https://github.com/near/nearcore/pull/9049)

## 1.34.0

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions chain/client/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,3 +487,13 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
)
.unwrap()
});

/// Histogram of the latency of listing objects ("ls") in external storage,
/// labeled by shard id.
pub(crate) static STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
    let buckets = exponential_buckets(0.001, 1.6, 25).unwrap();
    try_create_histogram_vec(
        "near_state_sync_dump_list_object_elapsed_sec",
        "Latency of ls in external storage",
        &["shard_id"],
        Some(buckets),
    )
    .unwrap()
});
92 changes: 87 additions & 5 deletions chain/client/src/sync/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::state_part::PartId;
use near_primitives::static_clock::StaticClock;
use near_primitives::syncing::{get_num_state_parts, ShardStateSyncResponse};
use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot};
use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot};
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
Expand All @@ -60,6 +60,9 @@ pub const MAX_STATE_PART_REQUEST: u64 = 16;
/// Number of state parts already requested stored as pending.
/// This number should not exceed MAX_STATE_PART_REQUEST times (number of peers in the network).
pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000;
/// Time limit per state dump iteration.
/// A node must check external storage for parts to dump again once time is up.
pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300;

pub enum StateSyncResult {
/// No shard has changed its status
Expand Down Expand Up @@ -179,6 +182,50 @@ impl ExternalConnection {
}
}
}

/// Returns the final file-name component of `full_path`
/// (e.g. "a/b/state_part_000001_of_000010" -> "state_part_000001_of_000010").
fn extract_file_name_from_full_path(full_path: String) -> String {
    // Tail expression instead of `return …;` — idiomatic Rust.
    Self::extract_file_name_from_path_buf(PathBuf::from(full_path))
}

/// Returns the final component of `path_buf` as an owned `String`.
///
/// Panics if the path has no file name (e.g. ends in "..") or the name is not
/// valid UTF-8 — both are treated as invariant violations for storage keys
/// produced by this module.
fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String {
    // Tail expression instead of `return …;` — idiomatic Rust.
    path_buf.file_name().unwrap().to_str().unwrap().to_string()
}

/// Lists the state-part files stored under `directory_path` in the external
/// storage (S3 bucket or local filesystem), returning bare file names with no
/// directory components.
///
/// The elapsed time is recorded in the `STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED`
/// metric, labeled by `shard_id`.
pub async fn list_state_parts(
    &self,
    shard_id: ShardId,
    directory_path: &str,
) -> Result<Vec<String>, anyhow::Error> {
    let _timer = metrics::STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED
        .with_label_values(&[&shard_id.to_string()])
        .start_timer();
    match self {
        ExternalConnection::S3 { bucket } => {
            // Trailing '/' plus the '/' delimiter restricts the listing to
            // direct children of the directory.
            let prefix = format!("{}/", directory_path);
            // `prefix` is not used again — pass it by value (no redundant clone).
            let list_results = bucket.list(prefix, Some("/".to_string())).await?;
            tracing::debug!(target: "state_sync_dump", shard_id, ?directory_path, "List state parts in s3");
            let mut file_names = vec![];
            for res in list_results {
                for obj in res.contents {
                    file_names.push(Self::extract_file_name_from_full_path(obj.key))
                }
            }
            Ok(file_names)
        }
        ExternalConnection::Filesystem { root_dir } => {
            let path = root_dir.join(directory_path);
            tracing::debug!(target: "state_sync_dump", shard_id, ?path, "List state parts in local directory");
            // Create the directory if missing so a not-yet-dumped epoch reads
            // as an empty list instead of an I/O error.
            std::fs::create_dir_all(&path)?;
            let mut file_names = vec![];
            let files = std::fs::read_dir(&path)?;
            for file in files {
                let file_name = Self::extract_file_name_from_path_buf(file?.path());
                file_names.push(file_name);
            }
            Ok(file_names)
        }
    }
}
}

/// Helper to track state sync.
Expand Down Expand Up @@ -726,6 +773,7 @@ impl StateSync {
part_id,
download,
shard_id,
epoch_id,
epoch_height,
state_num_parts,
&chain_id.clone(),
Expand Down Expand Up @@ -1172,6 +1220,7 @@ fn request_part_from_external_storage(
part_id: u64,
download: &mut DownloadStatus,
shard_id: ShardId,
epoch_id: &EpochId,
epoch_height: EpochHeight,
num_parts: u64,
chain_id: &str,
Expand All @@ -1189,7 +1238,8 @@ fn request_part_from_external_storage(
download.state_requests_count += 1;
download.last_target = None;

let location = external_storage_location(chain_id, epoch_height, shard_id, part_id, num_parts);
let location =
external_storage_location(chain_id, epoch_id, epoch_height, shard_id, part_id, num_parts);
let download_response = download.response.clone();
near_performance_metrics::actix::spawn("StateSync", {
async move {
Expand Down Expand Up @@ -1454,20 +1504,38 @@ impl<T: Clone> Iterator for SamplerLimited<T> {
/// Construct the full location (directory prefix + part file name) of a single
/// state part on the external storage.
pub fn external_storage_location(
    chain_id: &str,
    epoch_id: &EpochId,
    epoch_height: u64,
    shard_id: u64,
    part_id: u64,
    num_parts: u64,
) -> String {
    let directory = location_prefix(chain_id, epoch_height, epoch_id, shard_id);
    let file_name = part_filename(part_id, num_parts);
    format!("{}/{}", directory, file_name)
}

pub fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String {
format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id)
/// Returns the directory on the external storage that holds all state parts
/// for the given chain / epoch / shard (no trailing slash, no file name).
/// Thin wrapper around `location_prefix`.
pub fn external_storage_location_directory(
    chain_id: &str,
    epoch_id: &EpochId,
    epoch_height: u64,
    shard_id: u64,
) -> String {
    location_prefix(chain_id, epoch_height, epoch_id, shard_id)
}

/// Builds the storage key prefix for a shard's state parts:
/// `chain_id={}/epoch_height={}/epoch_id={}/shard_id={}`.
/// NOTE(review): the prefix carries both `epoch_height` and `epoch_id` —
/// presumably so that dumps from different forks at the same height cannot
/// collide; confirm with the dump/restore callers.
pub fn location_prefix(
    chain_id: &str,
    epoch_height: u64,
    epoch_id: &EpochId,
    shard_id: u64,
) -> String {
    format!(
        "chain_id={}/epoch_height={}/epoch_id={}/shard_id={}",
        chain_id, epoch_height, epoch_id.0, shard_id
    )
}

pub fn part_filename(part_id: u64, num_parts: u64) -> String {
Expand All @@ -1494,6 +1562,17 @@ pub fn get_num_parts_from_filename(s: &str) -> Option<u64> {
None
}

/// Extracts the part id from a state-part file name.
/// Returns `None` when the name does not match the expected pattern or the
/// captured digits do not parse as `u64`.
pub fn get_part_id_from_filename(s: &str) -> Option<u64> {
    let captures = match_filename(s)?;
    let part_id_str = captures.get(1)?.as_str();
    part_id_str.parse::<u64>().ok()
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -1637,5 +1716,8 @@ mod test {

assert_eq!(get_num_parts_from_filename(&filename), Some(15));
assert_eq!(get_num_parts_from_filename("123123"), None);

assert_eq!(get_part_id_from_filename(&filename), Some(5));
assert_eq!(get_part_id_from_filename("123123"), None);
}
}
2 changes: 0 additions & 2 deletions core/primitives/src/syncing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,5 @@ pub enum StateSyncDumpProgress {
/// Block hash of the first block of the epoch.
/// The dumped state corresponds to the state before applying this block.
sync_hash: CryptoHash,
/// Progress made.
parts_dumped: u64,
},
}
4 changes: 2 additions & 2 deletions integration-tests/src/tests/nearcore/sync_state_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,10 +398,10 @@ fn sync_empty_state() {
});
}

/// Runs one node for some time, which dumps state to a temp directory.
/// Start the second node which gets state parts from that temp directory.
#[test]
#[cfg_attr(not(feature = "expensive_tests"), ignore)]
/// Runs one node for some time, which dumps state to a temp directory.
/// Start the second node which gets state parts from that temp directory.
fn sync_state_dump() {
heavy_test(|| {
init_integration_logger();
Expand Down
1 change: 1 addition & 0 deletions nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
rayon.workspace = true
regex.workspace = true
rlimit.workspace = true
rust-s3.workspace = true
serde.workspace = true
Expand Down
Loading

0 comments on commit d331000

Please sign in to comment.