Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

[wip] quick hack to enable parallel processing of entries #1515

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 2 additions & 49 deletions src/replicate_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,54 +81,8 @@ impl ReplicateStage {
.to_owned(),
);

let mut res = Ok(());
let last_entry_id = {
let mut num_entries_to_write = entries.len();
for (i, entry) in entries.iter().enumerate() {
res = bank.process_entry(&entry);
Bank::process_entry_votes(
&bank,
&entry,
*entry_height + i as u64 + 1,
&mut *leader_scheduler.write().unwrap(),
);

{
let ls_lock = leader_scheduler.read().unwrap();
if ls_lock.is_leader_rotation_height(
// i is zero indexed, so current entry height for this entry is actually the
// old entry height + i + 1
*entry_height + i as u64 + 1,
) {
let my_id = keypair.pubkey();
let scheduled_leader =
ls_lock.get_scheduled_leader(*entry_height + i as u64 + 1).expect("Scheduled leader id should never be unknown while processing entries");
cluster_info.write().unwrap().set_leader(scheduled_leader);
if my_id == scheduled_leader {
num_entries_to_write = i + 1;
break;
}
}
}

if res.is_err() {
// TODO: This will return early from the first entry that has an erroneous
// transaction, instad of processing the rest of the entries in the vector
// of received entries. This is in line with previous behavior when
// bank.process_entries() was used to process the entries, but doesn't solve the
// issue that the bank state was still changed, leading to inconsistencies with the
// leader as the leader currently should not be publishing erroneous transactions
break;
}
}

// If leader rotation happened, only write the entries up to leader rotation.
entries.truncate(num_entries_to_write);
entries
.last()
.expect("Entries cannot be empty at this point")
.id
};
bank.process_entries(&entries)?;
let last_entry_id = bank.last_id();

if let Some(sender) = vote_blob_sender {
send_validator_vote(bank, keypair, cluster_info, sender)?;
Expand All @@ -151,7 +105,6 @@ impl ReplicateStage {
}

*entry_height += entries_len;
res?;
Ok(last_entry_id)
}

Expand Down