Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[authority sync] Provide an interface for explorers to sync with single authority #509

Merged
merged 36 commits into from
Mar 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
f295ff4
Added tables for execution sequence and blocks
Feb 21, 2022
fd9fedc
Added architecture skeleton
Feb 21, 2022
12584a9
Rename to batch
Feb 22, 2022
9bacf5e
FMT
Feb 22, 2022
8ae557a
Added batch db open logic, and tests
Feb 22, 2022
fe359d9
Rename block to batch in code and comments
Feb 22, 2022
864ca6f
Batch logic and tests
Feb 22, 2022
fed3e4b
Appease clippy & fmt
Feb 22, 2022
9c3ba3e
Add out of order test
Feb 22, 2022
2dbead9
Logic to fix the database
Feb 22, 2022
f5477e2
Remove unused error
Feb 22, 2022
3a13f05
Rename test
Feb 22, 2022
99e3599
Added comments
Feb 22, 2022
dbdf817
Make fmt happy
Feb 22, 2022
0adec15
Minor changes
Feb 23, 2022
01d6172
Define clean consutructors
Feb 23, 2022
49a3a45
Clean Licence
Feb 23, 2022
01776c9
Integrations of batch listener into authority & tests
Feb 23, 2022
bd62573
Make fmt & clippy happy
Feb 23, 2022
060c85b
Move from usize to u64 for seq numbers
Feb 24, 2022
2ede6ea
Make fmt / clippy happy
Feb 24, 2022
a086b7f
Do not add genesis to transaction sequence
Mar 1, 2022
3d1c3cb
Updated from review comments
Mar 1, 2022
a76a810
Remove confusing comment
Mar 1, 2022
945942d
Added hashes to batches
Mar 2, 2022
eae08cd
Updated names to Batch(-er)
Mar 2, 2022
a1f4ca6
Make fmt happy
Mar 2, 2022
ba2ef2a
Created structures for signed batches
Mar 2, 2022
775c5de
Handle SignedBatches instead of Batches
Mar 2, 2022
03cee3c
Remove pub from file
Mar 2, 2022
afcba11
Appease clippy
Mar 2, 2022
1e09d11
Turn on format test and do fmt
Mar 2, 2022
6b7ae14
Use TxSequenceNumber
Mar 3, 2022
bd63cb0
Allow gaps in the sequence + simplify
Mar 3, 2022
3992943
Updated structures
Mar 3, 2022
3c94310
Fixed clippy on incoming?
Mar 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Batch logic and tests
  • Loading branch information
George Danezis committed Mar 3, 2022
commit 864ca6fb52da9238aeca2a64b63d2269cb3c2063
112 changes: 106 additions & 6 deletions sui_core/src/authority_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ use serde::{Deserialize, Serialize};
use std::sync::Arc;
use sui_types::base_types::*;
use sui_types::error::{SuiError, SuiResult};

use std::collections::BTreeMap;
use std::time::Duration;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::time::interval;

use typed_store::Map;

#[cfg(test)]
Expand Down Expand Up @@ -34,8 +39,19 @@ The architecture is as follows:

*/

pub type BroadcastPair = (
tokio::sync::broadcast::Sender<UpdateItem>,
tokio::sync::broadcast::Receiver<UpdateItem>,
);

/// Either a freshly sequenced transaction hash or a batch
pub struct UpdateItem {}
#[derive(
Eq, PartialEq, Ord, PartialOrd, Copy, Clone, Hash, Debug, Serialize, Deserialize,
)]
pub enum UpdateItem {
Transaction((usize, TransactionDigest)),
Batch(AuthorityBatch)
}

