Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event Store Initial Implementation #2507

Merged
merged 11 commits into from
Jun 15, 2022
Prev Previous commit
Next Next commit
Add missing implementations, plus a limit check
  • Loading branch information
velvia committed Jun 9, 2022
commit f03c024bd637a1a5ae10227812b90ef5e2ccc95d
1 change: 1 addition & 0 deletions crates/sui-storage/src/event_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ trait EventStore {
pub enum EventStoreError {
GenericError(Box<dyn std::error::Error>),
SqlError(sqlx::Error),
LimitTooHigh(usize),
}

impl From<sqlx::Error> for EventStoreError {
Expand Down
59 changes: 54 additions & 5 deletions crates/sui-storage/src/event_store/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
use super::*;

use async_trait::async_trait;
use futures::TryFutureExt;
use serde_json::{json, Value};

use sqlx::{sqlite::SqliteRow, Executor, Row, SqlitePool};
use sui_types::event::Event;
use tracing::{debug, info};

/// Maximum number of events one can ask for right now
const MAX_LIMIT: usize = 5000;

/// Sqlite-based Event Store
///
/// ## Data Model
Expand Down Expand Up @@ -164,6 +166,20 @@ const TX_QUERY: &str = "SELECT * FROM events WHERE tx_digest = ?";
const QUERY_BY_TYPE: &str = "SELECT * FROM events WHERE timestamp >= ? AND \
timestamp < ? AND event_type = ? ORDER BY timestamp DESC LIMIT ?";

const QUERY_BY_MODULE: &str = "SELECT * FROM events WHERE timestamp >= ? AND \
timestamp < ? AND package_id = ? AND module_name = ? ORDER BY timestamp DESC LIMIT ?";

const QUERY_BY_CHECKPOINT: &str =
"SELECT * FROM events WHERE checkpoint >= ? AND checkpoint <= ? LIMIT ?";

fn check_limit(limit: usize) -> Result<(), EventStoreError> {
if limit <= MAX_LIMIT {
Ok(())
} else {
Err(EventStoreError::LimitTooHigh(limit))
}
}

#[async_trait]
impl EventStore for SqlEventStore {
type EventIt = std::vec::IntoIter<StoredEvent>;
Expand Down Expand Up @@ -218,6 +234,7 @@ impl EventStore for SqlEventStore {
event_type: EventType,
limit: usize,
) -> Result<Self::EventIt, EventStoreError> {
check_limit(limit)?;
let rows = sqlx::query(QUERY_BY_TYPE)
.persistent(true)
.bind(start_time as i64)
Expand All @@ -236,7 +253,7 @@ impl EventStore for SqlEventStore {
end_time: u64,
limit: usize,
) -> Result<Self::EventIt, EventStoreError> {
// TODO: check limit is not too high
check_limit(limit)?;
let rows = sqlx::query(TS_QUERY)
.bind(start_time as i64)
.bind(end_time as i64)
Expand All @@ -253,7 +270,16 @@ impl EventStore for SqlEventStore {
end_checkpoint: u64,
limit: usize,
) -> Result<Self::EventIt, EventStoreError> {
unimplemented!()
// TODO: a limit maybe doesn't make sense here. May change to unbounded iterator?
longbowlu marked this conversation as resolved.
Show resolved Hide resolved
check_limit(limit)?;
let rows = sqlx::query(QUERY_BY_CHECKPOINT)
.bind(start_checkpoint as i64)
.bind(end_checkpoint as i64)
.bind(limit as i64)
.map(sql_row_to_event)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter())
}

async fn events_by_module_id(
Expand All @@ -263,7 +289,18 @@ impl EventStore for SqlEventStore {
module: ModuleId,
limit: usize,
) -> Result<Self::EventIt, EventStoreError> {
unimplemented!()
check_limit(limit)?;
let rows = sqlx::query(QUERY_BY_MODULE)
.persistent(true)
.bind(start_time as i64)
.bind(end_time as i64)
.bind(module.address().to_vec())
.bind(module.name().to_string())
.bind(limit as i64)
.map(sql_row_to_event)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter())
}

async fn total_event_count(&self) -> Result<usize, EventStoreError> {
Expand Down Expand Up @@ -457,5 +494,17 @@ mod tests {

// TODO: test MoveEvents

// TODO: test limit
#[tokio::test]
async fn test_eventstore_max_limit() -> Result<(), EventStoreError> {
telemetry_subscribers::init_for_testing();

// Initialize store
let db = SqlEventStore::new_sqlite(":memory:").await?;
db.initialize().await?;

let res = db.event_iterator(1_000_000, 1_002_000, 100_000).await;
assert!(matches!(res, Err(EventStoreError::LimitTooHigh(100_000))));

Ok(())
}
}
14 changes: 12 additions & 2 deletions crates/sui-types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use name_variant::NamedVariant;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_with::{serde_as, Bytes};
use strum_macros::{EnumDiscriminants, EnumVariantNames};
use strum::VariantNames;
use strum_macros::{EnumDiscriminants, EnumVariantNames};

use crate::{
base_types::{ObjectID, SequenceNumber, SuiAddress, TransactionDigest},
Expand Down Expand Up @@ -61,9 +61,19 @@ pub enum TransferType {
}

/// Specific type of event
// Developer note: PLEASE only append new entries, do not modify existing entries (binary compat)
velvia marked this conversation as resolved.
Show resolved Hide resolved
#[serde_as]
#[derive(
Eq, Debug, Clone, PartialEq, NamedVariant, Deserialize, Serialize, Hash, EnumDiscriminants, EnumVariantNames
Eq,
Debug,
Clone,
PartialEq,
NamedVariant,
Deserialize,
Serialize,
Hash,
EnumDiscriminants,
EnumVariantNames,
)]
#[strum_discriminants(name(EventType))]
pub enum Event {
Expand Down