feat: dump state to s3 by multiple nodes #9049

Merged · 27 commits · May 18, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Protocol Changes

### Non-protocol Changes
* Dump state to S3 by multiple nodes; each node consults S3 to determine which parts still need to be dumped. [#9049](https://github.com/near/nearcore/pull/9049)

## 1.34.0

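The mechanism in one hedged sketch (illustrative names, not the PR's exact code): each dumping node lists what already exists in external storage, parses the part ids out of the file names, and dumps only the missing parts, so several nodes can share a shard with no coordination channel beyond the storage itself.

```rust
use std::collections::HashSet;

// Illustrative sketch only. `get_part_id_from_filename` is the helper this PR adds in
// chain/client/src/sync/state.rs, and `dumped_file_names` would come from the new
// ExternalConnection::list_state_parts.
fn parts_still_missing(dumped_file_names: &[String], num_parts: u64) -> Vec<u64> {
    let dumped: HashSet<u64> =
        dumped_file_names.iter().filter_map(|name| get_part_id_from_filename(name)).collect();
    (0..num_parts).filter(|part_id| !dumped.contains(part_id)).collect()
}
```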
1 change: 1 addition & 0 deletions Cargo.lock

Generated file; diff not rendered.

10 changes: 10 additions & 0 deletions chain/client/src/metrics.rs
@@ -487,3 +487,13 @@ pub(crate) static STATE_SYNC_DUMP_PUT_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy:
)
.unwrap()
});

pub(crate) static STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED: Lazy<HistogramVec> = Lazy::new(|| {
try_create_histogram_vec(
"near_state_sync_dump_list_object_elapsed_sec",
"Latency of ls in external storage",
&["shard_id"],
Some(exponential_buckets(0.001, 1.6, 25).unwrap()),
)
.unwrap()
});
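For context on the timer pattern used with this histogram: `start_timer()` returns a guard that records the elapsed seconds into the labeled series when it is dropped. A minimal standalone sketch with the plain `prometheus` crate (nearcore's `try_create_histogram_vec` is a thin wrapper over the same types; the metric name is shortened here for the example):

```rust
use prometheus::{exponential_buckets, HistogramOpts, HistogramVec};

fn main() {
    // Same bucket layout as above: 25 buckets starting at 1ms, each 1.6x wider.
    let hist = HistogramVec::new(
        HistogramOpts::new("list_object_elapsed_sec", "Latency of listing objects")
            .buckets(exponential_buckets(0.001, 1.6, 25).unwrap()),
        &["shard_id"],
    )
    .unwrap();

    {
        // Recorded into the shard-0 series when `_timer` goes out of scope.
        let _timer = hist.with_label_values(&["0"]).start_timer();
        // ... listing work happens here ...
    }
}
```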
92 changes: 87 additions & 5 deletions chain/client/src/sync/state.rs
@@ -44,7 +44,7 @@ use near_primitives::shard_layout::ShardUId;
use near_primitives::state_part::PartId;
use near_primitives::static_clock::StaticClock;
use near_primitives::syncing::{get_num_state_parts, ShardStateSyncResponse};
use near_primitives::types::{AccountId, EpochHeight, ShardId, StateRoot};
use near_primitives::types::{AccountId, EpochHeight, EpochId, ShardId, StateRoot};
use rand::seq::SliceRandom;
use rand::{thread_rng, Rng};
use std::collections::HashMap;
@@ -60,6 +60,9 @@ pub const MAX_STATE_PART_REQUEST: u64 = 16;
/// Number of state parts already requested stored as pending.
/// This number should not exceed MAX_STATE_PART_REQUEST times the number of peers in the network.
pub const MAX_PENDING_PART: u64 = MAX_STATE_PART_REQUEST * 10000;
/// Time limit per state dump iteration.
/// A node must check external storage for parts to dump again once time is up.
pub const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300;
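A hedged sketch of how such a limit is typically enforced; the loop shape and names below are assumptions, not the PR's code. The dump loop notes its start time and, once the limit elapses, stops so the next iteration re-lists external storage and recomputes which parts still need dumping (picking up parts other nodes dumped in the meantime).

```rust
use std::time::{Duration, Instant};

const STATE_DUMP_ITERATION_TIME_LIMIT_SECS: u64 = 300;

// `parts_to_dump` and `dump_part` stand in for the real per-part work.
fn dump_iteration(parts_to_dump: Vec<u64>, mut dump_part: impl FnMut(u64)) {
    let started = Instant::now();
    let limit = Duration::from_secs(STATE_DUMP_ITERATION_TIME_LIMIT_SECS);
    for part_id in parts_to_dump {
        if started.elapsed() > limit {
            // Time is up: return control so the caller re-lists external storage.
            break;
        }
        dump_part(part_id);
    }
}
```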

pub enum StateSyncResult {
/// No shard has changed its status
@@ -179,6 +182,50 @@ impl ExternalConnection {
}
}
}

fn extract_file_name_from_full_path(full_path: String) -> String {
return Self::extract_file_name_from_path_buf(PathBuf::from(full_path));
}

fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String {
return path_buf.file_name().unwrap().to_str().unwrap().to_string();
}

pub async fn list_state_parts(
&self,
shard_id: ShardId,
directory_path: &str,
) -> Result<Vec<String>, anyhow::Error> {
let _timer = metrics::STATE_SYNC_DUMP_LIST_OBJECT_ELAPSED
.with_label_values(&[&shard_id.to_string()])
.start_timer();
match self {
ExternalConnection::S3 { bucket } => {
let prefix = format!("{}/", directory_path);
let list_results = bucket.list(prefix.clone(), Some("/".to_string())).await?;
tracing::debug!(target: "state_sync_dump", shard_id, ?directory_path, "List state parts in s3");
let mut file_names = vec![];
for res in list_results {
for obj in res.contents {
file_names.push(Self::extract_file_name_from_full_path(obj.key))
}
}
Ok(file_names)
}
ExternalConnection::Filesystem { root_dir } => {
let path = root_dir.join(directory_path);
tracing::debug!(target: "state_sync_dump", shard_id, ?path, "List state parts in local directory");
std::fs::create_dir_all(&path)?;
let mut file_names = vec![];
let files = std::fs::read_dir(&path)?;
for file in files {
let file_name = Self::extract_file_name_from_path_buf(file?.path());
file_names.push(file_name);
}
Ok(file_names)
}
}
}
}
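A short usage sketch for the new method. The connection value and the directory literal are illustrative; in real use the directory would come from `external_storage_location_directory` (added further down in this file), and the future would be driven by the node's async runtime.

```rust
use std::path::PathBuf;

// Assumes the ExternalConnection type from this file is in scope.
async fn print_dumped_parts() -> anyhow::Result<()> {
    let conn = ExternalConnection::Filesystem { root_dir: PathBuf::from("/tmp/state-dump") };
    // Returns bare file names (not full paths) under the shard's dump directory.
    let file_names = conn
        .list_state_parts(0, "chain_id=localnet/epoch_height=1/epoch_id=<epoch-hash>/shard_id=0")
        .await?;
    for name in &file_names {
        println!("already dumped: {name}");
    }
    Ok(())
}
```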

/// Helper to track state sync.
@@ -726,6 +773,7 @@ impl StateSync {
part_id,
download,
shard_id,
epoch_id,
epoch_height,
state_num_parts,
&chain_id.clone(),
@@ -1172,6 +1220,7 @@ fn request_part_from_external_storage(
part_id: u64,
download: &mut DownloadStatus,
shard_id: ShardId,
epoch_id: &EpochId,
epoch_height: EpochHeight,
num_parts: u64,
chain_id: &str,
@@ -1189,7 +1238,8 @@
download.state_requests_count += 1;
download.last_target = None;

let location = external_storage_location(chain_id, epoch_height, shard_id, part_id, num_parts);
let location =
external_storage_location(chain_id, epoch_id, epoch_height, shard_id, part_id, num_parts);
let download_response = download.response.clone();
near_performance_metrics::actix::spawn("StateSync", {
async move {
@@ -1454,20 +1504,38 @@ impl<T: Clone> Iterator for SamplerLimited<T> {
/// Construct a location on the external storage.
pub fn external_storage_location(
chain_id: &str,
epoch_id: &EpochId,
epoch_height: u64,
shard_id: u64,
part_id: u64,
num_parts: u64,
) -> String {
format!(
"{}/{}",
location_prefix(chain_id, epoch_height, shard_id),
location_prefix(chain_id, epoch_height, epoch_id, shard_id),
part_filename(part_id, num_parts)
)
}

pub fn location_prefix(chain_id: &str, epoch_height: u64, shard_id: u64) -> String {
format!("chain_id={}/epoch_height={}/shard_id={}", chain_id, epoch_height, shard_id)
pub fn external_storage_location_directory(
chain_id: &str,
epoch_id: &EpochId,
epoch_height: u64,
shard_id: u64,
) -> String {
location_prefix(chain_id, epoch_height, epoch_id, shard_id)
}

pub fn location_prefix(
chain_id: &str,
epoch_height: u64,
epoch_id: &EpochId,
shard_id: u64,
) -> String {
format!(
"chain_id={}/epoch_height={}/epoch_id={}/shard_id={}",
chain_id, epoch_height, epoch_id.0, shard_id
)
}
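A worked example of the resulting key, with hypothetical inputs and `epoch_id` assumed in scope; `epoch_id.0` renders as the epoch's hash, and the part file name shape is inferred from the tests below, which parse part 5 of 15 back out of a file name:

```rust
// Hypothetical call; not a snippet from the PR.
let location = external_storage_location(
    "mainnet", &epoch_id, /* epoch_height */ 123, /* shard_id */ 0, /* part_id */ 5, /* num_parts */ 15,
);
// => "chain_id=mainnet/epoch_height=123/epoch_id=<epoch-hash>/shard_id=0/state_part_000005_of_000015"
```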

pub fn part_filename(part_id: u64, num_parts: u64) -> String {
@@ -1494,6 +1562,17 @@ pub fn get_num_parts_from_filename(s: &str) -> Option<u64> {
None
}

pub fn get_part_id_from_filename(s: &str) -> Option<u64> {
if let Some(captures) = match_filename(s) {
if let Some(part_id) = captures.get(1) {
if let Ok(part_id) = part_id.as_str().parse::<u64>() {
return Some(part_id);
}
}
}
None
}
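`match_filename` itself is outside the hunk shown. A plausible reconstruction, offered purely as an assumption consistent with the two parsers above (capture group 1 being the part id and group 2 the part count):

```rust
use once_cell::sync::Lazy;
use regex::Regex;

// Assumed file name pattern, e.g. "state_part_000005_of_000015".
static PART_FILENAME_RE: Lazy<Regex> =
    Lazy::new(|| Regex::new(r"^state_part_(\d{6})_of_(\d{6})$").unwrap());

fn match_filename(s: &str) -> Option<regex::Captures<'_>> {
    PART_FILENAME_RE.captures(s)
}
```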

#[cfg(test)]
mod test {
use super::*;
@@ -1637,5 +1716,8 @@ mod test {

assert_eq!(get_num_parts_from_filename(&filename), Some(15));
assert_eq!(get_num_parts_from_filename("123123"), None);

assert_eq!(get_part_id_from_filename(&filename), Some(5));
assert_eq!(get_part_id_from_filename("123123"), None);
}
}
2 changes: 0 additions & 2 deletions core/primitives/src/syncing.rs
@@ -249,7 +249,5 @@ pub enum StateSyncDumpProgress {
/// Block hash of the first block of the epoch.
/// The dumped state corresponds to the state before applying this block.
sync_hash: CryptoHash,
/// Progress made.
parts_dumped: u64,
},
}
4 changes: 2 additions & 2 deletions integration-tests/src/tests/nearcore/sync_state_nodes.rs
@@ -398,10 +398,10 @@ fn sync_empty_state() {
});
}

/// Runs one node for some time, which dumps state to a temp directory.
/// Then starts a second node, which gets state parts from that temp directory.
#[test]
#[cfg_attr(not(feature = "expensive_tests"), ignore)]
fn sync_state_dump() {
heavy_test(|| {
init_integration_logger();
1 change: 1 addition & 0 deletions nearcore/Cargo.toml
@@ -26,6 +26,7 @@ num-rational.workspace = true
once_cell.workspace = true
rand.workspace = true
rayon.workspace = true
regex.workspace = true
rlimit.workspace = true
rust-s3.workspace = true
serde.workspace = true