Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[follower] Remove transaction from batch, and verify the digests received. #1431

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions sui_core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ impl crate::authority::AuthorityState {
if !transactions.is_empty() {
// Make a new batch, to put the old transactions not in a batch in.
let last_signed_batch = SignedBatch::new(
AuthorityBatch::make_next(&last_batch, &transactions[..]),
// Unwrap safe due to check not empty
AuthorityBatch::make_next(&last_batch, &transactions[..])?,
&*self.secret,
self.name,
);
Expand Down Expand Up @@ -155,13 +156,15 @@ impl crate::authority::AuthorityState {

// Logic to make a batch
if make_batch {
// Test it is not empty.
if current_batch.is_empty() {
continue;
}

// Make and store a new batch.
let new_batch = SignedBatch::new(
AuthorityBatch::make_next(&prev_batch, &current_batch),
// Unwrap safe since we tested above it is not empty
AuthorityBatch::make_next(&prev_batch, &current_batch).unwrap(),
&*self.secret,
self.name,
);
Expand Down
111 changes: 67 additions & 44 deletions sui_core/src/safe_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::io;
use sui_types::crypto::PublicKeyBytes;
use sui_types::{base_types::*, committee::*, fp_ensure};

use sui_types::batch::{AuthorityBatch, SignedBatch, UpdateItem};
use sui_types::batch::{AuthorityBatch, SignedBatch, TxSequenceNumber, UpdateItem};
use sui_types::{
error::{SuiError, SuiResult},
messages::*,
Expand Down Expand Up @@ -174,6 +174,10 @@ impl<C> SafeClient<C> {
&self,
request: BatchInfoRequest,
signed_batch: &SignedBatch,
transactions_and_last_batch: &Option<(
Vec<(TxSequenceNumber, TransactionDigest)>,
AuthorityBatch,
)>,
) -> SuiResult {
// check the signature of the batch
signed_batch
Expand All @@ -190,21 +194,20 @@ impl<C> SafeClient<C> {
}
);

// reconstruct the batch and make sure the constructed digest matches the provided one
let provided_digest = signed_batch.batch.transactions_digest;
// If we have seen a previous batch, use it to make sure the next batch
// is constructed correctly:

let reconstructed_batch = AuthorityBatch::make_next_with_previous_digest(
Some(provided_digest),
&signed_batch.batch.transaction_batch.0,
);
let computed_digest = reconstructed_batch.transactions_digest;
if let Some((transactions, prev_batch)) = transactions_and_last_batch {
let reconstructed_batch = AuthorityBatch::make_next(prev_batch, transactions)?;

fp_ensure!(
reconstructed_batch == signed_batch.batch,
SuiError::ByzantineAuthoritySuspicion {
authority: self.address
}
);
}

fp_ensure!(
provided_digest == computed_digest,
SuiError::ByzantineAuthoritySuspicion {
authority: self.address
}
);
Ok(())
}

Expand Down Expand Up @@ -310,39 +313,59 @@ where
.handle_batch_stream(request.clone())
.await?;

let seq_requested = request.end - request.start;
let mut seq_to_be_returned = seq_requested as usize;
// check for overflow
if seq_requested > usize::MAX as u64 {
seq_to_be_returned = usize::MAX;
}

let client = self.clone();
let stream = Box::pin(
batch_info_items
.then(move |batch_info_item| {
let req_clone = request.clone();
let client = client.clone();
async move {
match &batch_info_item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
if let Err(err) =
client.check_update_item_batch_response(req_clone, signed_batch)
{
client.report_client_error(err.clone());
return Err(err);
}
batch_info_item
}
Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, _digest)))) => {
batch_info_item
}
Err(e) => Err(e.clone()),
let address = self.address;
let stream = Box::pin(batch_info_items.scan(
(0u64, None),
move |(seq, txs_and_last_batch), batch_info_item| {
let req_clone = request.clone();
let client = client.clone();

// We check if we have exceeded the batch boundary for this request.
if !(*seq < request.end) {
// If we exceed it return None to end stream
return futures::future::ready(None);
}

let x = match &batch_info_item {
Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch))) => {
if let Err(err) = client.check_update_item_batch_response(
req_clone,
&signed_batch,
&txs_and_last_batch,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be populated with the transactions from the batch first? I think this is what confused me before, that we don't store the digests of each transaction in the batch. If we use all the digests to create the batch, but then we don't pass in all the digests to reconstruct the batch, is it still possible we get the same batch after reconstruction?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you can see for the first batch, I just check the signature, and only check the second batch for its actual strucure. Maybe there are some gaps, but indeed without the transactions before it makes no sense to check very much.

) {
client.report_client_error(err.clone());
Some(Err(err))
} else {
// Save the seqeunce number of this batch
*seq = signed_batch.batch.next_sequence_number;
// Insert a fresh vector for the new batch of transactions
let _ =
txs_and_last_batch.insert((Vec::new(), signed_batch.batch.clone()));
Some(batch_info_item)
}
}
})
.take(seq_to_be_returned),
);
Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest)))) => {
// A stream always starts with a batch, so the previous should have initialized it.
// And here we insert the tuple into the batch.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In one of the batch tests, I saw that there were a few individual transactions sent before a batch was sent, also is it not ever expected that we would have individual transactions sent only like how we see in the benchmark with the default parameters set?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, right now we indeed have a gap in testing. We test only against a mock server. It would be nice to test against a real authority to make sure we catch something like this.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I remembered the follower server side should be sending batch --> (trasnactions+ -> batch) + , so all transactions are fully enclosed by batches. We should test to see if there is any issue with this?

if txs_and_last_batch
.as_mut()
.map(|txs| txs.0.push((*seq, *digest)))
.is_none()
{
let err = SuiError::ByzantineAuthoritySuspicion { authority: address };
client.report_client_error(err.clone());
Some(Err(err))
} else {
Some(batch_info_item)
}
}
Err(e) => Some(Err(e.clone())),
};

