Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
05515c3
feat: init gRPC event filter
bingyanglin Aug 19, 2025
f5f79a0
refactor: code clean
bingyanglin Aug 19, 2025
16041f7
fix: bugs found by using iota CLI
bingyanglin Aug 19, 2025
bbe7feb
refactor: better passing hiearchy for the grpc cancellation token
bingyanglin Aug 19, 2025
0055125
refactor: use GrpcEventBroadcaster
bingyanglin Aug 20, 2025
06aa721
rename: grpc_event_broadcast_tx to grpc_event_broadcaster
bingyanglin Aug 20, 2025
dfd670b
rename: grpc_event_tx to grpc_event_broadcaster
bingyanglin Aug 20, 2025
f6b65f8
docs: refine gRPC Event Service description
bingyanglin Aug 20, 2025
b03c331
rename: broadcast channel to broadcaster
bingyanglin Aug 20, 2025
60653eb
refactor: code clean
bingyanglin Aug 20, 2025
2480d49
fix: cargo doc error
bingyanglin Aug 20, 2025
6cc5ae5
fix: code clean
bingyanglin Aug 20, 2025
d79d4a1
Update crates/iota/src/iota_commands.rs
bingyanglin Aug 20, 2025
e94140c
fix: fmt
bingyanglin Aug 20, 2025
8dc4a7a
refactor: event protobuf
bingyanglin Aug 21, 2025
f06a81d
refactor: use event_streamer and subscriber for gRPC
bingyanglin Aug 21, 2025
31a3cda
Update crates/iota/src/iota_commands.rs
bingyanglin Aug 21, 2025
e8b4518
Update crates/iota-grpc-api/src/event_service.rs
bingyanglin Aug 21, 2025
f92693b
Update crates/iota-grpc-api/src/event_service.rs
bingyanglin Aug 21, 2025
a295003
Update crates/iota-grpc-api/src/event_service.rs
bingyanglin Aug 21, 2025
68ed36b
fix: clippy
bingyanglin Aug 21, 2025
ba81b1b
refactor: holding shutdown_token in service level
bingyanglin Aug 21, 2025
982151e
fix: delete empty line
bingyanglin Aug 21, 2025
9cee6ac
refactor: code clean
bingyanglin Aug 21, 2025
8c01a95
refactor: use send_traced
bingyanglin Aug 21, 2025
fea7803
refactor: remove and reorder grpc event filters
bingyanglin Aug 22, 2025
14537e3
refactor: use into_iter and remove clone
bingyanglin Aug 25, 2025
216d7ef
refactor: remove into_iter()
bingyanglin Aug 25, 2025
8121115
refactor: use package_id
bingyanglin Aug 26, 2025
f7dcd36
refactor: the only error is no receivers
bingyanglin Aug 26, 2025
9660b98
fix: remove meaningless error from debug message
semenov-vladyslav Aug 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iota-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ iota-config.workspace = true
iota-execution.workspace = true
iota-framework.workspace = true
iota-genesis-builder.workspace = true
iota-grpc-api.workspace = true
iota-grpc-types.workspace = true
iota-json-rpc-types.workspace = true
iota-macros.workspace = true
Expand Down
12 changes: 12 additions & 0 deletions crates/iota-core/src/subscription_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::sync::Arc;

