Skip to content

Commit

Permalink
Verify archive state using a one shot in memory store (MystenLabs#12730)
Browse files Browse the repository at this point in the history
## Description 

Added a shared in memory store which only keeps last checkpoint and its
content in memory. This helps iterate over checkpoints really fast as we
don't need to use a db and can run verification without writing to
rocksdb ever.

## Test Plan 

Added unit tests
  • Loading branch information
sadhansood authored Jul 5, 2023
1 parent 94803f6 commit 04092e7
Show file tree
Hide file tree
Showing 11 changed files with 421 additions and 19 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/sui-archival/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
authors = ["Mysten Labs <build@mystenlabs.com>"]

[dependencies]
indicatif.workspace = true
integer-encoding.workspace = true
anyhow.workspace = true
serde.workspace = true
Expand All @@ -24,6 +25,7 @@ prometheus.workspace = true
zstd.workspace = true
typed-store.workspace = true
typed-store-derive.workspace = true
sui-config.workspace = true
sui-types.workspace = true
sui-storage.workspace = true
sui-protocol-config.workspace = true
Expand Down
106 changes: 106 additions & 0 deletions crates/sui-archival/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,33 @@ pub mod writer;
#[cfg(test)]
mod tests;

use crate::reader::ArchiveReader;
use anyhow::{anyhow, Result};
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use bytes::Bytes;
use fastcrypto::hash::{HashFunction, Sha3_256};
use indicatif::{ProgressBar, ProgressStyle};
use num_enum::IntoPrimitive;
use num_enum::TryFromPrimitive;
use object_store::path::Path;
use object_store::DynObjectStore;
use serde::{Deserialize, Serialize};
use std::io::{BufWriter, Cursor, Read, Seek, SeekFrom, Write};
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use sui_config::genesis::Genesis;
use sui_config::node::ArchiveReaderConfig;
use sui_storage::blob::{Blob, BlobEncoding};
use sui_storage::object_store::util::{get, put};
use sui_storage::object_store::ObjectStoreConfig;
use sui_storage::{compute_sha3_checksum, SHA3_BYTES};
use sui_types::base_types::ExecutionData;
use sui_types::messages_checkpoint::{FullCheckpointContents, VerifiedCheckpointContents};
use sui_types::storage::{ReadStore, SingleCheckpointSharedInMemoryStore, WriteStore};
use tracing::info;

/// Checkpoints and summaries are persisted as blob files. Files are committed to local store
/// by duration or file size. Committed files are synced with the remote store continuously. Files are
Expand Down Expand Up @@ -273,3 +285,97 @@ pub async fn write_manifest(manifest: Manifest, remote_store: Arc<DynObjectStore
put(&path, bytes, remote_store).await?;
Ok(())
}

pub async fn verify_archive_with_genesis_config(
genesis: &std::path::Path,
remote_store_config: ObjectStoreConfig,
concurrency: usize,
interactive: bool,
) -> Result<()> {
let genesis = Genesis::load(genesis).unwrap();
let genesis_committee = genesis.committee()?;
let mut store = SingleCheckpointSharedInMemoryStore::default();
let contents = genesis.checkpoint_contents();
let fullcheckpoint_contents = FullCheckpointContents::from_contents_and_execution_data(
contents.clone(),
std::iter::once(ExecutionData::new(
genesis.transaction().clone(),
genesis.effects().clone(),
)),
);
store.insert_genesis_state(
genesis.checkpoint(),
VerifiedCheckpointContents::new_unchecked(fullcheckpoint_contents),
genesis_committee,
);
verify_archive_with_local_store(store, remote_store_config, concurrency, interactive).await
}

pub async fn verify_archive_with_local_store<S>(
store: S,
remote_store_config: ObjectStoreConfig,
concurrency: usize,
interactive: bool,
) -> Result<()>
where
S: WriteStore + Clone + Send + 'static,
<S as ReadStore>::Error: std::error::Error,
{
let config = ArchiveReaderConfig {
remote_store_config,
download_concurrency: NonZeroUsize::new(concurrency).unwrap(),
use_for_pruning_watermark: false,
};
let archive_reader = ArchiveReader::new(config)?;
archive_reader.sync_manifest_once().await?;
let latest_checkpoint_in_archive = archive_reader.latest_available_checkpoint().await?;
info!(
"Latest available checkpoint in archive store: {}",
latest_checkpoint_in_archive
);
let latest_checkpoint = store
.get_highest_synced_checkpoint()
.map_err(|_| anyhow!("Failed to read highest synced checkpoint"))?
.sequence_number;
info!("Highest synced checkpoint in db: {latest_checkpoint}");
let txn_counter = Arc::new(AtomicU64::new(0));
let checkpoint_counter = Arc::new(AtomicU64::new(0));
let progress_bar = if interactive {
let progress_bar = ProgressBar::new(latest_checkpoint_in_archive).with_style(
ProgressStyle::with_template("[{elapsed_precise}] {wide_bar} {pos}/{len}({msg})")
.unwrap(),
);
let cloned_progress_bar = progress_bar.clone();
let cloned_counter = txn_counter.clone();
let cloned_checkpoint_counter = checkpoint_counter.clone();
let instant = Instant::now();
tokio::spawn(async move {
loop {
let total_checkpoints_loaded = cloned_checkpoint_counter.load(Ordering::Relaxed);
let total_checkpoints_per_sec =
total_checkpoints_loaded as f64 / instant.elapsed().as_secs_f64();
let total_txns_per_sec =
cloned_counter.load(Ordering::Relaxed) as f64 / instant.elapsed().as_secs_f64();
cloned_progress_bar.set_position(latest_checkpoint + total_checkpoints_loaded);
cloned_progress_bar.set_message(format!(
"checkpoints/s: {}, txns/s: {}",
total_checkpoints_per_sec, total_txns_per_sec
));
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
Some(progress_bar)
} else {
None
};
archive_reader
.read(store.clone(), 1..u64::MAX, txn_counter, checkpoint_counter)
.await?;
progress_bar.iter().for_each(|p| p.finish_and_clear());
let end = store
.get_highest_synced_checkpoint()
.map_err(|_| anyhow!("Failed to read watermark"))?
.sequence_number;
info!("Highest verified checkpoint: {}", end);
Ok(())
}
10 changes: 1 addition & 9 deletions crates/sui-archival/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use futures::{StreamExt, TryStreamExt};
use object_store::DynObjectStore;
use rand::seq::SliceRandom;
use std::future;
use std::num::NonZeroUsize;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use sui_config::node::ArchiveReaderConfig;
use sui_storage::object_store::util::get;
use sui_storage::object_store::ObjectStoreConfig;
use sui_storage::{make_iterator, verify_checkpoint};
use sui_types::messages_checkpoint::{
CertifiedCheckpointSummary, CheckpointSequenceNumber,
Expand All @@ -28,13 +27,6 @@ use tokio::sync::oneshot::Sender;
use tokio::sync::{oneshot, Mutex};
use tracing::info;

#[derive(Debug, Clone)]
pub struct ArchiveReaderConfig {
pub remote_store_config: ObjectStoreConfig,
pub download_concurrency: NonZeroUsize,
pub use_for_pruning_watermark: bool,
}

// ArchiveReaderBalancer selects archives for reading based on whether they can fulfill a checkpoint request
#[derive(Default, Debug, Clone)]
pub struct ArchiveReaderBalancer {
Expand Down
137 changes: 133 additions & 4 deletions crates/sui-archival/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,28 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::reader::{ArchiveReader, ArchiveReaderConfig};
use crate::reader::ArchiveReader;
use crate::writer::ArchiveWriter;
use crate::{read_manifest, write_manifest, Manifest};
use anyhow::{Context, Result};
use crate::{read_manifest, verify_archive_with_local_store, write_manifest, Manifest};
use anyhow::{anyhow, Context, Result};
use more_asserts as ma;
use object_store::DynObjectStore;
use prometheus::Registry;
use std::fs;
use std::fs::File;
use std::io::Write;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use sui_config::node::ArchiveReaderConfig;
use sui_storage::object_store::util::path_to_filesystem;
use sui_storage::object_store::{ObjectStoreConfig, ObjectStoreType};
use sui_storage::{FileCompression, StorageFormat};
use sui_swarm_config::test_utils::{empty_contents, CommitteeFixture};
use sui_types::messages_checkpoint::{VerifiedCheckpoint, VerifiedCheckpointContents};
use sui_types::storage::{ReadStore, SharedInMemoryStore};
use sui_types::storage::{ReadStore, SharedInMemoryStore, SingleCheckpointSharedInMemoryStore};
use tempfile::tempdir;

struct TestState {
Expand All @@ -28,6 +32,8 @@ struct TestState {
remote_path: PathBuf,
local_store: Arc<DynObjectStore>,
remote_store: Arc<DynObjectStore>,
local_store_config: ObjectStoreConfig,
remote_store_config: ObjectStoreConfig,
committee: CommitteeFixture,
}

Expand Down Expand Up @@ -98,6 +104,8 @@ async fn setup_test_state(temp_dir: PathBuf) -> anyhow::Result<TestState> {
remote_path,
local_store,
remote_store,
local_store_config,
remote_store_config,
committee,
})
}
Expand Down Expand Up @@ -241,3 +249,124 @@ async fn test_archive_reader_e2e() -> Result<(), anyhow::Error> {
kill.send(())?;
Ok(())
}

#[tokio::test]
async fn test_verify_archive_with_oneshot_store() -> Result<(), anyhow::Error> {
let test_store = SharedInMemoryStore::default();
let test_state = setup_test_state(temp_dir()).await?;
let kill = test_state.archive_writer.start(test_store.clone()).await?;
let mut latest_archived_checkpoint_seq_num = 0;
while latest_archived_checkpoint_seq_num < 10 {
insert_checkpoints_and_verify_manifest(&test_state, test_store.clone(), None).await?;
let new_latest_archived_checkpoint_seq_num = test_state
.archive_reader
.latest_available_checkpoint()
.await?;
ma::assert_ge!(
new_latest_archived_checkpoint_seq_num,
latest_archived_checkpoint_seq_num
);
latest_archived_checkpoint_seq_num = new_latest_archived_checkpoint_seq_num;
tokio::time::sleep(Duration::from_secs(1)).await;
}
ma::assert_ge!(latest_archived_checkpoint_seq_num, 10);
let genesis_checkpoint = test_store
.get_checkpoint_by_sequence_number(0)?
.context("Missing genesis checkpoint")?;
let genesis_checkpoint_content = test_store
.get_full_checkpoint_contents_by_sequence_number(0)?
.context("Missing genesis checkpoint")?;
let mut read_store = SingleCheckpointSharedInMemoryStore::default();
read_store.insert_genesis_state(
genesis_checkpoint,
VerifiedCheckpointContents::new_unchecked(genesis_checkpoint_content),
test_state.committee.committee().to_owned(),
);

// Verification should pass
assert!(verify_archive_with_local_store(
read_store,
test_state.remote_store_config.clone(),
1,
false
)
.await
.is_ok());
kill.send(())?;
Ok(())
}

#[tokio::test]
async fn test_verify_archive_with_oneshot_store_bad_data() -> Result<(), anyhow::Error> {
let test_store = SharedInMemoryStore::default();
let test_state = setup_test_state(temp_dir()).await?;
let kill = test_state.archive_writer.start(test_store.clone()).await?;
let mut latest_archived_checkpoint_seq_num = 0;
while latest_archived_checkpoint_seq_num < 10 {
insert_checkpoints_and_verify_manifest(&test_state, test_store.clone(), None).await?;
let new_latest_archived_checkpoint_seq_num = test_state
.archive_reader
.latest_available_checkpoint()
.await?;
ma::assert_ge!(
new_latest_archived_checkpoint_seq_num,
latest_archived_checkpoint_seq_num
);
latest_archived_checkpoint_seq_num = new_latest_archived_checkpoint_seq_num;
tokio::time::sleep(Duration::from_secs(1)).await;
}
ma::assert_ge!(latest_archived_checkpoint_seq_num, 10);

// Corrupt the .chk and .sum files in the archive
let dir = fs::read_dir(test_state.remote_path)?;
let mut num_files_corrupted = 0;
for file in dir {
let file = file?;
let file_metadata = file.metadata()?;
if file_metadata.is_dir() {
// epoch dir
let epoch_dir = fs::read_dir(file.path())?;
for epoch_file in epoch_dir {
let epoch_file = epoch_file?;
// epoch dir should only have checkpoint files and no dir
assert!(epoch_file.metadata()?.is_file());
if epoch_file
.file_name()
.into_string()
.map_err(|_| anyhow!("Failed to read file name"))?
.ends_with(".chk")
{
let mut f = File::options().write(true).open(epoch_file.path())?;
f.write_all("hello_world".as_bytes())?;
num_files_corrupted += 1;
}
}
}
}
ma::assert_gt!(num_files_corrupted, 0);
let genesis_checkpoint = test_store
.get_checkpoint_by_sequence_number(0)?
.context("Missing genesis checkpoint")?;
let genesis_checkpoint_content = test_store
.get_full_checkpoint_contents_by_sequence_number(0)?
.context("Missing genesis checkpoint")?;
let mut read_store = SingleCheckpointSharedInMemoryStore::default();
read_store.insert_genesis_state(
genesis_checkpoint,
VerifiedCheckpointContents::new_unchecked(genesis_checkpoint_content),
test_state.committee.committee().to_owned(),
);

// Verification should fail
assert!(verify_archive_with_local_store(
read_store,
test_state.remote_store_config.clone(),
1,
false
)
.await
.is_err());
kill.send(())?;

Ok(())
}
1 change: 0 additions & 1 deletion crates/sui-config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ shared-crypto.workspace = true
sui-keys.workspace = true
sui-protocol-config.workspace = true
sui-storage.workspace = true
sui-archival.workspace = true
sui-types.workspace = true
workspace-hack = { version = "0.1", path = "../workspace-hack" }

Expand Down
Loading

0 comments on commit 04092e7

Please sign in to comment.