Skip to content

Commit 24fcc55

Browse files
feat(indexer): add streaming support for events and transactions (#8555)
# Description of change This PR is the first iteration attempt to provide streaming support for the `iota-indexer` as a library componente. Currently it supports streaming of `StoredEvents` and `StoredTransactions`. It also supports filters. ## Links to any relevant issues fixes #8315 ## How the change has been tested created a separated binary which used the `iota-indexer` as a library, used `testnet` network as source of checkpoints for synchronization, subscribed to incoming events, applied different filter to assert that they were respected. - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [x] Patch-specific tests (correctness, functionality coverage) ### Infrastructure QA (only required for crates that are maintained by @iotaledger/infrastructure) - [ ] Synchronization of the indexer from genesis for a network including migration objects. - [x] Restart of indexer synchronization locally without resetting the database. - [ ] Restart of indexer synchronization on a production-like database. - [ ] Deployment of services using Docker. - [x] Verification of API backward compatibility. ### Release Notes - [x] Indexer: Add streaming support for `events` and `transactions`.
1 parent 821295e commit 24fcc55

File tree

8 files changed

+562
-1
lines changed

8 files changed

+562
-1
lines changed

Cargo.lock

Lines changed: 85 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/iota-indexer/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ prometheus.workspace = true
2828
rand = { workspace = true, optional = true }
2929
secrecy = "0.8.0"
3030
serde.workspace = true
31+
serde_json.workspace = true
3132
serde_with.workspace = true
3233
strum.workspace = true
3334
strum_macros.workspace = true
3435
tap.workspace = true
3536
tempfile.workspace = true
3637
thiserror.workspace = true
3738
tokio = { workspace = true, features = ["full"] }
39+
tokio-postgres = "0.7.13"
3840
tokio-stream.workspace = true
3941
tokio-util = { workspace = true, features = ["rt"] }
4042
toml.workspace = true
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
DROP TRIGGER IF EXISTS checkpoint_committed_trigger ON checkpoints;
2+
DROP FUNCTION IF EXISTS notify_checkpoint_committed();
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
CREATE OR REPLACE FUNCTION notify_checkpoint_committed()
2+
RETURNS TRIGGER AS $$
3+
BEGIN
4+
-- Send notification with just the range - let client query events
5+
PERFORM pg_notify('checkpoint_committed',
6+
json_build_object(
7+
'checkpoint_sequence_number', NEW.sequence_number,
8+
'min_tx_sequence_number', NEW.min_tx_sequence_number,
9+
'max_tx_sequence_number', NEW.max_tx_sequence_number
10+
)::text
11+
);
12+
13+
RETURN NULL;
14+
END;
15+
$$ LANGUAGE plpgsql;
16+
17+
18+
CREATE TRIGGER checkpoint_committed_trigger
19+
AFTER INSERT ON checkpoints
20+
FOR EACH ROW
21+
EXECUTE FUNCTION notify_checkpoint_committed();

crates/iota-indexer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub mod pruning;
4444
pub mod read;
4545
pub mod schema;
4646
pub mod store;
47+
pub mod stream;
4748
pub mod system_package_task;
4849
pub mod test_utils;
4950
pub mod types;

crates/iota-indexer/src/models/transactions.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ impl StoredTransaction {
455455
})
456456
}
457457

458-
fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
458+
pub(crate) fn try_into_sender_signed_data(&self) -> IndexerResult<SenderSignedData> {
459459
let sender_signed_data: SenderSignedData =
460460
bcs::from_bytes(&self.raw_transaction).map_err(|e| {
461461
IndexerError::PersistentStorageDataCorruption(format!(

crates/iota-indexer/src/read.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -870,6 +870,20 @@ impl IndexerReader {
870870
run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
871871
}
872872

873+
pub(crate) fn multi_get_transactions_by_sequence_numbers_range(
874+
&self,
875+
min_seq: i64,
876+
max_seq: i64,
877+
) -> Result<Vec<StoredTransaction>, IndexerError> {
878+
use crate::schema::transactions::dsl as txdsl;
879+
let query = txdsl::transactions
880+
.filter(txdsl::tx_sequence_number.ge(min_seq))
881+
.filter(txdsl::tx_sequence_number.le(max_seq))
882+
.order(txdsl::tx_sequence_number.asc())
883+
.into_boxed();
884+
run_query!(&self.pool, |conn| query.load::<StoredTransaction>(conn))
885+
}
886+
873887
pub async fn get_owned_objects_in_blocking_task(
874888
&self,
875889
address: IotaAddress,

0 commit comments

Comments
 (0)