Skip to content

Commit

Permalink
[revert] revert all changes on committed sub dag timestamp (MystenLab…
Browse files Browse the repository at this point in the history
…s#10427)

## Description 

Reverting changes as those will be incompatible via protocol upgrade:
* MystenLabs#10394
* MystenLabs#10393
* MystenLabs#10210

## Test Plan 


---
If your changes are not user-facing and not a breaking change, you can
skip the following section. Otherwise, please indicate what changed, and
then add to the Release Notes section as highlighted during the release
process.

### Type of Change (Check all that apply)

- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
akichidis authored Apr 5, 2023
1 parent 73d30ba commit bb06515
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 237 deletions.
6 changes: 1 addition & 5 deletions crates/sui-core/src/authority/epoch_start_configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use enum_dispatch::enum_dispatch;
use serde::{Deserialize, Serialize};

use sui_types::epoch_data::EpochData;
use sui_types::messages_checkpoint::{CheckpointDigest, CheckpointTimestamp};
use sui_types::messages_checkpoint::CheckpointDigest;
use sui_types::sui_system_state::epoch_start_sui_system_state::{
EpochStartSystemState, EpochStartSystemStateTrait,
};
Expand Down Expand Up @@ -42,10 +42,6 @@ impl EpochStartConfiguration {
self.epoch_digest(),
)
}

pub fn epoch_start_timestamp_ms(&self) -> CheckpointTimestamp {
self.epoch_start_state().epoch_start_timestamp_ms()
}
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
Expand Down
26 changes: 9 additions & 17 deletions crates/sui-core/src/consensus_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sui_types::messages::{
ConsensusTransaction, ConsensusTransactionKey, ConsensusTransactionKind,
VerifiedExecutableTransaction, VerifiedTransaction,
};

use sui_types::storage::ParentSync;

use tracing::{debug, error, instrument};
Expand Down Expand Up @@ -130,22 +131,8 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

/* (serialized, transaction, output_cert) */
let mut transactions = vec![];
let timestamp = consensus_output.sub_dag.commit_timestamp;
let leader_author = consensus_output.sub_dag.leader.header().author();

let epoch_start = self
.epoch_store
.epoch_start_config()
.epoch_start_timestamp_ms();
let timestamp = if timestamp < epoch_start {
error!(
"Unexpected commit timestamp {timestamp} less then epoch start time {epoch_start}, author {leader_author}, round {round}",

);
epoch_start
} else {
timestamp
};
// Narwhal enforces some invariants on the header.created_at, so we can use it as a timestamp
let timestamp = *consensus_output.sub_dag.leader.header().created_at();