futures::future::ready(x)
},
));
Ok(Box::pin(stream))
}
}
158 changes: 78 additions & 80 deletions sui_core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,6 @@ async fn test_batch_store_retrieval() {
.batches_and_transactions(94, 120)
.expect("Retrieval failed!");

println!("{:?}", batches);
assert_eq!(3, batches.len());
assert_eq!(90, batches.first().unwrap().batch.next_sequence_number);
assert_eq!(115, batches.last().unwrap().batch.next_sequence_number);
Expand Down Expand Up @@ -471,30 +470,39 @@ impl AuthorityAPI for TrustworthyAuthorityClient {
let name = self.0.lock().await.name;
let batch_size = 3;

let stream = stream::unfold(
(request.start, AuthorityBatch::initial()),
move |(seq, last_batch)| {
let auth_secret = secret.clone();
async move {
if seq <= request.end {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let item = SignedBatch::new(next.clone(), &*auth_secret, name);
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
}
},
);
let mut items = Vec::new();
let mut last_batch = AuthorityBatch::initial();
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
let mut seq = 0;
while last_batch.next_sequence_number < request.end {
let mut transactions = Vec::new();
for _i in 0..batch_size {
let rnd = TransactionDigest::random();
transactions.push((seq, rnd));
items.push(BatchInfoResponseItem(UpdateItem::Transaction((seq, rnd))));
seq += 1;
}

let new_batch = AuthorityBatch::make_next(&last_batch, &transactions).unwrap();
last_batch = new_batch;
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
}

items.reverse();

let stream = stream::unfold(items, |mut items| async move {
if let Some(item) = items.pop() {
Some((Ok(item), items))
} else {
None
}
});
Ok(Box::pin(stream))
}
}
Expand Down Expand Up @@ -575,57 +583,45 @@ impl AuthorityAPI for ByzantineAuthorityClient {
let name = self.0.lock().await.name;
let batch_size = 3;

let stream = stream::unfold(
(request.start, AuthorityBatch::initial()),
move |(seq, last_batch)| {
let auth_secret = secret.clone();
async move {
if request.end % 2 == 0 {
if seq <= request.end {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let mut item = SignedBatch::new(next.clone(), &*auth_secret, name);
// Remove a transaction after creating the batch
item.batch.transaction_batch.0.pop();
// And then add in a fake transaction
item.batch
.transaction_batch
.0
.push((seq + batch_size, TransactionDigest::random()));
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
} else {
// Byzantine authority sends you too much, not what you asked for.
if seq <= request.end * 100 {
let mut transactions = Vec::new();
for i in 0..batch_size {
transactions.push((seq + i, TransactionDigest::random()));
}
let next = AuthorityBatch::make_next_with_previous_digest(
Some(last_batch.digest()),
&transactions,
);

let item = SignedBatch::new(next.clone(), &*auth_secret, name);
let response = BatchInfoResponseItem(UpdateItem::Batch(item));
Some((Ok(response), (seq + batch_size, next)))
} else {
None
}
}
}
},
);
let mut items = Vec::new();
let mut last_batch = AuthorityBatch::initial();
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
let mut seq = 0;
while last_batch.next_sequence_number < request.end {
let mut transactions = Vec::new();
for _i in 0..batch_size {
let rnd = TransactionDigest::random();
transactions.push((seq, rnd));
items.push(BatchInfoResponseItem(UpdateItem::Transaction((seq, rnd))));
seq += 1;
}

// Introduce byzantine behaviour:
// Pop last transaction
let (seq, _) = transactions.pop().unwrap();
// Insert a different one
transactions.push((seq, TransactionDigest::random()));

let new_batch = AuthorityBatch::make_next(&last_batch, &transactions).unwrap();
last_batch = new_batch;
items.push({
let item = SignedBatch::new(last_batch.clone(), &*secret, name);
BatchInfoResponseItem(UpdateItem::Batch(item))
});
}

items.reverse();

let stream = stream::unfold(items, |mut items| async move {
if let Some(item) = items.pop() {
Some((Ok(item), items))
} else {
None
}
});
Ok(Box::pin(stream))
}
}
Expand Down Expand Up @@ -678,8 +674,9 @@ async fn test_safe_batch_stream() {
.collect::<Vec<Result<BatchInfoResponseItem, SuiError>>>()
.await;

// Length should be within sequenced range
assert!(items.len() <= (request.end - request.start) as usize && !items.is_empty());
// Check length
assert!(!items.is_empty());
assert_eq!(items.len(), 15 + 6); // 15 items, and 6 batches (enclosing them)

let mut error_found = false;
for item in items {
Expand Down Expand Up @@ -722,8 +719,9 @@ async fn test_safe_batch_stream() {
.collect::<Vec<Result<BatchInfoResponseItem, SuiError>>>()
.await;

// Length should be within sequenced range, despite authority that never stop sending
assert!(items.len() <= (request.end - request.start) as usize && !items.is_empty());
// Check length
assert!(!items.is_empty());
assert_eq!(items.len(), 15 + 6); // 15 items, and 6 batches (enclosing them)

let request_b = BatchInfoRequest { start: 0, end: 10 };
batch_stream = safe_client_from_byzantine
Expand Down
8 changes: 0 additions & 8 deletions sui_core/tests/staged/sui.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ AuthorityBatch:
TUPLEARRAY:
CONTENT: U8
SIZE: 32
- transaction_batch:
TYPENAME: TransactionBatch
AuthoritySignInfo:
STRUCT:
- authority:
Expand Down Expand Up @@ -762,12 +760,6 @@ SuiError:
- error: STR
93:
OnlyOneConsensusClientPermitted: UNIT
TransactionBatch:
NEWTYPESTRUCT:
SEQ:
TUPLE:
- U64
- TYPENAME: TransactionDigest
TransactionData:
STRUCT:
- kind:
Expand Down
Loading