use iota_grpc_api::EventSubscriber;
use iota_json_rpc_types::{
EffectsWithInput, EventFilter, IotaEvent, IotaTransactionBlockEffects,
IotaTransactionBlockEffectsAPI, IotaTransactionBlockEvents, TransactionFilter,
Expand Down Expand Up @@ -105,6 +106,7 @@ impl SubscriptionHandler {

// serially dispatch event processing to honor events' orders.
for event in events.data.clone() {
// Send to unified event streamer (serves both JSON-RPC and gRPC subscribers)
if let Err(e) = self.event_streamer.try_send(event) {
error!(error =? e, "Failed to send event to dispatch");
}
Expand All @@ -123,3 +125,13 @@ impl SubscriptionHandler {
self.transaction_streamer.subscribe(filter)
}
}

// Implement EventSubscriber trait for gRPC integration
impl EventSubscriber for SubscriptionHandler {
fn subscribe_events(
&self,
filter: EventFilter,
) -> Box<dyn futures::Stream<Item = IotaEvent> + Send + Unpin> {
Box::new(Box::pin(self.event_streamer.subscribe(filter)))
}
}
4 changes: 3 additions & 1 deletion crates/iota-grpc-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ bcs.workspace = true
futures.workspace = true
prost.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util.workspace = true
Expand All @@ -19,12 +20,13 @@ tracing.workspace = true

# internal dependencies
iota-grpc-types.workspace = true
iota-json-rpc-types.workspace = true
iota-types.workspace = true
move-core-types.workspace = true

[build-dependencies]
tonic-build.workspace = true

[dev-dependencies]
iota-config.workspace = true
move-core-types.workspace = true
test-cluster.workspace = true
53 changes: 38 additions & 15 deletions crates/iota-grpc-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,30 @@ This crate introduces a gRPC API for IOTA. The primary goal of this API is to pr

## Features

The `NodeService` provides the following RPC endpoints:
The gRPC API provides the following services:

### Checkpoint Service

- `StreamCheckpoints`: Stream checkpoint data based on a flexible range.
- `GetEpochFirstCheckpointSequenceNumber`: Query the first checkpoint sequence number for a given epoch (useful for robust reset and epoch boundary handling).

### Event Service

- `StreamEvents`: Stream events with flexible filtering capabilities

Event filters allow precise control over which events are streamed to clients, including filtering by event type, package, sender, transaction, field values, time ranges, and boolean combinations. For the complete list and definitions of available filters, see [`proto/event.proto`](proto/event.proto).

## Usage

The `iota-grpc-api` crate defines the gRPC service and its messages. The `iota-node` crate integrates and starts this gRPC server if `enable-grpc-api` is set to `true` and `grpc-api-config` is configured.

A shared gRPC client (`GrpcNodeClient`) is provided by this crate and should be used by downstream consumers to connect and stream checkpoints. This ensures all consumers use the same, up-to-date protocol and data model.
Shared gRPC clients are provided by this crate:

- `CheckpointClient`: For streaming checkpoints and querying epoch information
- `EventClient`: For streaming events with filtering capabilities
- `NodeClient`: Factory for creating and managing service clients

These clients should be used by downstream consumers to ensure all consumers use the same, up-to-date protocol and data model.

**Configuration Example:**

Expand All @@ -27,24 +41,33 @@ enable-grpc-api: true
grpc-api-config:
address: "0.0.0.0:50051"
checkpoint-broadcast-buffer-size: 100
event-broadcast-buffer-size: 1000
```

**Client Example:**
**Client Examples:**

```rust
use iota_grpc_api::client::GrpcNodeClient;
use iota_grpc_api::client::NodeClient;

let mut client = GrpcNodeClient::connect("http://localhost:50051").await?;
let mut stream = client.stream_checkpoints(Some(0), Some(10), Some(false)).await?;
while let Some(Ok(checkpoint)) = stream.next().await {
// Deserialize and process checkpoint.data (BCS-encoded CertifiedCheckpointSummary)
}
let mut stream = client.stream_checkpoints(None, Some(4), Some(true)).await?;
if let Some(Ok(checkpoint)) = stream.next().await {
// Deserialize as CheckpointData
// Connect to gRPC node
let node_client = NodeClient::connect("http://localhost:50051").await?;

// Checkpoint streaming example
let mut checkpoint_client = node_client.checkpoint_client().expect("Checkpoint client available");
let mut checkpoint_stream = checkpoint_client.stream_checkpoints(Some(0), Some(10), false).await?;
while let Some(Ok(checkpoint_content)) = checkpoint_stream.next().await {
// Process checkpoint data or summary
}
let mut stream = client.stream_checkpoints(Some(5), None, Some(true)).await?;
while let Some(Ok(checkpoint)) = stream.next().await {
// checkpoint.data is BCS-encoded CheckpointData

// Event streaming example
use iota_grpc_api::events::{EventFilter, AllFilter, event_filter::Filter};

let mut event_client = node_client.event_client().expect("Event client available");
let all_events_filter = EventFilter {
filter: Some(Filter::All(AllFilter {})),
};
let mut event_stream = event_client.stream_events(all_events_filter).await?;
while let Some(Ok(event)) = event_stream.next().await {
// Process IotaEvent
}
```
9 changes: 8 additions & 1 deletion crates/iota-grpc-api/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@

fn main() {
tonic_build::configure()
.compile_protos(&["proto/checkpoint.proto"], &["proto"])
.compile_protos(
&[
"proto/common.proto",
"proto/checkpoint.proto",
"proto/event.proto",
],
&["proto/"],
)
.unwrap();
}
11 changes: 4 additions & 7 deletions crates/iota-grpc-api/proto/checkpoint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

syntax = "proto3";

package iota.grpc;
package iota.grpc.checkpoints;

import "common.proto";

service CheckpointService {
// Checkpoint operations
Expand All @@ -26,15 +28,10 @@ message CheckpointSequenceNumberResponse {
uint64 sequence_number = 1;
}

// BCS-serialized data container
message BcsData {
bytes data = 1;
}

message Checkpoint {
uint64 sequence_number = 1;
// Indicates whether bcs_data contains full CheckpointData (true) or just CertifiedCheckpointSummary (false)
bool is_full = 2;
// BCS-encoded CertifiedCheckpointSummary (default) or CheckpointData (if full=true)
BcsData bcs_data = 3;
iota.grpc.common.BcsData bcs_data = 3;
}
21 changes: 21 additions & 0 deletions crates/iota-grpc-api/proto/common.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";

package iota.grpc.common;

// BCS-serialized data container
message BcsData {
bytes data = 1;
}

// 32-byte address type for IOTA addresses, object IDs, package IDs, etc.
message Address {
bytes address = 1;
}

// 32-byte Transaction digest
message TransactionDigest {
bytes digest = 1;
}
85 changes: 85 additions & 0 deletions crates/iota-grpc-api/proto/event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2025 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";

package iota.grpc.events;

import "common.proto";

service EventService {
rpc StreamEvents (EventStreamRequest) returns (stream Event);
}

message EventStreamRequest {
EventFilter filter = 1;
}

message Event {
EventID event_id = 1;
iota.grpc.common.Address package_id = 2;
string transaction_module = 3;
iota.grpc.common.Address sender = 4;
string type_name = 5;
string parsed_json = 6;
optional uint64 timestamp_ms = 7;
iota.grpc.common.BcsData event_data = 8;
}

message EventID {
uint64 event_seq = 1;
iota.grpc.common.TransactionDigest tx_digest = 2;
}

// Rich event filter that supports gRPC event filtering
message EventFilter {
oneof filter {
AllFilter all = 1;
SenderFilter sender = 2;
TransactionFilter transaction = 3;
MoveModuleFilter move_module = 4;
MoveEventTypeFilter move_event_type = 5;
MoveEventModuleFilter move_event_module = 6;
TimeRangeFilter time_range = 7;
}
}

// Match all events (catch-all filter)
message AllFilter {
// Empty - matches all events
}

// Filter by sender address
message SenderFilter {
iota.grpc.common.Address sender = 1; // Sender address
}

// Filter by transaction digest
message TransactionFilter {
iota.grpc.common.TransactionDigest tx_digest = 1; // Transaction digest
}

// Filter by transaction execution module (different from event definition module)
message MoveModuleFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by Move event type (package::module::event_name)
message MoveEventTypeFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name (e.g., "request")
string name = 3; // Event name (e.g., "RequestEvent")
}

// Filter by package and module
message MoveEventModuleFilter {
iota.grpc.common.Address package_id = 1; // Package ID
string module = 2; // Module name
}

// Filter by timestamp range
message TimeRangeFilter {
uint64 start_time = 1; // Start time in milliseconds since epoch (inclusive)
uint64 end_time = 2; // End time in milliseconds since epoch (exclusive)
}
20 changes: 16 additions & 4 deletions crates/iota-grpc-api/src/checkpoint_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::{pin::Pin, sync::Arc};

use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status};
use tracing::{debug, info};

Expand All @@ -18,6 +19,7 @@ pub struct CheckpointGrpcService {
pub reader: Arc<GrpcReader>,
pub checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
pub checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
pub cancellation_token: CancellationToken,
}

impl CheckpointGrpcService {
Expand All @@ -26,11 +28,13 @@ impl CheckpointGrpcService {
reader: Arc<GrpcReader>,
checkpoint_summary_broadcaster: GrpcCheckpointSummaryBroadcaster,
checkpoint_data_broadcaster: GrpcCheckpointDataBroadcaster,
cancellation_token: CancellationToken,
) -> Self {
Self {
reader,
checkpoint_summary_broadcaster,
checkpoint_data_broadcaster,
cancellation_token,
}
}
}
Expand All @@ -42,8 +46,12 @@ impl CheckpointGrpcService {
end_sequence_number: Option<u64>,
) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
let rx = self.checkpoint_data_broadcaster.subscribe();
self.reader
.create_checkpoint_data_stream(rx, start_sequence_number, end_sequence_number)
self.reader.create_checkpoint_data_stream(
rx,
start_sequence_number,
end_sequence_number,
self.cancellation_token.clone(),
)
}

fn stream_checkpoint_summary(
Expand All @@ -52,8 +60,12 @@ impl CheckpointGrpcService {
end_sequence_number: Option<u64>,
) -> impl futures::Stream<Item = CheckpointStreamResult> + Send {
let rx = self.checkpoint_summary_broadcaster.subscribe();
self.reader
.create_checkpoint_summary_stream(rx, start_sequence_number, end_sequence_number)
self.reader.create_checkpoint_summary_stream(
rx,
start_sequence_number,
end_sequence_number,
self.cancellation_token.clone(),
)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/iota-grpc-api/src/client/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl CheckpointClient {
Ok(stream.map(|result| {
result.and_then(|checkpoint| {
Self::deserialize_checkpoint(&checkpoint).map_err(|e| {
tonic::Status::internal(format!("Failed to deserialize checkpoint: {}", e))
tonic::Status::internal(format!("Failed to deserialize checkpoint: {e}"))
})
})
}))
Expand Down
Loading
Loading