pub struct BatcherSender {
/// Channel for sending updates.
Expand All @@ -45,6 +61,8 @@ pub struct BatcherSender {
pub struct BatcherManager {
/// Channel for receiving updates
tx_recv: Receiver<(usize, TransactionDigest)>,
/// The sender end of the broadcast channel used to send updates to listeners
tx_broadcast: tokio::sync::broadcast::Sender<UpdateItem>,
/// Copy of the database to write batches and read transactions.
db: Arc<AuthorityStore>,
}
Expand All @@ -64,17 +82,26 @@ impl BatcherSender {
}

impl BatcherManager {
pub fn new(db: Arc<AuthorityStore>, capacity: usize) -> (BatcherSender, BatcherManager) {
pub fn new(
db: Arc<AuthorityStore>,
capacity: usize,
) -> (BatcherSender, BatcherManager, BroadcastPair) {
let (tx_send, tx_recv) = channel(capacity);
let (tx_broadcast, rx_broadcast) = tokio::sync::broadcast::channel(capacity);
let sender = BatcherSender { tx_send };
let manager = BatcherManager { tx_recv, db };
(sender, manager)
let manager = BatcherManager {
tx_recv,
tx_broadcast: tx_broadcast.clone(),
db,
};
(sender, manager, (tx_broadcast, rx_broadcast))
}

/// Starts the manager service / tokio task
pub fn start_service() {}

async fn init_from_database(&self) -> Result<AuthorityBatch, SuiError> {
// First read the last batch in the db
let mut last_batch = match self.db.batches.iter().skip_prior_to(&usize::MAX)?.next() {
Some((_, last_batch)) => last_batch,
None => {
Expand Down Expand Up @@ -121,14 +148,87 @@ impl BatcherManager {
Ok(last_batch)
}

pub async fn run_service(&self) -> SuiResult {
pub async fn run_service(&mut self, min_batch_size: usize, max_delay: Duration) -> SuiResult {
// We first use the state of the database to establish what the current
// latest batch is.
let _last_batch = self.init_from_database().await?;
let mut _last_batch = self.init_from_database().await?;

// Then we operate in a loop, where for each new update we consider
// whether to create a new batch or not.

let mut interval = interval(max_delay);
let mut exit = false;
let mut make_batch;

// The structures we use to build the next batch. The current_batch holds the sequence
// of transactions in order, following the last batch. The loose transactions holds
// transactions we may have received out of order.
let (mut current_batch, mut loose_transactions): (
huitseeker marked this conversation as resolved.
Show resolved Hide resolved
Vec<(usize, TransactionDigest)>,
BTreeMap<usize, TransactionDigest>,
) = (Vec::new(), BTreeMap::new());
let mut next_seq_number = _last_batch.total_size;

while !exit {
// Reset the flags.
make_batch = false;

// check if we should make a new block
tokio::select! {
_ = interval.tick() => {
// Every so often we check if we should make a batch
// smaller than the max size. But never empty.
huitseeker marked this conversation as resolved.
Show resolved Hide resolved
make_batch = true;
},
item_option = self.tx_recv.recv() => {

match item_option {
None => {
make_batch = true;
exit = true;
},
Comment on lines +217 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this exit supposed to capture a notion of "at quiescence"?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Th exit here is very mundane: when the authority state closes (the only thing that should really have a sender for the channel) then the batcher makes the last batch and also closes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are many such names for those handles throughout the code base (e.g. the complete of the SpawnedServer). It would be great to use a single one for the concept. The one I've seen most frequently used for this is a "cancellation" channel.

Some((seq, tx_digest)) => {

loose_transactions.insert(seq, tx_digest);
while loose_transactions.contains_key(&next_seq_number) {
let next_item = (next_seq_number, loose_transactions.remove(&next_seq_number).unwrap());
// Send the update
let _ = self.tx_broadcast.send(UpdateItem::Transaction(next_item));
current_batch.push(next_item);
next_seq_number += 1;
}

if current_batch.len() >= min_batch_size {
make_batch = true;
}
}
}
}
}

huitseeker marked this conversation as resolved.
Show resolved Hide resolved
// Logic to make a batch
if make_batch {
if current_batch.is_empty() {
continue;
}

// Make and store a new batch.
let new_batch = AuthorityBatch {
total_size: next_seq_number,
previous_total_size: _last_batch.total_size,
};
self.db.batches.insert(&new_batch.total_size, &new_batch)?;

// Send the update
let _ = self.tx_broadcast.send(UpdateItem::Batch(new_batch.clone()));

// A new batch is actually made, so we reset the conditions.
_last_batch = new_batch;
current_batch.clear();
interval.reset();
}
}

// When a new batch is created we send a notification to all who have
// registered an interest.

Expand Down
60 changes: 57 additions & 3 deletions sui_core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn test_open_manager() {
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

// TEST 1: init from an empty database should return to a zero block
let (_send, manager) = BatcherManager::new(store.clone(), 100);
let (_send, manager, _pair) = BatcherManager::new(store.clone(), 100);
let last_block = manager
.init_from_database()
.await
Expand All @@ -47,7 +47,7 @@ async fn test_open_manager() {
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

let (_send, manager) = BatcherManager::new(store.clone(), 100);
let (_send, manager, _pair) = BatcherManager::new(store.clone(), 100);
let last_block = manager
.init_from_database()
.await
Expand All @@ -68,9 +68,63 @@ async fn test_open_manager() {
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

let (_send, manager) = BatcherManager::new(store.clone(), 100);
let (_send, manager, _pair) = BatcherManager::new(store.clone(), 100);
let last_block = manager.init_from_database().await;

assert_eq!(last_block, Err(SuiError::StorageCorrupt));
}
}

#[tokio::test]
async fn test_batch_manager_happypath() {
// let (_, authority_key) = get_key_pair();

// Create a random directory to store the DB
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

// Create an authority
let mut opts = rocksdb::Options::default();
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

// TEST 1: init from an empty database should return to a zero block
let (_send, mut manager, _pair) = BatcherManager::new(store.clone(), 100);

let _join = tokio::spawn(async move {
manager.run_service(1000, Duration::from_millis(500)).await.expect("Service returns with no errors");
drop(manager);
});

// Send a transaction.
let tx_zero = TransactionDigest::new([0; 32].try_into().unwrap());
_send
.send_item(0, tx_zero.clone())
.await
.expect("Send to the channel.");

// First we get a transaction update
let (_tx, mut rx) = _pair;
assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Transaction((0,_))));

// Then we (eventually) get a batch
assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_)));

_send
.send_item(1, tx_zero.clone())
.await
.expect("Send to the channel.");

// When we close the sending channel we also also end the service task
drop(_send);
drop(_tx);

_join.await.expect("No errors in task");

// But the block is made, and sent as a notification.
assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Transaction((1,_))));
assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_)));
assert!(matches!(rx.recv().await, Err(_)));

}