Skip to content

Commit

Permalink
Hook up EventStore in Sui/Authority (MystenLabs#2698)
Browse files Browse the repository at this point in the history
  • Loading branch information
Evan Chan authored Jun 29, 2022
1 parent 8010d67 commit ab9d80b
Show file tree
Hide file tree
Showing 19 changed files with 423 additions and 145 deletions.
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

58 changes: 48 additions & 10 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,18 @@ use std::{
collections::{BTreeMap, HashMap, HashSet, VecDeque},
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
Arc,
},
};
use sui_adapter::adapter;
use sui_config::genesis::Genesis;
use sui_storage::{
event_store::{EventStore, EventStoreType, StoredEvent},
write_ahead_log::{DBTxGuard, TxGuard, WriteAheadLog},
IndexStore,
};

use sui_types::{
base_types::*,
batch::{TxSequenceNumber, UpdateItem},
Expand Down Expand Up @@ -231,6 +233,8 @@ impl AuthorityMetrics {
pub type StableSyncAuthoritySigner =
Pin<Arc<dyn signature::Signer<AuthoritySignature> + Send + Sync>>;

const DEFAULT_QUERY_LIMIT: usize = 1000;

pub struct AuthorityState {
// Fixed size, static, identity of the authority
/// The name of this authority.
Expand Down Expand Up @@ -273,6 +277,9 @@ pub struct AuthorityState {
pub consensus_guardrail: AtomicUsize,

pub metrics: AuthorityMetrics,

// Cache the latest checkpoint number to avoid expensive locking to access checkpoint store
latest_checkpoint_num: AtomicU64,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -695,9 +702,10 @@ impl AuthorityState {

// Emit events
if let Some(event_handler) = &self.event_handler {
let checkpoint_num = self.latest_checkpoint_num.load(Ordering::Relaxed);
event_handler
.process_events(&effects.effects, timestamp_ms)
.await;
.process_events(&effects.effects, timestamp_ms, seq, checkpoint_num)
.await?;
}

Ok(())
Expand Down Expand Up @@ -987,9 +995,9 @@ impl AuthorityState {
secret: StableSyncAuthoritySigner,
store: Arc<AuthorityStore>,
indexes: Option<Arc<IndexStore>>,
event_store: Option<Arc<EventStoreType>>,
checkpoints: Option<Arc<Mutex<CheckpointStore>>>,
genesis: &Genesis,
enable_event_processing: bool,
prometheus_registry: &prometheus::Registry,
) -> Self {
let (tx, _rx) = tokio::sync::broadcast::channel(BROADCAST_CAPACITY);
Expand Down Expand Up @@ -1035,11 +1043,8 @@ impl AuthorityState {
.get_last_epoch_info()
.expect("Fail to load the current epoch info");

let event_handler = if enable_event_processing {
Some(Arc::new(EventHandler::new(store.clone())))
} else {
None
};
let event_handler = event_store.map(|es| Arc::new(EventHandler::new(store.clone(), es)));

let mut state = AuthorityState {
name,
secret,
Expand All @@ -1061,6 +1066,7 @@ impl AuthorityState {
),
consensus_guardrail: AtomicUsize::new(0),
metrics: AuthorityMetrics::new(prometheus_registry),
latest_checkpoint_num: AtomicU64::new(0),
};

// Process tx recovery log first, so that the batch and checkpoint recovery (below)
Expand Down Expand Up @@ -1329,6 +1335,34 @@ impl AuthorityState {
Ok(self.get_indexes()?.get_transactions_to_addr(address)?)
}

/// Returns a full handle to the event store, including inserts... so be careful!
fn get_event_store(&self) -> Option<Arc<EventStoreType>> {
self.event_handler
.as_ref()
.map(|handler| handler.event_store.clone())
}

/// Returns a set of events corresponding to a given transaction, in order events were emitted
pub async fn get_events_for_transaction(
&self,
digest: TransactionDigest,
) -> Result<Vec<StoredEvent>, SuiError> {
let es = self.get_event_store().ok_or(SuiError::NoEventStore)?;
es.events_for_transaction(digest).await
}

/// Returns a whole set of events for a range of time
pub async fn get_events_for_timerange(
&self,
start_time: u64,
end_time: u64,
limit: Option<usize>,
) -> Result<Vec<StoredEvent>, SuiError> {
let es = self.get_event_store().ok_or(SuiError::NoEventStore)?;
es.event_iterator(start_time, end_time, limit.unwrap_or(DEFAULT_QUERY_LIMIT))
.await
}

pub async fn insert_genesis_object(&self, object: Object) {
self.database
.insert_genesis_object(object)
Expand Down Expand Up @@ -1529,14 +1563,18 @@ impl ExecutionState for AuthorityState {
ConsensusTransaction::Checkpoint(fragment) => {
let seq = consensus_index;
if let Some(checkpoint) = &self.checkpoints {
let mut checkpoint = checkpoint.lock();
checkpoint
.lock()
.handle_internal_fragment(seq, *fragment, &self.committee.load(), self)
.map_err(|e| SuiError::from(&e.to_string()[..]))?;

// NOTE: The method `handle_internal_fragment` is idempotent, so we don't need
// to persist the consensus index. If the validator crashes, this transaction
// may be resent to the checkpoint logic that will simply ignore it.

// Cache the next checkpoint number if it changes
self.latest_checkpoint_num
.store(checkpoint.next_checkpoint(), Ordering::Relaxed);
}

// TODO: This return time is not ideal. The authority submitting the checkpoint fragment
Expand Down
2 changes: 0 additions & 2 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ pub struct SuiDataStore<S> {
owner_index: DBMap<(Owner, ObjectID), ObjectInfo>,

/// This is map between the transaction digest and transactions found in the `transaction_lock`.
/// NOTE: after a lock is deleted (after a certificate is processed) the corresponding entry here
/// could be deleted, but right now this is only done on gateways, not done on authorities.
transactions: DBMap<TransactionDigest, TransactionEnvelope<S>>,

/// This is a map between the transaction digest and the corresponding certificate for all
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ impl ConfigurableBatchActionClient {
store,
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,9 @@ impl LocalAuthorityClient {
secret.clone(),
store,
None,
None,
Some(Arc::new(Mutex::new(checkpoints))),
genesis,
false,
&prometheus::Registry::new(),
)
.await;
Expand Down
8 changes: 4 additions & 4 deletions crates/sui-core/src/checkpoints/tests/checkpoint_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -788,9 +788,9 @@ async fn test_batch_to_checkpointing() {
secret,
store.clone(),
None,
None,
Some(checkpoints.clone()),
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down Expand Up @@ -879,8 +879,8 @@ async fn test_batch_to_checkpointing_init_crash() {
store.clone(),
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down Expand Up @@ -961,9 +961,9 @@ async fn test_batch_to_checkpointing_init_crash() {
secret,
store.clone(),
None,
None,
Some(checkpoints.clone()),
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down Expand Up @@ -1480,9 +1480,9 @@ pub async fn checkpoint_tests_setup(
secret,
store.clone(),
None,
None,
Some(checkpoint.clone()),
&genesis,
false,
&prometheus::Registry::new(),
)
.await;
Expand Down
59 changes: 45 additions & 14 deletions crates/sui-core/src/event_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use move_bytecode_utils::module_cache::SyncModuleCache;
use serde_json::Value;
use sui_json_rpc_api::rpc_types::{SuiMoveStruct, SuiMoveValue};
use tokio_stream::Stream;
use tracing::{debug, error};
use tracing::{debug, error, trace};

use sui_storage::event_store::{EventStore, EventStoreType};
use sui_types::base_types::TransactionDigest;
use sui_types::{
error::{SuiError, SuiResult},
Expand All @@ -30,35 +31,60 @@ pub const EVENT_DISPATCH_BUFFER_SIZE: usize = 1000;
pub struct EventHandler {
module_cache: SyncModuleCache<ResolverWrapper<AuthorityStore>>,
event_streamer: Streamer<EventEnvelope, EventFilter>,
pub(crate) event_store: Arc<EventStoreType>,
}

impl EventHandler {
pub fn new(validator_store: Arc<AuthorityStore>) -> Self {
pub fn new(validator_store: Arc<AuthorityStore>, event_store: Arc<EventStoreType>) -> Self {
let streamer = Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE);
Self {
module_cache: SyncModuleCache::new(ResolverWrapper(validator_store)),
event_streamer: streamer,
event_store,
}
}

pub async fn process_events(&self, effects: &TransactionEffects, timestamp_ms: u64) {
pub async fn process_events(
&self,
effects: &TransactionEffects,
timestamp_ms: u64,
seq_num: u64,
checkpoint_num: u64,
) -> SuiResult {
let res: Result<Vec<_>, _> = effects
.events
.iter()
.map(|e| self.create_envelope(e, effects.transaction_digest, seq_num, timestamp_ms))
.collect();
let envelopes = res?;

// Ingest all envelopes together at once (for efficiency) into Event Store
self.event_store
.add_events(&envelopes, checkpoint_num)
.await?;
trace!(
num_events = envelopes.len(),
digest =? effects.transaction_digest,
checkpoint_num, "Finished writing events to event store"
);

// serially dispatch event processing to honor events' orders.
for event in &effects.events {
if let Err(e) = self
.process_event(event, timestamp_ms, effects.transaction_digest)
.await
{
for envelope in envelopes {
if let Err(e) = self.event_streamer.send(envelope).await {
error!(error =? e, "Failed to send EventEnvelope to dispatch");
}
}

Ok(())
}

pub async fn process_event(
fn create_envelope(
&self,
event: &Event,
timestamp_ms: u64,
digest: TransactionDigest,
) -> SuiResult {
seq_num: u64,
timestamp_ms: u64,
) -> Result<EventEnvelope, SuiError> {
let json_value = match event {
Event::MoveEvent {
type_, contents, ..
Expand All @@ -76,9 +102,14 @@ impl EventHandler {
}
_ => None,
};
let envelope = EventEnvelope::new(timestamp_ms, digest, event.clone(), json_value);
// TODO store events here
self.event_streamer.send(envelope).await

Ok(EventEnvelope::new(
timestamp_ms,
Some(digest),
seq_num,
event.clone(),
json_value,
))
}

pub fn subscribe(&self, filter: EventFilter) -> impl Stream<Item = EventEnvelope> {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/unit_tests/authority_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1592,8 +1592,8 @@ pub async fn init_state() -> AuthorityState {
store,
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await
Expand Down
6 changes: 3 additions & 3 deletions crates/sui-core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ pub(crate) async fn init_state(
store,
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await
Expand Down Expand Up @@ -771,8 +771,8 @@ async fn test_safe_batch_stream() {
store.clone(),
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down Expand Up @@ -818,8 +818,8 @@ async fn test_safe_batch_stream() {
store,
None,
None,
None,
&sui_config::genesis::Genesis::get_default_genesis(),
false,
&prometheus::Registry::new(),
)
.await;
Expand Down
Loading

0 comments on commit ab9d80b

Please sign in to comment.