Skip to content

Commit 3b2b54f

Browse files
feat(indexer): add streaming support for events and transactions
1 parent 6bfb900 commit 3b2b54f

File tree

8 files changed

+642
-1
lines changed

8 files changed

+642
-1
lines changed

Cargo.lock

Lines changed: 85 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iota-indexer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,13 @@ prometheus.workspace = true
2828
rand = { workspace = true, optional = true }
2929
secrecy = "0.8.0"
3030
serde.workspace = true
31+
serde_json.workspace = true
3132
serde_with.workspace = true
3233
tap.workspace = true
3334
tempfile.workspace = true
3435
thiserror.workspace = true
3536
tokio = { workspace = true, features = ["full"] }
37+
tokio-postgres = "0.7.13"
3638
tokio-stream.workspace = true
3739
tokio-util = { workspace = true, features = ["rt"] }
3840
tracing.workspace = true
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
DROP TRIGGER IF EXISTS checkpoint_committed_trigger ON checkpoints;
2+
DROP FUNCTION IF EXISTS notify_checkpoint_committed();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
CREATE OR REPLACE FUNCTION notify_checkpoint_committed()
2+
RETURNS TRIGGER AS $$
3+
BEGIN
4+
-- Send notification with just the range - let client query events
5+
PERFORM pg_notify('checkpoint_committed',
6+
json_build_object(
7+
'checkpoint_sequence_number', NEW.sequence_number,
8+
'min_tx_sequence_number', NEW.min_tx_sequence_number,
9+
'max_tx_sequence_number', NEW.max_tx_sequence_number
10+
)::text
11+
);
12+
13+
RETURN NULL;
14+
END;
15+
$$ LANGUAGE plpgsql;
16+
17+
18+
CREATE TRIGGER checkpoint_committed_trigger
19+
AFTER INSERT ON checkpoints
20+
FOR EACH ROW
21+
EXECUTE FUNCTION notify_checkpoint_committed();

crates/iota-indexer/src/indexer_reader.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,20 @@ impl IndexerReader {
869869
run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
870870
}
871871

872+
pub(crate) fn multi_get_transactions_by_sequence_numbers_range(
873+
&self,
874+
min_seq: i64,
875+
max_seq: i64,
876+
) -> Result<Vec<StoredTransaction>, IndexerError> {
877+
use crate::schema::transactions::dsl as txdsl;
878+
let query = txdsl::transactions
879+
.filter(txdsl::tx_sequence_number.ge(min_seq))
880+
.filter(txdsl::tx_sequence_number.le(max_seq))
881+
.order(txdsl::tx_sequence_number.asc())
882+
.into_boxed();
883+
run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
884+
}
885+
872886
pub async fn get_owned_objects_in_blocking_task(
873887
&self,
874888
address: IotaAddress,

crates/iota-indexer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub mod processors;
4444
pub(crate) mod rolling;
4545
pub mod schema;
4646
pub mod store;
47+
pub mod stream;
4748
pub mod system_package_task;
4849
pub mod test_utils;
4950
pub mod types;

crates/iota-indexer/src/models/transactions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,7 @@ impl StoredTransaction {
431431
})
432432
}
433433

434-
fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
434+
pub(crate) fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
435435
let sender_signed_data: SenderSignedData =
436436
bcs::from_bytes(&self.raw_transaction).map_err(|e| {
437437
IndexerError::PersistentStorageDataCorruption(format!(

0 commit comments

Comments
 (0)