Submit all certificates to consensus (MystenLabs#5772)
This change submits all certificates to consensus. Owned object certificates are submitted along with shared object certificates.

`handle_certificate`, however, only blocks on shared object certificates before executing them. Owned object certificates are processed right away, without waiting for them to be sequenced in consensus.
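
The decision described above can be sketched roughly as follows. This is a simplified model, not the code in this PR: `CertInfo`, `ConsensusStep`, and `consensus_step` are hypothetical names introduced only for illustration, and the sketch ignores the separate question of whether this particular authority is the one that forwards the certificate.

```rust
/// Hypothetical, simplified view of a certificate: only the two facts that
/// drive the decision are modeled here.
#[derive(Debug, Clone, Copy)]
struct CertInfo {
    contains_shared_object: bool,
    already_processed_by_consensus: bool,
}

/// What the handler does with respect to consensus before executing.
#[derive(Debug, PartialEq, Eq)]
enum ConsensusStep {
    /// Consensus has already delivered this certificate; nothing to submit.
    Skip,
    /// Shared object certificate: submit, then block until sequenced or timed out.
    SubmitAndWait,
    /// Owned object certificate: submit, but execute without waiting.
    SubmitWithoutWaiting,
}

fn consensus_step(cert: CertInfo) -> ConsensusStep {
    if cert.already_processed_by_consensus {
        ConsensusStep::Skip
    } else if cert.contains_shared_object {
        ConsensusStep::SubmitAndWait
    } else {
        ConsensusStep::SubmitWithoutWaiting
    }
}

fn main() {
    let owned = CertInfo {
        contains_shared_object: false,
        already_processed_by_consensus: false,
    };
    let shared = CertInfo {
        contains_shared_object: true,
        already_processed_by_consensus: false,
    };
    println!("owned:  {:?}", consensus_step(owned));
    println!("shared: {:?}", consensus_step(shared));
}
```

Before this change, the owned object case would have skipped consensus entirely; now it differs from the shared object case only in whether the handler waits.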

Note that more work is needed to make sure that all owned object certificates are sequenced before the end of the epoch; this PR adds a TODO to `ConsensusAdapter` for that.

This PR also slightly changes how inflight metrics are handled and fixes an issue there: previously, the inflight counter was not decreased if a certificate timed out.
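
A minimal, self-contained sketch of why a drop guard addresses this kind of bug. The `Adapter`, `InflightGuard`, and `submit` names below are hypothetical stand-ins, not the types in this PR; the point is only that the decrement lives in `Drop`, so it runs on every exit path, including an early return on timeout.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the adapter: it only holds the inflight counter.
struct Adapter {
    inflight: AtomicUsize,
}

/// Increments the counter on creation and decrements it when dropped.
struct InflightGuard<'a> {
    adapter: &'a Adapter,
}

impl<'a> InflightGuard<'a> {
    fn acquire(adapter: &'a Adapter) -> Self {
        // fetch_add returns the value held *before* the increment.
        adapter.inflight.fetch_add(1, Ordering::SeqCst);
        Self { adapter }
    }
}

impl Drop for InflightGuard<'_> {
    fn drop(&mut self) {
        self.adapter.inflight.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Simulates a submission that may time out and return early.
fn submit(adapter: &Adapter, times_out: bool) -> Result<(), &'static str> {
    let _guard = InflightGuard::acquire(adapter);
    if times_out {
        // Early return: the guard is still dropped, so the counter is restored.
        return Err("timed out");
    }
    Ok(())
}

fn main() {
    let adapter = Adapter {
        inflight: AtomicUsize::new(0),
    };
    let _ = submit(&adapter, true);
    let _ = submit(&adapter, false);
    println!("inflight after both calls: {}", adapter.inflight.load(Ordering::SeqCst));
}
```

With a manual `fetch_sub` placed at the end of the function, the timeout path would skip the decrement and the counter would drift upward over time.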

Some notes on how this is expected to affect existing environments:

* For private testnet, [consensus utilization](http://grafana.shared.internal.sui.io:3000/goto/5lZ2pjN4k?orgId=1) is around 20% on the slowest nodes. This is expected to double, as private testnet has a 50/50 split of shared and owned object transactions. 40% utilization is still OK and leaves some headroom for recovery.

* For dev net, [consensus utilization](http://grafana.shared.internal.sui.io:3000/goto/5GropjN4z?orgId=1) reaches 15% at peak. This peak corresponds to [~100 TPS](http://grafana.shared.internal.sui.io:3000/goto/NIc-pjNVk?orgId=1). [Owned object TPS](http://grafana.shared.internal.sui.io:3000/goto/6uefpjH4k?orgId=1) is around 10, so adding owned object certificates should not add much load either.
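
As a back-of-the-envelope check of the two estimates above, the sketch below assumes that consensus utilization scales linearly with the number of certificates sequenced, and that the ~100 TPS figure is the load consensus already carries at peak. Both are assumptions made for illustration, not measurements, and `projected_utilization` is a hypothetical helper.

```rust
/// Projects consensus utilization (in percent) after extra load is added,
/// assuming utilization grows linearly with the certificates sequenced.
fn projected_utilization(current_pct: f64, current_tps: f64, added_tps: f64) -> f64 {
    current_pct * (current_tps + added_tps) / current_tps
}

fn main() {
    // Private testnet: ~20% today with a 50/50 split, so the load roughly doubles.
    let private_testnet = projected_utilization(20.0, 1.0, 1.0);
    // Dev net: ~15% at a ~100 TPS peak, plus ~10 TPS of owned object certificates.
    let dev_net = projected_utilization(15.0, 100.0, 10.0);
    println!("private testnet: ~{private_testnet:.1}%");
    println!("dev net: ~{dev_net:.1}%");
}
```

Under these assumptions the projection is about 40% for private testnet and about 16.5% for dev net.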
andll authored Nov 2, 2022
1 parent 9b95020 commit 1aba919
Showing 5 changed files with 117 additions and 74 deletions.
21 changes: 9 additions & 12 deletions crates/sui-core/src/authority.rs
@@ -2221,13 +2221,6 @@ impl AuthorityState {
}

fn verify_narwhal_transaction(&self, certificate: &CertifiedTransaction) -> SuiResult {
// Ensure the input is a shared object certificate. Remember that Byzantine authorities
// may input anything into consensus.
fp_ensure!(
certificate.contains_shared_object(),
SuiError::NotASharedObjectTransaction
);

// Check the certificate. Remember that Byzantine authorities may input anything into
// consensus.
certificate.verify_signatures(&self.committee.load())
@@ -2326,11 +2319,15 @@ impl AuthorityState {
Some(certificate.clone()),
)])?;

self.database
.lock_shared_objects(&certificate, consensus_index)
.await?;

Ok(())
if certificate.contains_shared_object() {
self.database
.lock_shared_objects(&certificate, consensus_index)
.await
} else {
self.database
.record_owned_object_cert_from_consensus(&certificate, consensus_index)
.await
}
}
ConsensusTransactionKind::Checkpoint(fragment) => {
match &fragment.message {
36 changes: 30 additions & 6 deletions crates/sui-core/src/authority/authority_store.rs
@@ -1129,8 +1129,17 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
.contains_key(digest)?)
}

pub async fn record_owned_object_cert_from_consensus(
&self,
certificate: &VerifiedCertificate,
consensus_index: ExecutionIndicesWithHash,
) -> Result<(), SuiError> {
let write_batch = self.epoch_tables().last_consensus_index.batch();
self.finish_consensus_message_process(write_batch, certificate, consensus_index)
}

/// Lock a sequence number for the shared objects of the input transaction. Also update the
/// last consensus index.
/// last consensus index and consensus_message_processed table.
/// This function must only be called from the consensus task (i.e. from handle_consensus_transaction).
///
/// Caller is responsible to call consensus_message_processed before this method
@@ -1176,7 +1185,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
"locking shared objects");

// Make an iterator to update the last consensus index.
let index_to_write = iter::once((LAST_CONSENSUS_INDEX_ADDR, consensus_index));

// Holding _tx_lock avoids the following race:
// - we check effects_exist, returns false
@@ -1218,13 +1226,29 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {

write_batch = write_batch
.insert_batch(&self.epoch_tables().next_object_versions, schedule_to_write)?;
write_batch =
write_batch.insert_batch(&self.epoch_tables().last_consensus_index, index_to_write)?;
write_batch = write_batch.insert_batch(

self.finish_consensus_message_process(write_batch, certificate, consensus_index)
}

/// When we finish processing certificate from consensus we record this information.
/// Tables updated:
/// * consensus_message_processed - indicate that this certificate was processed by consensus
/// * last_consensus_index - records last processed position in consensus stream
/// Self::consensus_message_processed returns true after this call for given certificate
fn finish_consensus_message_process(
&self,
mut batch: DBBatch,
certificate: &VerifiedCertificate,
consensus_index: ExecutionIndicesWithHash,
) -> SuiResult {
let transaction_digest = *certificate.digest();
let index_to_write = iter::once((LAST_CONSENSUS_INDEX_ADDR, consensus_index));
batch = batch.insert_batch(&self.epoch_tables().last_consensus_index, index_to_write)?;
batch = batch.insert_batch(
&self.epoch_tables().consensus_message_processed,
iter::once((transaction_digest, true)),
)?;
write_batch.write().map_err(SuiError::from)
Ok(batch.write()?)
}

pub fn transactions_in_seq_range(
27 changes: 17 additions & 10 deletions crates/sui-core/src/authority_server.rs
@@ -386,13 +386,15 @@ impl ValidatorService {
metrics: Arc<ValidatorServiceMetrics>,
) -> Result<tonic::Response<TransactionInfoResponse>, tonic::Status> {
let certificate = request.into_inner();
let is_consensus_tx = certificate.contains_shared_object();
let shared_object_tx = certificate.contains_shared_object();

let _metrics_guard = start_timer(if is_consensus_tx {
metrics.handle_certificate_consensus_latency.clone()
let _metrics_guard = if shared_object_tx {
metrics.handle_certificate_consensus_latency.start_timer()
} else {
metrics.handle_certificate_non_consensus_latency.clone()
});
metrics
.handle_certificate_non_consensus_latency
.start_timer()
};

// 1) Check if cert already executed
let tx_digest = *certificate.digest();
@@ -411,16 +413,21 @@ impl ValidatorService {
return Err(tonic::Status::from(SuiError::ValidatorHaltedAtEpochEnd));
}

// 4) If it's a shared object transaction and requires consensus, we need to do so.
// This will wait until either timeout or we have heard back from consensus.
if is_consensus_tx && !state.consensus_message_processed(&certificate)? {
// 4) All certificates are sent to consensus (at least by some authorities)
// For shared objects this will wait until either timeout or we have heard back from consensus.
// For owned objects this will return without waiting for certificate to be sequenced
if !state.consensus_message_processed(&certificate)? {
// Note that num_inflight_transactions() only include user submitted transactions, and only user txns can be dropped here.
// This backpressure should not affect system transactions, e.g. for checkpointing.
if consensus_adapter.num_inflight_transactions() > MAX_PENDING_CONSENSUS_TRANSACTIONS {
return Err(tonic::Status::resource_exhausted("Reached {MAX_PENDING_CONSENSUS_TRANSACTIONS} concurrent consensus transactions",
));
}
let _metrics_guard = start_timer(metrics.consensus_latency.clone());
let _metrics_guard = if shared_object_tx {
Some(metrics.consensus_latency.start_timer())
} else {
None
};
consensus_adapter.submit(&state.name, &certificate).await?;
}

@@ -446,7 +453,7 @@ impl ValidatorService {
// if this validator hasn't executed some of the causal dependencies.
// And that's ok because there must exist 2f+1 that has. So we can
// afford this validator returning error.
err @ Err(SuiError::TransactionInputObjectsErrors { .. }) if is_consensus_tx => {
err @ Err(SuiError::TransactionInputObjectsErrors { .. }) if shared_object_tx => {
if retry_delay_ms >= 12800 {
return Err(tonic::Status::from(err.unwrap_err()));
}
88 changes: 59 additions & 29 deletions crates/sui-core/src/consensus_adapter.rs
@@ -272,19 +272,28 @@ impl ConsensusAdapter {
.expect("Serializing consensus transaction cannot fail");
let bytes = Bytes::from(serialized.clone());

// Notify the consensus listener that we are expecting to process this certificate.
let (waiter, signals) = ConsensusWaiter::new();

let consensus_input = ConsensusListenerMessage::New(serialized, signals);
self.tx_consensus_listener
.send(consensus_input)
.await
.expect("Failed to notify consensus listener");
let waiter = if certificate.contains_shared_object() {
// Notify the consensus listener that we are expecting to process this certificate.
let (waiter, signals) = ConsensusWaiter::new();

let consensus_input = ConsensusListenerMessage::New(serialized, signals);
self.tx_consensus_listener
.send(consensus_input)
.await
.expect("Failed to notify consensus listener");
Some(waiter)
} else {
None
};
// Check if this authority submits the transaction to consensus.
let now = Instant::now();
let _timer = self
.opt_metrics
.as_ref()
.map(|m| m.sequencing_certificate_latency.start_timer());
let should_submit = Self::should_submit(certificate);
if should_submit {
let _inflight_guard = if should_submit {
// todo - we need stronger guarantees for checkpoints here (issue #5763)
// todo - for owned objects this can also be done async
self.consensus_client
.clone()
.submit_transaction(TransactionProto { transaction: bytes })
@@ -293,14 +302,16 @@ impl ConsensusAdapter {
.tap_err(|r| {
error!("Submit transaction failed with: {:?}", r);
})?;
let inflight = self
.num_inflight_transactions
.fetch_add(1, Ordering::SeqCst);
self.opt_metrics.as_ref().map(|metrics| {
metrics.sequencing_certificate_attempt.inc();
metrics.sequencing_certificate_inflight.set(inflight as i64);
});
}
Some(InflightDropGuard::acquire(self))
} else {
None
};

let waiter = if let Some(waiter) = waiter {
waiter
} else {
return Ok(());
};

// TODO: make consensus guarantee delivery after submit_transaction() returns, and avoid the timeout below.
let result = match timeout(self.timeout, waiter.wait_for_result()).await {
@@ -332,19 +343,38 @@ impl ConsensusAdapter {
}
};

if should_submit {
let inflight = self
.num_inflight_transactions
.fetch_sub(1, Ordering::SeqCst);
let elapsed_secs = now.elapsed().as_secs_f64();
// Store the latest latency
self.opt_metrics.as_ref().map(|metrics| {
metrics.sequencing_certificate_inflight.set(inflight as i64);
metrics.sequencing_certificate_latency.observe(elapsed_secs);
});
result
}
}

/// Tracks number of inflight consensus requests and relevant metrics
struct InflightDropGuard<'a> {
adapter: &'a ConsensusAdapter,
}

impl<'a> InflightDropGuard<'a> {
pub fn acquire(adapter: &'a ConsensusAdapter) -> Self {
let inflight = adapter
.num_inflight_transactions
.fetch_add(1, Ordering::SeqCst);
if let Some(metrics) = adapter.opt_metrics.as_ref() {
metrics.sequencing_certificate_attempt.inc();
metrics.sequencing_certificate_inflight.set(inflight as i64);
}
Self { adapter }
}
}

result
impl<'a> Drop for InflightDropGuard<'a> {
fn drop(&mut self) {
let inflight = self
.adapter
.num_inflight_transactions
.fetch_sub(1, Ordering::SeqCst);
// Store the latest latency
if let Some(metrics) = self.adapter.opt_metrics.as_ref() {
metrics.sequencing_certificate_inflight.set(inflight as i64);
}
}
}

19 changes: 2 additions & 17 deletions crates/sui/tests/shared_objects_tests.rs
@@ -292,23 +292,8 @@ async fn shared_object_sync() {
.unwrap();
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

// Let's submit the transaction to the out-of-date authorities.
// Right now grpc doesn't send back the error message in a recoverable way.
// Ideally we expect Err(SuiError::SharedObjectLockingFailure(_)).
let _err = submit_shared_object_transaction(
increment_counter_transaction.clone(),
&configs.validator_set()[1..],
)
.await
.unwrap_err();

// Now send the missing certificates to the outdated authorities. We also re-send
// the transaction to the first authority who should simply ignore it.
let effects =
submit_single_owner_transaction(create_counter_transaction, configs.validator_set()).await;
assert!(matches!(effects.status, ExecutionStatus::Success { .. }));

// Now we can try again with the shared-object transaction who failed before.
// Submit transactions to out-of-date authorities.
// It will succeed because we share owned object certificates through narwhal
let effects = submit_shared_object_transaction(
increment_counter_transaction,
&configs.validator_set()[1..],