let prologue_transaction = self.consensus_commit_prologue_transaction(round, timestamp);
transactions.push((
Expand All @@ -165,7 +152,12 @@ impl<T: ParentSync + Send + Sync> ExecutionState for ConsensusHandler<T> {

self.metrics
.consensus_committed_subdags
.with_label_values(&[&leader_author.to_string()])
.with_label_values(&[&consensus_output
.sub_dag
.leader
.header()
.author()
.to_string()])
.inc();
for (cert, batches) in consensus_output.batches {
let author = cert.header().author();
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/benches/process_certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub fn process_certificates(c: &mut Criterion) {
let store = make_consensus_store(&store_path);
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));

let mut state = ConsensusState::new(metrics.clone(), gc_depth);
let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);

let data_size: usize = certificates
.iter()
Expand Down
61 changes: 27 additions & 34 deletions narwhal/consensus/src/bullshark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ impl ConsensusProtocol for Bullshark {
.iter()
.rev()
{
let sub_dag_index = state.next_sub_dag_index();
let sub_dag_index = state.latest_sub_dag_index + 1;
let _span = error_span!("bullshark_process_sub_dag", sub_dag_index);

debug!("Leader {:?} has enough support", leader);
Expand All @@ -178,23 +178,23 @@ impl ConsensusProtocol for Bullshark {

total_committed_certificates += sequence.len();

// We resolve the reputation score that should be stored alongside with this sub dag.
let reputation_score = self.resolve_reputation_score(state, &sequence, sub_dag_index);
// We update the reputation score stored in state
let reputation_score = self.update_reputation_score(state, &sequence, sub_dag_index);

let sub_dag = CommittedSubDag::new(
sequence,
leader.clone(),
let sub_dag = CommittedSubDag {
certificates: sequence,
leader: leader.clone(),
sub_dag_index,
reputation_score,
state.last_committed_sub_dag.as_ref(),
);
};

// Persist the update.
self.store
.write_consensus_state(&state.last_committed, &sub_dag)?;

// Update the last sub dag
state.last_committed_sub_dag = Some(sub_dag.clone());
// Increase the global consensus index.
state.latest_sub_dag_index = sub_dag_index;
state.last_committed_leader = Some(sub_dag.leader.digest());

committed_sub_dags.push(sub_dag);
}
Expand Down Expand Up @@ -302,59 +302,52 @@ impl Bullshark {
dag.get(&round).and_then(|x| x.get(&leader))
}

/// Calculates the reputation score for the current commit by taking into account the reputation
/// scores from the previous commit (assuming that exists). It returns the updated reputation score.
fn resolve_reputation_score(
&self,
/// Updates and calculates the reputation score for the current commit managing any internal state.
/// It returns the updated reputation score.
fn update_reputation_score(
&mut self,
state: &mut ConsensusState,
committed_sequence: &[Certificate],
sub_dag_index: u64,
) -> ReputationScores {
// we reset the scores for every schedule change window, or initialise when it's the first
// sub dag we are going to create.
// we reset the scores for every schedule change window.
// TODO: when schedule change is implemented we should probably change a little bit
// this logic here.
let mut reputation_score =
if sub_dag_index == 1 || sub_dag_index % self.num_sub_dags_per_schedule == 0 {
ReputationScores::new(&self.committee)
} else {
state
.last_committed_sub_dag
.as_ref()
.expect("Committed sub dag should always exist for sub_dag_index > 1")
.reputation_score
.clone()
};
if sub_dag_index % self.num_sub_dags_per_schedule == 0 {
state.last_consensus_reputation_score = ReputationScores::new(&self.committee)
}

// update the score for the previous leader. If no previous leader exists,
// then this is the first time we commit a leader, so no score update takes place
if let Some(last_committed_sub_dag) = state.last_committed_sub_dag.as_ref() {
if let Some(previous_leader) = state.last_committed_leader {
for certificate in committed_sequence {
// TODO: we could iterate only the certificates of the round above the previous leader's round
if certificate
.header()
.parents()
.iter()
.any(|digest| *digest == last_committed_sub_dag.leader.digest())
.any(|digest| *digest == previous_leader)
{
reputation_score.add_score(certificate.origin(), 1);
state
.last_consensus_reputation_score
.add_score(certificate.origin(), 1);
}
}
}

// we check if this is the last sub dag of the current schedule. If yes then we mark the
// we check if this is the last subdag of the current schedule. If yes then we mark the
// scores as final_of_schedule = true so any downstream user can now that those are the last
// ones calculated for the current schedule.
reputation_score.final_of_schedule =
state.last_consensus_reputation_score.final_of_schedule =
(sub_dag_index + 1) % self.num_sub_dags_per_schedule == 0;

// Always ensure that all the authorities are present in the reputation scores - even
// when score is zero.
assert_eq!(
reputation_score.total_authorities() as usize,
state.last_consensus_reputation_score.total_authorities() as usize,
self.committee.size()
);

reputation_score
state.last_consensus_reputation_score.clone()
}
}
69 changes: 24 additions & 45 deletions narwhal/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use tokio::{sync::watch, task::JoinHandle};
use tracing::{debug, info, instrument};
use types::{
metered_channel, Certificate, CertificateAPI, CertificateDigest, CommittedSubDag,
CommittedSubDagShell, ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI, Round,
Timestamp,
CommittedSubDagShell, ConditionalBroadcastReceiver, ConsensusStore, HeaderAPI,
ReputationScores, Round, Timestamp,
};

#[cfg(test)]
Expand All @@ -39,8 +39,13 @@ pub struct ConsensusState {
/// Keeps the last committed round for each authority. This map is used to clean up the dag and
/// ensure we don't commit twice the same certificate.
pub last_committed: HashMap<AuthorityIdentifier, Round>,
/// The last committed sub dag. If value is None, it means that we haven't committed any sub dag yet.
pub last_committed_sub_dag: Option<CommittedSubDag>,
/// Used to populate the index in the sub-dag construction.
pub latest_sub_dag_index: SequenceNumber,
/// The last calculated consensus reputation score
pub last_consensus_reputation_score: ReputationScores,
/// The last committed sub dag leader. This allow us to calculate the reputation score of the nodes
/// that vote for the last leader.
pub last_committed_leader: Option<CertificateDigest>,
/// Keeps the latest committed certificate (and its parents) for every authority. Anything older
/// must be regularly cleaned up through the function `update`.
pub dag: Dag,
Expand All @@ -49,13 +54,15 @@ pub struct ConsensusState {
}

impl ConsensusState {
pub fn new(metrics: Arc<ConsensusMetrics>, gc_depth: Round) -> Self {
pub fn new(metrics: Arc<ConsensusMetrics>, committee: &Committee, gc_depth: Round) -> Self {
Self {
last_round: ConsensusRound::default(),
gc_depth,
last_committed: Default::default(),
latest_sub_dag_index: 0,
dag: Default::default(),
last_committed_sub_dag: None,
last_consensus_reputation_score: ReputationScores::new(committee),
last_committed_leader: None,
metrics,
}
}
Expand All @@ -67,58 +74,38 @@ impl ConsensusState {
recovered_last_committed: HashMap<AuthorityIdentifier, Round>,
latest_sub_dag: Option<CommittedSubDagShell>,
cert_store: CertificateStore,
committee: &Committee,
) -> Self {
let last_round = ConsensusRound::new_with_gc_depth(last_committed_round, gc_depth);

let dag = Self::construct_dag_from_cert_store(
&cert_store,
cert_store,
&recovered_last_committed,
last_round.gc_round,
)
.expect("error when recovering DAG from store");
metrics.recovered_consensus_state.inc();

let last_committed_sub_dag = if let Some(latest_sub_dag) = latest_sub_dag.as_ref() {
let certificates = latest_sub_dag
.certificates
.iter()
.map(|s| {
cert_store
.read(*s)
.unwrap()
.expect("Certificate should be found in database")
})
.collect();

let leader = cert_store
.read(latest_sub_dag.leader)
.unwrap()
.expect("Certificate should be found in database");

Some(CommittedSubDag {
certificates,
leader,
sub_dag_index: latest_sub_dag.sub_dag_index,
reputation_score: latest_sub_dag.reputation_score.clone(),
commit_timestamp: latest_sub_dag.commit_timestamp,
})
} else {
None
};
let (latest_sub_dag_index, last_consensus_reputation_score, last_committed_leader) =
latest_sub_dag
.map(|s| (s.sub_dag_index, s.reputation_score, Some(s.leader)))
.unwrap_or((0, ReputationScores::new(committee), None));

Self {
gc_depth,
last_round,
last_committed: recovered_last_committed,
last_committed_sub_dag,
last_consensus_reputation_score,
latest_sub_dag_index,
last_committed_leader,
dag,
metrics,
}
}

#[instrument(level = "info", skip_all)]
pub fn construct_dag_from_cert_store(
cert_store: &CertificateStore,
cert_store: CertificateStore,
last_committed: &HashMap<AuthorityIdentifier, Round>,
gc_round: Round,
) -> Result<Dag, ConsensusError> {
Expand Down Expand Up @@ -242,15 +229,6 @@ impl ConsensusState {
panic!("Parent round not found in DAG for {certificate:?}!");
}
}

/// Provides the next index to be used for the next produced sub dag
pub fn next_sub_dag_index(&self) -> SequenceNumber {
self.last_committed_sub_dag
.as_ref()
.map(|s| s.sub_dag_index)
.unwrap_or_default()
+ 1
}
}

/// Describe how to sequence input certificates.
Expand Down Expand Up @@ -371,6 +349,7 @@ where
recovered_last_committed,
latest_sub_dag,
cert_store,
&committee,
);

tx_consensus_round_updates
Expand Down
12 changes: 6 additions & 6 deletions narwhal/consensus/src/tests/bullshark_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ async fn delayed_certificates_are_rejected() {
test_utils::make_certificates_with_epoch(&committee, 1..=5, epoch, &genesis, &ids);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone(), gc_depth);
let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);
let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

// Populate DAG with the rounds up to round 5 so we trigger commits
Expand Down Expand Up @@ -618,7 +618,7 @@ async fn submitting_equivocating_certificate_should_error() {
test_utils::make_certificates_with_epoch(&committee, 1..=1, epoch, &genesis, &ids);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone(), gc_depth);
let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);
let mut bullshark =
Bullshark::new(committee.clone(), store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

Expand Down Expand Up @@ -676,7 +676,7 @@ async fn reset_consensus_scores_on_every_schedule_change() {
test_utils::make_certificates_with_epoch(&committee, 1..=50, epoch, &genesis, &ids);

let store = make_consensus_store(&test_utils::temp_dir());
let mut state = ConsensusState::new(metrics.clone(), gc_depth);
let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);
let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

// Populate DAG with the rounds up to round 50 so we trigger commits
Expand Down Expand Up @@ -853,7 +853,7 @@ async fn garbage_collection_basic() {
let store = make_consensus_store(&test_utils::temp_dir());

let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let mut state = ConsensusState::new(metrics.clone(), GC_DEPTH);
let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH);
let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

// Now start feeding the certificates per round
Expand Down Expand Up @@ -953,7 +953,7 @@ async fn slow_node() {
// Create Bullshark consensus engine
let store = make_consensus_store(&test_utils::temp_dir());
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let mut state = ConsensusState::new(metrics.clone(), GC_DEPTH);
let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH);
let mut bullshark =
Bullshark::new(committee.clone(), store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

Expand Down Expand Up @@ -1126,7 +1126,7 @@ async fn not_enough_support_and_missing_leaders_and_gc() {
// Create Bullshark consensus engine
let store = make_consensus_store(&test_utils::temp_dir());
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let mut state = ConsensusState::new(metrics.clone(), GC_DEPTH);
let mut state = ConsensusState::new(metrics.clone(), &committee, GC_DEPTH);
let mut bullshark = Bullshark::new(committee, store, metrics, NUM_SUB_DAGS_PER_SCHEDULE);

let mut committed = false;
Expand Down
2 changes: 1 addition & 1 deletion narwhal/consensus/src/tests/randomized_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ fn generate_and_run_execution_plans(

// Now create a new Bullshark engine
let metrics = Arc::new(ConsensusMetrics::new(&Registry::new()));
let mut state = ConsensusState::new(metrics.clone(), gc_depth);
let mut state = ConsensusState::new(metrics.clone(), &committee, gc_depth);
let mut bullshark = Bullshark::new(
committee.clone(),
store.clone(),
Expand Down
Loading

0 comments on commit bb06515

Please sign in to comment.