Skip to content

Commit

Permalink
Cherry picks of #10961 and #11017 (#11050)
Browse files Browse the repository at this point in the history
  • Loading branch information
gegaowp authored Apr 18, 2023
1 parent 7fc3e02 commit 0055b6b
Show file tree
Hide file tree
Showing 7 changed files with 244 additions and 334 deletions.
2 changes: 1 addition & 1 deletion crates/sui-indexer/benches/indexer_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fn indexer_benchmark(c: &mut Criterion) {
});

c.bench_function("persist_checkpoint", |b| {
b.iter(|| rt.block_on(store.persist_all_checkpoint_data(&checkpoints.pop().unwrap())))
b.iter(|| store.persist_all_checkpoint_data(&checkpoints.pop().unwrap()))
});

let mut checkpoints = (20..100).cycle().map(CheckpointId::SequenceNumber);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
DROP TABLE IF EXISTS objects;
DROP TABLE IF EXISTS objects_history;
-- DROP TABLE IF EXISTS objects_history;

DROP TYPE IF EXISTS owner_type;
DROP TYPE IF EXISTS bcs_bytes;
Expand Down
151 changes: 76 additions & 75 deletions crates/sui-indexer/migrations/2022-12-01-034426_objects/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,82 +39,83 @@ CREATE TABLE objects
CREATE INDEX objects_owner_address ON objects (owner_type, owner_address);
CREATE INDEX objects_tx_digest ON objects (previous_transaction);

CREATE TABLE objects_history
(
epoch BIGINT NOT NULL,
checkpoint BIGINT NOT NULL,
object_id address NOT NULL,
version BIGINT NOT NULL,
object_digest base58digest NOT NULL,
owner_type owner_type NOT NULL,
owner_address address,
old_owner_type owner_type,
old_owner_address address,
initial_shared_version BIGINT,
previous_transaction base58digest NOT NULL,
object_type VARCHAR NOT NULL,
object_status object_status NOT NULL,
has_public_transfer BOOLEAN NOT NULL,
storage_rebate BIGINT NOT NULL,
bcs bcs_bytes[] NOT NULL,
CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint)
) PARTITION BY RANGE (checkpoint);
CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint);
CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version);
CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address);
CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address);
-- fast-path partition for the most recent objects before checkpoint, range is half-open.
-- partition name need to match regex of '.*(_partition_)\d+'.
CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0);
CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);
-- NOTE(gegaowp): remove object history so that it will not be created over DB reset / migration run.
-- CREATE TABLE objects_history
-- (
-- epoch BIGINT NOT NULL,
-- checkpoint BIGINT NOT NULL,
-- object_id address NOT NULL,
-- version BIGINT NOT NULL,
-- object_digest base58digest NOT NULL,
-- owner_type owner_type NOT NULL,
-- owner_address address,
-- old_owner_type owner_type,
-- old_owner_address address,
-- initial_shared_version BIGINT,
-- previous_transaction base58digest NOT NULL,
-- object_type VARCHAR NOT NULL,
-- object_status object_status NOT NULL,
-- has_public_transfer BOOLEAN NOT NULL,
-- storage_rebate BIGINT NOT NULL,
-- bcs bcs_bytes[] NOT NULL,
-- CONSTRAINT objects_history_pk PRIMARY KEY (object_id, version, checkpoint)
-- ) PARTITION BY RANGE (checkpoint);
-- CREATE INDEX objects_history_checkpoint_index ON objects_history (checkpoint);
-- CREATE INDEX objects_history_id_version_index ON objects_history (object_id, version);
-- CREATE INDEX objects_history_owner_index ON objects_history (owner_type, owner_address);
-- CREATE INDEX objects_history_old_owner_index ON objects_history (old_owner_type, old_owner_address);
-- -- fast-path partition for the most recent objects before checkpoint, range is half-open.
-- -- partition name need to match regex of '.*(_partition_)\d+'.
-- CREATE TABLE objects_history_fast_path_partition_0 PARTITION OF objects_history FOR VALUES FROM (-1) TO (0);
-- CREATE TABLE objects_history_partition_0 PARTITION OF objects_history FOR VALUES FROM (0) TO (MAXVALUE);

