Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove BatchV1 and related code gating BatchV2 #13077

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 5 additions & 46 deletions crates/sui-core/src/consensus_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ use eyre::WrapErr;
use mysten_metrics::monitored_scope;
use prometheus::{register_int_counter_with_registry, IntCounter, Registry};
use std::sync::Arc;
use sui_protocol_config::ProtocolConfig;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
use crate::checkpoints::CheckpointServiceNotify;
use crate::transaction_manager::TransactionManager;
use async_trait::async_trait;
use narwhal_types::{validate_batch_version, BatchAPI};
use narwhal_types::BatchAPI;
use narwhal_worker::TransactionValidator;
use sui_types::messages_consensus::{ConsensusTransaction, ConsensusTransactionKind};
use tap::TapFallible;
Expand Down Expand Up @@ -61,17 +60,9 @@ impl TransactionValidator for SuiTxValidator {
Ok(())
}

async fn validate_batch(
&self,
b: &narwhal_types::Batch,
protocol_config: &ProtocolConfig,
) -> Result<(), Self::Error> {
async fn validate_batch(&self, b: &narwhal_types::Batch) -> Result<(), Self::Error> {
let _scope = monitored_scope("ValidateBatch");

// TODO: Remove once we have upgraded to protocol version 12.
validate_batch_version(b, protocol_config)
.map_err(|err| eyre::eyre!(format!("Invalid Batch: {err}")))?;

let txs = b
.transactions()
.iter()
Expand Down Expand Up @@ -172,7 +163,7 @@ mod tests {
consensus_validator::{SuiTxValidator, SuiTxValidatorMetrics},
};

use narwhal_test_utils::{get_protocol_config, latest_protocol_version};
use narwhal_test_utils::latest_protocol_version;
use narwhal_types::Batch;
use narwhal_worker::TransactionValidator;
use sui_types::signature::GenericSignature;
Expand Down Expand Up @@ -230,9 +221,7 @@ mod tests {
.collect();

let batch = Batch::new(transaction_bytes, latest_protocol_config);
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
let res_batch = validator.validate_batch(&batch).await;
assert!(res_batch.is_ok(), "{res_batch:?}");

let bogus_transaction_bytes: Vec<_> = certificates
Expand All @@ -248,37 +237,7 @@ mod tests {
.collect();

let batch = Batch::new(bogus_transaction_bytes, latest_protocol_config);
let res_batch = validator
.validate_batch(&batch, latest_protocol_config)
.await;
assert!(res_batch.is_err());

// TODO: Remove once we have upgraded to protocol version 12.
// protocol version 11 should only support BatchV1
let protocol_config_v11 = &get_protocol_config(11);
let batch_v1 = Batch::new(vec![], protocol_config_v11);

// Case #1: Receive BatchV1 and network has not upgraded to 12 so we are okay
let res_batch = validator
.validate_batch(&batch_v1, protocol_config_v11)
.await;
assert!(res_batch.is_ok());
// Case #2: Receive BatchV1 but network has upgraded to 12 so we fail because we expect BatchV2
let res_batch = validator
.validate_batch(&batch_v1, latest_protocol_config)
.await;
assert!(res_batch.is_err());

let batch_v2 = Batch::new(vec![], latest_protocol_config);
// Case #3: Receive BatchV2 but network is still in v11 so we fail because we expect BatchV1
let res_batch = validator
.validate_batch(&batch_v2, protocol_config_v11)
.await;
let res_batch = validator.validate_batch(&batch).await;
assert!(res_batch.is_err());
// Case #4: Receive BatchV2 and network is upgraded to 12 so we are okay
let res_batch = validator
.validate_batch(&batch_v2, latest_protocol_config)
.await;
assert!(res_batch.is_ok());
}
}
82 changes: 34 additions & 48 deletions narwhal/executor/src/subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct Inner {
authority_id: AuthorityIdentifier,
worker_cache: WorkerCache,
committee: Committee,
protocol_config: ProtocolConfig,
_protocol_config: ProtocolConfig,
client: NetworkClient,
metrics: Arc<ExecutorMetrics>,
}
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn create_and_run_subscriber(
authority_id: AuthorityIdentifier,
worker_cache: WorkerCache,
committee: Committee,
protocol_config: ProtocolConfig,
_protocol_config: ProtocolConfig,
rx_shutdown: ConditionalBroadcastReceiver,
rx_sequence: metered_channel::Receiver<CommittedSubDag>,
client: NetworkClient,
Expand All @@ -136,7 +136,7 @@ async fn create_and_run_subscriber(
inner: Arc::new(Inner {
authority_id,
committee,
protocol_config,
_protocol_config,
worker_cache,
client,
metrics,
Expand Down Expand Up @@ -376,63 +376,49 @@ impl Subscriber {
}

fn record_fetched_batch_metrics(inner: &Inner, batch: &Batch, digest: &BatchDigest) {
// TODO: Remove once we have upgraded to protocol version 12.
if inner.protocol_config.narwhal_versioned_metadata() {
let metadata = batch.versioned_metadata();
if let Some(received_at) = metadata.received_at() {
let remote_duration = received_at.elapsed().as_secs_f64();
debug!(
"Batch was fetched for execution after being received from another worker {}s ago.",
remote_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["other"])
.observe(remote_duration);
} else {
let local_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
debug!(
"Batch was fetched for execution after being created locally {}s ago.",
local_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["own"])
.observe(local_duration);
};

let batch_fetch_duration = batch
let metadata = batch.versioned_metadata();
if let Some(received_at) = metadata.received_at() {
let remote_duration = received_at.elapsed().as_secs_f64();
debug!(
"Batch was fetched for execution after being received from another worker {}s ago.",
remote_duration
);
inner
.metrics
.batch_execution_local_latency
.with_label_values(&["other"])
.observe(remote_duration);
} else {
let local_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
inner
.metrics
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
"Batch {:?} took {} seconds since it has been created to when it has been fetched for execution",
digest,
batch_fetch_duration,
"Batch was fetched for execution after being created locally {}s ago.",
local_duration
);
} else {
let batch_fetch_duration = batch.metadata().created_at.elapsed().as_secs_f64();
inner
.metrics
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
.batch_execution_local_latency
.with_label_values(&["own"])
.observe(local_duration);
};

let batch_fetch_duration = batch
.versioned_metadata()
.created_at()
.elapsed()
.as_secs_f64();
inner
.metrics
.batch_execution_latency
.observe(batch_fetch_duration);
debug!(
"Batch {:?} took {} seconds since it has been created to when it has been fetched for execution",
digest,
batch_fetch_duration,
);
}
}
}

Expand Down
19 changes: 1 addition & 18 deletions narwhal/network/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use parking_lot::RwLock;
use tokio::{select, time::sleep};
use types::{
error::LocalClientError, FetchBatchesRequest, FetchBatchesResponse, PrimaryToWorker,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage, WorkerToPrimary,
WorkerOthersBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage, WorkerToPrimary,
};

use crate::traits::{PrimaryToWorkerClient, WorkerToPrimaryClient};
Expand Down Expand Up @@ -172,22 +171,6 @@ impl PrimaryToWorkerClient for NetworkClient {

#[async_trait]
impl WorkerToPrimaryClient for NetworkClient {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError> {
let c = self.get_worker_to_primary_handler().await?;
select! {
resp = c.report_our_batch(Request::new(request)) => {
resp.map_err(|e| LocalClientError::Internal(format!("{e:?}")))?;
Ok(())
},
() = self.shutdown_notify.wait() => {
Err(LocalClientError::ShuttingDown)
},
}
}
async fn report_own_batch(
&self,
request: WorkerOwnBatchMessage,
Expand Down
9 changes: 1 addition & 8 deletions narwhal/network/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use crypto::NetworkPublicKey;
use types::{
error::LocalClientError, FetchBatchesRequest, FetchBatchesResponse, FetchCertificatesRequest,
FetchCertificatesResponse, RequestBatchesRequest, RequestBatchesResponse,
WorkerOthersBatchMessage, WorkerOurBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage,
WorkerOthersBatchMessage, WorkerOwnBatchMessage, WorkerSynchronizeMessage,
};

pub trait ReliableNetwork<Request: Clone + Send + Sync> {
Expand Down Expand Up @@ -60,12 +59,6 @@ pub trait PrimaryToWorkerClient {

#[async_trait]
pub trait WorkerToPrimaryClient {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: WorkerOurBatchMessage,
) -> Result<(), LocalClientError>;

async fn report_own_batch(
&self,
request: WorkerOwnBatchMessage,
Expand Down
14 changes: 4 additions & 10 deletions narwhal/node/src/generate_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ use std::{fs::File, io::Write};
use structopt::{clap::arg_enum, StructOpt};
use types::{
Batch, BatchDigest, Certificate, CertificateDigest, Header, HeaderDigest, HeaderV1Builder,
Metadata, MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOurBatchMessage,
WorkerOwnBatchMessage, WorkerSynchronizeMessage,
MetadataV1, VersionedMetadata, WorkerOthersBatchMessage, WorkerOwnBatchMessage,
WorkerSynchronizeMessage,
};

#[allow(clippy::mutable_key_type)]
Expand Down Expand Up @@ -112,12 +112,7 @@ fn get_registry() -> Result<Registry> {
);
tracer.trace_value(&mut samples, &worker_index)?;

let our_batch = WorkerOurBatchMessage {
digest: BatchDigest([0u8; 32]),
worker_id: 0,
metadata: Metadata { created_at: 0 },
};
let our_batch_v2 = WorkerOwnBatchMessage {
let own_batch = WorkerOwnBatchMessage {
digest: BatchDigest([0u8; 32]),
worker_id: 0,
metadata: VersionedMetadata::V1(MetadataV1 {
Expand All @@ -135,8 +130,7 @@ fn get_registry() -> Result<Registry> {
is_certified: true,
};

tracer.trace_value(&mut samples, &our_batch)?;
tracer.trace_value(&mut samples, &our_batch_v2)?;
tracer.trace_value(&mut samples, &own_batch)?;
tracer.trace_value(&mut samples, &others_batch)?;
tracer.trace_value(&mut samples, &sync)?;

Expand Down
18 changes: 0 additions & 18 deletions narwhal/node/tests/staged/narwhal.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ AuthorityIdentifier:
Batch:
ENUM:
0:
V1:
NEWTYPE:
TYPENAME: BatchV1
1:
V2:
NEWTYPE:
TYPENAME: BatchV2
Expand All @@ -16,13 +12,6 @@ BatchDigest:
TUPLEARRAY:
CONTENT: U8
SIZE: 32
BatchV1:
STRUCT:
- transactions:
SEQ:
SEQ: U8
- metadata:
TYPENAME: Metadata
BatchV2:
STRUCT:
- transactions:
Expand Down Expand Up @@ -113,13 +102,6 @@ WorkerOthersBatchMessage:
- digest:
TYPENAME: BatchDigest
- worker_id: U32
WorkerOurBatchMessage:
STRUCT:
- digest:
TYPENAME: BatchDigest
- worker_id: U32
- metadata:
TYPENAME: Metadata
WorkerOwnBatchMessage:
STRUCT:
- digest:
Expand Down
30 changes: 1 addition & 29 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use types::{
FetchCertificatesResponse, Header, HeaderAPI, MetadataAPI, PreSubscribedBroadcastSender,
PrimaryToPrimary, PrimaryToPrimaryServer, RequestVoteRequest, RequestVoteResponse, Round,
SendCertificateRequest, SendCertificateResponse, Vote, VoteInfoAPI, WorkerOthersBatchMessage,
WorkerOurBatchMessage, WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
WorkerOwnBatchMessage, WorkerToPrimary, WorkerToPrimaryServer,
};

#[cfg(any(test))]
Expand Down Expand Up @@ -1015,34 +1015,6 @@ struct WorkerReceiverHandler {

#[async_trait]
impl WorkerToPrimary for WorkerReceiverHandler {
// TODO: Remove once we have upgraded to protocol version 12.
async fn report_our_batch(
&self,
request: anemo::Request<WorkerOurBatchMessage>,
) -> Result<anemo::Response<()>, anemo::rpc::Status> {
let message = request.into_body();

let (tx_ack, rx_ack) = oneshot::channel();
let response = self
.tx_our_digests
.send(OurDigestMessage {
digest: message.digest,
worker_id: message.worker_id,
timestamp: message.metadata.created_at,
ack_channel: Some(tx_ack),
})
.await
.map(|_| anemo::Response::new(()))
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

// If we are ok, then wait for the ack
rx_ack
.await
.map_err(|e| anemo::rpc::Status::internal(e.to_string()))?;

Ok(response)
}

async fn report_own_batch(
&self,
request: anemo::Request<WorkerOwnBatchMessage>,
Expand Down
Loading