ledger-tool: Get shreds from BigTable blocks #1638

Merged · 12 commits · Jun 20, 2024
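
This PR adds a shreds subcommand under ledger-tool bigtable: it fetches confirmed blocks from BigTable, rebuilds each block's entries (optionally mocking PoH entries when the real ones are unavailable), shreds the blocks, and inserts the resulting data shreds into a local Blockstore. A hypothetical invocation (binary name and slot values are placeholders; the flags come from the diff below):

agave-ledger-tool bigtable shreds --starting-slot 1000 --ending-slot 1005 --allow-mock-poh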
299 changes: 296 additions & 3 deletions ledger-tool/src/bigtable.rs
@@ -1,18 +1,21 @@
//! The `bigtable` subcommand
use {
crate::{
args::{load_genesis_arg, snapshot_args},
ledger_path::canonicalize_ledger_path,
load_and_process_ledger_or_exit, open_genesis_config_by,
output::{
encode_confirmed_block, CliBlockWithEntries, CliEntries,
EncodedConfirmedBlockWithEntries,
},
parse_process_options, LoadAndProcessLedgerOutput,
},
clap::{
value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand,
},
crossbeam_channel::unbounded,
futures::stream::FuturesUnordered,
-log::{debug, error, info},
+log::{debug, error, info, warn},
serde_json::json,
solana_clap_utils::{
input_parsers::pubkey_of,
@@ -22,11 +25,17 @@ use {
display::println_transaction, CliBlock, CliTransaction, CliTransactionConfirmation,
OutputFormat,
},
+solana_entry::entry::{create_ticks, Entry},
solana_ledger::{
-bigtable_upload::ConfirmedBlockUploadConfig, blockstore::Blockstore,
+bigtable_upload::ConfirmedBlockUploadConfig,
+blockstore::Blockstore,
+blockstore_options::AccessType,
+shred::{ProcessShredsStats, ReedSolomonCache, Shredder},
},
-solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Signature},
+solana_sdk::{
+clock::Slot, hash::Hash, pubkey::Pubkey, shred_version::compute_shred_version,
+signature::Signature, signer::keypair::keypair_from_seed,
+},
solana_storage_bigtable::CredentialType,
solana_transaction_status::{ConfirmedBlock, UiTransactionEncoding, VersionedConfirmedBlock},
std::{
@@ -164,6 +173,170 @@ async fn entries(
Ok(())
}

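/// Values extracted from the working bank and genesis config that are needed
/// to recreate tick entries and version the recreated shreds, plus the flag
/// that permits mocking PoH entries when real ones are unavailable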
struct ShredConfig {
shred_version: u16,
num_hashes_per_tick: u64,
num_ticks_per_slot: u64,
allow_mock_poh: bool,
}

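/// Fetch confirmed blocks in the inclusive range [starting_slot, ending_slot]
/// from BigTable, rebuild each block's entries, shred them, and insert the
/// resulting data shreds into the local Blockstore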
async fn shreds(
blockstore: Arc<Blockstore>,
starting_slot: Slot,
ending_slot: Slot,
shred_config: ShredConfig,
config: solana_storage_bigtable::LedgerStorageConfig,
) -> Result<(), Box<dyn std::error::Error>> {
let bigtable = solana_storage_bigtable::LedgerStorage::new_with_config(config)
.await
.map_err(|err| format!("Failed to connect to storage: {err:?}"))?;

// Make the range inclusive of both starting and ending slot
let limit = ending_slot.saturating_sub(starting_slot).saturating_add(1) as usize;
let mut slots = bigtable.get_confirmed_blocks(starting_slot, limit).await?;
slots.retain(|&slot| slot <= ending_slot);

// Create a "dummy" keypair to sign the shreds that will later be created.
//
// The validator shred ingestion path sigverifies shreds from the network
// using the known leader for any given slot. It is unlikely that a user of
// this tool will have access to these leader identity keypairs. However,
// shred sigverify occurs prior to Blockstore::insert_shreds(). Thus, the
// shreds being signed with the "dummy" keypair can still be inserted and
// later read/replayed/etc.
let keypair = keypair_from_seed(&[0; 64])?;
let ShredConfig {
shred_version,
num_hashes_per_tick,
num_ticks_per_slot,
allow_mock_poh,
} = shred_config;

for slot in slots.iter() {
let block = bigtable.get_confirmed_block(*slot).await?;
let entry_summaries = match bigtable.get_entries(*slot).await {
Ok(summaries) => Some(summaries),
Err(err) => {
let err_msg = format!("Failed to get PoH entries for {slot}: {err}");

if allow_mock_poh {
warn!("{err_msg}. Will create mock PoH entries instead.");
} else {
return Err(format!(
"{err_msg}. Try passing --allow-mock-poh to allow \
creation of shreds with mocked PoH entries"
))?;
}
None
}
};

let entries = match entry_summaries {
Some(entry_summaries) => entry_summaries
.enumerate()
.map(|(i, entry_summary)| {
let num_hashes = entry_summary.num_hashes;
let hash = entry_summary.hash;
let starting_transaction_index = entry_summary.starting_transaction_index;
let num_transactions = entry_summary.num_transactions as usize;

let Some(transactions) = block.transactions.get(
starting_transaction_index..starting_transaction_index + num_transactions,
) else {
let num_block_transactions = block.transactions.len();
return Err(format!(
"Entry summary {i} for slot {slot} with starting_transaction_index \
{starting_transaction_index} and num_transactions {num_transactions} \
is in conflict with the block, which has {num_block_transactions} \
transactions"
));
};
let transactions = transactions
.iter()
.map(|tx_with_meta| tx_with_meta.get_transaction())
.collect();

Ok(Entry {
num_hashes,
hash,
transactions,
})
})
.collect::<Result<Vec<Entry>, std::string::String>>()?,
None => {
let num_total_ticks = ((slot - block.parent_slot) * num_ticks_per_slot) as usize;
let num_total_entries = num_total_ticks + block.transactions.len();
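// e.g., with parent_slot = 10, slot = 13, and num_ticks_per_slot = 64:
// (13 - 10) * 64 = 192 total ticks, of which 2 * 64 = 128 are virtual
// ticks for the two skipped slots, plus one entry per transaction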
let mut entries = Vec::with_capacity(num_total_entries);

// Create virtual tick entries for any skipped slots
//
// These ticks are necessary so that the tick height is
// advanced to the proper value when this block is processed.
//
// Additionally, a blockhash will still be inserted into the
// recent blockhashes sysvar for skipped slots. So, these
// virtual ticks will have the proper PoH
let num_skipped_slots = slot - block.parent_slot - 1;
if num_skipped_slots > 0 {
let num_virtual_ticks = num_skipped_slots * num_ticks_per_slot;
let parent_blockhash = Hash::from_str(&block.previous_blockhash)?;
let virtual_ticks_entries =
create_ticks(num_virtual_ticks, num_hashes_per_tick, parent_blockhash);
entries.extend(virtual_ticks_entries.into_iter());
}

// Create transaction entries
//
// Keep it simple and just do one transaction per Entry
let transaction_entries = block.transactions.iter().map(|tx_with_meta| Entry {
num_hashes: 0,
hash: Hash::default(),
transactions: vec![tx_with_meta.get_transaction()],
});
entries.extend(transaction_entries.into_iter());

// Create the tick entries for this slot
//
// We do not know the intermediate hashes, so just use the default
// hash for all ticks. The exception is the final tick; the
// final tick determines the blockhash, so set it to the known
// blockhash from the bigtable block
let blockhash = Hash::from_str(&block.blockhash)?;
let tick_entries = (0..num_ticks_per_slot).map(|idx| {
let hash = if idx == num_ticks_per_slot - 1 {
blockhash
} else {
Hash::default()
};
Entry {
num_hashes: 0,
hash,
transactions: vec![],
}
});
entries.extend(tick_entries.into_iter());

entries
}
};

let shredder = Shredder::new(*slot, block.parent_slot, 0, shred_version)?;
let (data_shreds, _coding_shreds) = shredder.entries_to_shreds(
&keypair,
&entries,
true, // last_in_slot
None, // chained_merkle_root
0, // next_shred_index
0, // next_code_index
false, // merkle_variant
&ReedSolomonCache::default(),
&mut ProcessShredsStats::default(),
);
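// Coding (erasure) shreds are only needed for recovery over the network;
// the data shreds alone are sufficient to read the block back out of the
// Blockstore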
blockstore.insert_shreds(data_shreds, None, false)?;
}
Ok(())
}
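
A minimal sketch (not part of this diff) of how the round trip could be checked once shreds() has run. verify_slot_roundtrip is a hypothetical helper; it assumes the same blockstore handle and a slot from the processed range, and relies on the existing Blockstore::is_full() and Blockstore::get_slot_entries() accessors:

use {solana_entry::entry::Entry, solana_ledger::blockstore::Blockstore, solana_sdk::clock::Slot};

fn verify_slot_roundtrip(
    blockstore: &Blockstore,
    slot: Slot,
) -> Result<(), Box<dyn std::error::Error>> {
    // The shreds above were created with last_in_slot = true, so the slot
    // should be marked full after insert_shreds()
    assert!(blockstore.is_full(slot));
    // Deshred starting at index 0, matching next_shred_index = 0 above
    let entries: Vec<Entry> = blockstore.get_slot_entries(slot, 0)?;
    println!("slot {slot}: recovered {} entries", entries.len());
    Ok(())
}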

async fn blocks(
starting_slot: Slot,
limit: usize,
@@ -848,6 +1021,45 @@ impl BigTableSubCommand for App<'_, '_> {
.required(true),
),
)
.subcommand(
SubCommand::with_name("shreds")
.about(
"Get confirmed blocks from BigTable, reassemble the transactions and \
entries, shred the blocks, and then insert the shredded blocks into \
the local Blockstore",
)
.arg(load_genesis_arg())
.args(&snapshot_args())
.arg(
Arg::with_name("starting_slot")
.long("starting-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.required(true)
.help("Start shred creation at this slot (inclusive)"),
)
.arg(
Arg::with_name("ending_slot")
.long("ending-slot")
.validator(is_slot)
.value_name("SLOT")
.takes_value(true)
.required(true)
.help("Stop shred creation at this slot (inclusive)"),
)
.arg(
Arg::with_name("allow_mock_poh")
.long("allow-mock-poh")
.takes_value(false)
.help(
"For slots where PoH entries are unavailable, allow the \
generation of mock PoH entries. The mock PoH entries enable \
the shredded block(s) to be replayable if PoH verification is \
disabled.",
),
),
)
.subcommand(
SubCommand::with_name("confirm")
.about("Confirm transaction by signature")
@@ -1142,6 +1354,87 @@ pub fn bigtable_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) {
};
runtime.block_on(entries(slot, output_format, config))
}
("shreds", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let ending_slot = value_t_or_exit!(arg_matches, "ending_slot", Slot);
if starting_slot > ending_slot {
eprintln!(
"The specified --starting-slot {starting_slot} must be less than or equal to \
the specified --ending-slot {ending_slot}."
);
exit(1);
}
let allow_mock_poh = arg_matches.is_present("allow_mock_poh");

let ledger_path = canonicalize_ledger_path(ledger_path);
let process_options = parse_process_options(&ledger_path, arg_matches);
let genesis_config = open_genesis_config_by(&ledger_path, arg_matches);
let blockstore = Arc::new(crate::open_blockstore(
&ledger_path,
arg_matches,
AccessType::Primary,
));
let LoadAndProcessLedgerOutput { bank_forks, .. } = load_and_process_ledger_or_exit(
arg_matches,
&genesis_config,
blockstore.clone(),
process_options,
None,
);

let bank = bank_forks.read().unwrap().working_bank();
// If mock PoH is allowed, ensure that the requested slots are in
// the same epoch as the working bank. This will ensure the values
// extracted from the Bank are accurate for the slot range
if allow_mock_poh {
let working_bank_epoch = bank.epoch();
let epoch_schedule = bank.epoch_schedule();
let starting_epoch = epoch_schedule.get_epoch(starting_slot);
let ending_epoch = epoch_schedule.get_epoch(ending_slot);
if starting_epoch != ending_epoch {
eprintln!(
"The specified --starting-slot and --ending-slot must be in the \
same epoch. --starting-slot {starting_slot} is in epoch \
{starting_epoch}, but --ending-slot {ending_slot} is in epoch \
{ending_epoch}."
);
exit(1);
}
if starting_epoch != working_bank_epoch {
eprintln!(
"The range of slots between --starting-slot and --ending-slot is in a \
different epoch than the working bank. The specified range is in epoch \
{starting_epoch}, but the working bank is in epoch {working_bank_epoch}."
);
exit(1);
}
}

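// compute_shred_version() folds the genesis hash and any hard forks into
// the version, keeping the recreated shreds consistent with the rest of
// this ledger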
let shred_version =
compute_shred_version(&genesis_config.hash(), Some(&bank.hard_forks()));
let num_hashes_per_tick = bank.hashes_per_tick().unwrap_or(0);
let num_ticks_per_slot = bank.ticks_per_slot();
let shred_config = ShredConfig {
shred_version,
num_hashes_per_tick,
num_ticks_per_slot,
allow_mock_poh,
};

let config = solana_storage_bigtable::LedgerStorageConfig {
read_only: true,
instance_name,
app_profile_id,
..solana_storage_bigtable::LedgerStorageConfig::default()
};

runtime.block_on(shreds(
blockstore,
starting_slot,
ending_slot,
shred_config,
config,
))
}
("blocks", Some(arg_matches)) => {
let starting_slot = value_t_or_exit!(arg_matches, "starting_slot", Slot);
let limit = value_t_or_exit!(arg_matches, "limit", usize);