CREATE OR REPLACE FUNCTION objects_modified_func() RETURNS TRIGGER AS
$body$
BEGIN
IF (TG_OP = 'INSERT') THEN
INSERT INTO objects_history
VALUES (NEW.epoch, NEW.checkpoint, NEW.object_id, NEW.version, NEW.object_digest, NEW.owner_type,
NEW.owner_address, NULL, NULL,
NEW.initial_shared_version,
NEW.previous_transaction, NEW.object_type, NEW.object_status, NEW.has_public_transfer,
NEW.storage_rebate, NEW.bcs);
RETURN NEW;
ELSEIF (TG_OP = 'UPDATE') THEN
INSERT INTO objects_history
VALUES (NEW.epoch, NEW.checkpoint, NEW.object_id, NEW.version, NEW.object_digest, NEW.owner_type,
NEW.owner_address, OLD.owner_type, OLD.owner_address,
NEW.initial_shared_version,
NEW.previous_transaction, NEW.object_type, NEW.object_status, NEW.has_public_transfer,
NEW.storage_rebate, NEW.bcs);
-- MUSTFIX(gegaowp): we cannot update checkpoint in-place, b/c checkpoint is a partition key,
-- we need to prune old data in this partition periodically, like pruning old epochs upon new epoch.
RETURN NEW;
ELSIF (TG_OP = 'DELETE') THEN
-- object deleted from the main table, archive the history for that object
DELETE FROM objects_history WHERE object_id = old.object_id;
RETURN OLD;
ELSE
RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,NOW();
RETURN NULL;
END IF;
-- CREATE OR REPLACE FUNCTION objects_modified_func() RETURNS TRIGGER AS
-- $body$
-- BEGIN
-- IF (TG_OP = 'INSERT') THEN
-- INSERT INTO objects_history
-- VALUES (NEW.epoch, NEW.checkpoint, NEW.object_id, NEW.version, NEW.object_digest, NEW.owner_type,
-- NEW.owner_address, NULL, NULL,
-- NEW.initial_shared_version,
-- NEW.previous_transaction, NEW.object_type, NEW.object_status, NEW.has_public_transfer,
-- NEW.storage_rebate, NEW.bcs);
-- RETURN NEW;
-- ELSEIF (TG_OP = 'UPDATE') THEN
-- INSERT INTO objects_history
-- VALUES (NEW.epoch, NEW.checkpoint, NEW.object_id, NEW.version, NEW.object_digest, NEW.owner_type,
-- NEW.owner_address, OLD.owner_type, OLD.owner_address,
-- NEW.initial_shared_version,
-- NEW.previous_transaction, NEW.object_type, NEW.object_status, NEW.has_public_transfer,
-- NEW.storage_rebate, NEW.bcs);
-- -- MUSTFIX(gegaowp): we cannot update checkpoint in-place, b/c checkpoint is a partition key,
-- -- we need to prune old data in this partition periodically, like pruning old epochs upon new epoch.
-- RETURN NEW;
-- ELSIF (TG_OP = 'DELETE') THEN
-- -- object deleted from the main table, archive the history for that object
-- DELETE FROM objects_history WHERE object_id = old.object_id;
-- RETURN OLD;
-- ELSE
-- RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - Other action occurred: %, at %',TG_OP,NOW();
-- RETURN NULL;
-- END IF;

EXCEPTION
WHEN data_exception THEN
RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
RETURN NULL;
WHEN unique_violation THEN
RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
RETURN NULL;
WHEN OTHERS THEN
RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
RETURN NULL;
END;
$body$
LANGUAGE plpgsql;
-- EXCEPTION
-- WHEN data_exception THEN
-- RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [DATA EXCEPTION] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
-- RETURN NULL;
-- WHEN unique_violation THEN
-- RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [UNIQUE] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
-- RETURN NULL;
-- WHEN OTHERS THEN
-- RAISE WARNING '[OBJECTS_MODIFIED_FUNC] - UDF ERROR [OTHER] - SQLSTATE: %, SQLERRM: %',SQLSTATE,SQLERRM;
-- RETURN NULL;
-- END;
-- $body$
-- LANGUAGE plpgsql;

CREATE TRIGGER objects_history
AFTER INSERT OR UPDATE OR DELETE
ON objects
FOR EACH ROW
EXECUTE PROCEDURE objects_modified_func();
-- CREATE TRIGGER objects_history
-- AFTER INSERT OR UPDATE OR DELETE
-- ON objects
-- FOR EACH ROW
-- EXECUTE PROCEDURE objects_modified_func();

3 changes: 2 additions & 1 deletion crates/sui-indexer/src/handlers/checkpoint_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,14 +365,15 @@ where
// otherwise send it to channel to be committed later.
if epoch.last_epoch.is_none() {
let epoch_db_guard = self.metrics.epoch_db_commit_latency.start_timer();
info!("Persisting first epoch...");
let mut persist_first_epoch_res = self.state.persist_epoch(&epoch).await;
while persist_first_epoch_res.is_err() {
warn!("Failed to persist first epoch, retrying...");
persist_first_epoch_res = self.state.persist_epoch(&epoch).await;
}
self.state.persist_epoch(&epoch).await?;
epoch_db_guard.stop_and_record();
self.metrics.total_epoch_committed.inc();
info!("Persisted first epoch");
} else {
let epoch_sender_guard = self.epoch_sender.lock().await;
// NOTE: when the channel is full, epoch_sender_guard will wait until the channel has space.
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/store/indexer_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub trait IndexerStore {
) -> Result<usize, IndexerError>;
// TODO(gegaowp): keep this method in this trait for now for easier reverting,
// will remove it if it's no longer needed.
async fn persist_all_checkpoint_data(
fn persist_all_checkpoint_data(
&self,
data: &TemporaryCheckpointStore,
) -> Result<usize, IndexerError>;
Expand Down
27 changes: 0 additions & 27 deletions crates/sui-indexer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,6 @@ mod diesel_marco {
}};
}

macro_rules! read_only {
($pool:expr, $query:expr) => {{
let mut pg_pool_conn = crate::get_async_pg_pool_connection($pool).await?;
pg_pool_conn
.build_transaction()
.read_only()
.run($query)
.await
.map_err(|e| IndexerError::PostgresReadError(e.to_string()))
}};
}

macro_rules! transactional {
($pool:expr, $query:expr) => {{
let mut pg_pool_conn = crate::get_async_pg_pool_connection($pool).await?;
pg_pool_conn
.build_transaction()
.serializable()
.read_write()
.run($query)
.await
.map_err(|e| IndexerError::PostgresWriteError(e.to_string()))
}};
}

macro_rules! transactional_blocking {
($pool:expr, $query:expr) => {{
let mut pg_pool_conn = crate::get_pg_pool_connection($pool)?;
Expand All @@ -57,8 +32,6 @@ mod diesel_marco {
.map_err(|e| IndexerError::PostgresWriteError(e.to_string()))
}};
}
pub(crate) use read_only;
pub(crate) use read_only_blocking;
pub(crate) use transactional;
pub(crate) use transactional_blocking;
}
Loading

0 comments on commit 0055b6b

Please sign in to comment.