Skip to content

Commit 4b98630

Browse files
committed
feat(gRPC): add improved transaction filter (#9086)
# Description of change This PR adds a new transaction filter and a new event filter for gRPC (not used yet by the API). It combines the two existing json-rpc filters into one, but also adds the possibility to filter for transactions that match a certain event filter. Also, the possibility to chain the filters was added (`Any`, `All`, `Or`, `And`) similar to the event filters. ## How the change has been tested - [x] Basic tests (linting, compilation, formatting, unit/integration tests) - [ ] Patch-specific tests (correctness, functionality coverage) - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] I have checked that new and existing unit tests pass locally with my changes
1 parent 9b11b25 commit 4b98630

File tree

4 files changed

+292
-2
lines changed

4 files changed

+292
-2
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// Modifications Copyright (c) 2025 IOTA Stiftung
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
use iota_json_rpc_types::{Filter, IotaEvent};
6+
use iota_metrics::monitored_scope;
7+
use iota_types::{
8+
base_types::{IotaAddress, ObjectID},
9+
error::IotaResult,
10+
};
11+
use move_core_types::{identifier::Identifier, language_storage::StructTag};
12+
use serde::{Deserialize, Serialize};
13+
use serde_json::Value;
14+
15+
#[derive(Clone, Debug, Serialize, Deserialize)]
16+
pub enum GrpcEventFilter {
17+
// Logical AND of several filters.
18+
All(Vec<GrpcEventFilter>),
19+
// Logical OR of several filters.
20+
Any(Vec<GrpcEventFilter>),
21+
// Logical NOT of a filter.
22+
Not(Box<GrpcEventFilter>),
23+
24+
/// Filter by sender address.
25+
Sender(IotaAddress),
26+
27+
/// Return events emitted in a specified Move package + module (optional).
28+
/// If the event is defined in PackageA::ModuleA but emitted in a tx with
29+
/// PackageB::ModuleB, filtering `MovePackageAndModule` by PackageB::ModuleB
30+
/// returns the event. Filtering `MoveEventPackageAndModule` by
31+
/// PackageA::ModuleA returns the event too.
32+
MovePackageAndModule {
33+
/// the Move package ID
34+
package: ObjectID,
35+
/// the module name (optional)
36+
module: Option<Identifier>,
37+
},
38+
/// Return events with the given Move package + module (optional) where the
39+
/// event struct is defined. If the event is defined in
40+
/// PackageA::ModuleA but emitted in a tx with PackageB::ModuleB, filtering
41+
/// `MoveEventPackageAndModule` by PackageA::ModuleA returns the
42+
/// event. Filtering `MovePackageAndModule` by PackageB::ModuleB returns the
43+
/// event too.
44+
MoveEventPackageAndModule {
45+
/// the Move package ID
46+
package: ObjectID,
47+
/// the module name (optional)
48+
module: Option<Identifier>,
49+
},
50+
/// Return events with the given Move event struct name (struct tag).
51+
/// For example, if the event is defined in `0xabcd::MyModule`, and named
52+
/// `Foo`, then the struct tag is `0xabcd::MyModule::Foo`.
53+
MoveEventType(StructTag),
54+
/// Return events whose JSON representation contains the given field path
55+
/// with the specified value (optional). The path should be a JSON pointer
56+
/// as defined in RFC 6901.
57+
MoveEventField {
58+
path: String,
59+
value: Option<Value>,
60+
},
61+
}
62+
63+
impl GrpcEventFilter {
64+
fn try_matches(&self, item: &IotaEvent) -> IotaResult<bool> {
65+
Ok(match self {
66+
GrpcEventFilter::All(filters) => filters.iter().all(|f| f.matches(item)),
67+
GrpcEventFilter::Any(filters) => filters.iter().any(|f| f.matches(item)),
68+
GrpcEventFilter::Not(filter) => !filter.matches(item),
69+
70+
GrpcEventFilter::Sender(sender) => &item.sender == sender,
71+
72+
GrpcEventFilter::MovePackageAndModule { package, module } => {
73+
&item.package_id == package
74+
&& (module.is_none()
75+
|| matches!(module, Some(m2) if m2 == &item.transaction_module))
76+
}
77+
GrpcEventFilter::MoveEventPackageAndModule { package, module } => {
78+
&ObjectID::from(item.type_.address) == package
79+
&& (module.is_none() || matches!(module, Some(m2) if m2 == &item.type_.module))
80+
}
81+
GrpcEventFilter::MoveEventType(event_type) => &item.type_ == event_type,
82+
GrpcEventFilter::MoveEventField { path, value } => {
83+
let json_ptr_value = item.parsed_json.pointer(path);
84+
if value.is_none() {
85+
// If no value is specified, just check for the existence of the field.
86+
json_ptr_value.is_some()
87+
} else {
88+
matches!(json_ptr_value, Some(v) if v == value.as_ref().unwrap())
89+
}
90+
}
91+
})
92+
}
93+
94+
pub fn and(self, other_filter: GrpcEventFilter) -> Self {
95+
Self::All(vec![self, other_filter])
96+
}
97+
pub fn or(self, other_filter: GrpcEventFilter) -> Self {
98+
Self::Any(vec![self, other_filter])
99+
}
100+
}
101+
102+
impl Filter<IotaEvent> for GrpcEventFilter {
103+
fn matches(&self, item: &IotaEvent) -> bool {
104+
let _scope = monitored_scope("GrpcEventFilter::matches");
105+
self.try_matches(item).unwrap_or_default()
106+
}
107+
}

crates/iota-core/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub(crate) mod consensus_types;
1919
pub mod consensus_validator;
2020
pub mod db_checkpoint_handler;
2121
pub mod epoch;
22+
pub mod event_filter;
2223
pub mod execution_cache;
2324
mod execution_driver;
2425
mod fallback_fetch;
@@ -42,6 +43,7 @@ pub mod streamer;
4243
pub mod subscription_handler;
4344
pub mod test_utils;
4445
pub mod traffic_controller;
46+
pub mod transaction_filter;
4547
mod transaction_input_loader;
4648
mod transaction_manager;
4749
pub mod transaction_orchestrator;

crates/iota-core/src/subscription_handler.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use prometheus::{
1616
use tokio_stream::Stream;
1717
use tracing::{error, instrument, trace};
1818

19-
use crate::streamer::Streamer;
19+
use crate::{
20+
streamer::Streamer,
21+
transaction_filter::{GrpcTransactionFilter, TransactionDataWithEffectsAndEvents},
22+
};
2023

2124
#[cfg(test)]
2225
#[path = "unit_tests/subscription_handler_tests.rs"]
@@ -70,14 +73,28 @@ pub struct SubscriptionHandler {
7073
event_streamer: Streamer<IotaEvent, IotaEvent, EventFilter>,
7174
transaction_streamer:
7275
Streamer<EffectsWithInput, IotaTransactionBlockEffects, TransactionFilter>,
76+
grpc_transaction_streamer: Streamer<
77+
TransactionDataWithEffectsAndEvents,
78+
IotaTransactionBlockEffects,
79+
GrpcTransactionFilter,
80+
>,
7381
}
7482

7583
impl SubscriptionHandler {
7684
pub fn new(registry: &Registry) -> Self {
7785
let metrics = Arc::new(SubscriptionMetrics::new(registry));
7886
Self {
7987
event_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics.clone(), "event"),
80-
transaction_streamer: Streamer::spawn(EVENT_DISPATCH_BUFFER_SIZE, metrics, "tx"),
88+
transaction_streamer: Streamer::spawn(
89+
EVENT_DISPATCH_BUFFER_SIZE,
90+
metrics.clone(),
91+
"tx",
92+
),
93+
grpc_transaction_streamer: Streamer::spawn(
94+
EVENT_DISPATCH_BUFFER_SIZE,
95+
metrics,
96+
"grpc_tx",
97+
),
8198
}
8299
}
83100
}
@@ -96,6 +113,17 @@ impl SubscriptionHandler {
96113
"Processing tx/event subscription"
97114
);
98115

116+
if let Err(e) =
117+
self.grpc_transaction_streamer
118+
.try_send(TransactionDataWithEffectsAndEvents {
119+
tx_data: input.clone(),
120+
effects: effects.clone(),
121+
events: events.clone(),
122+
})
123+
{
124+
error!(error =? e, "Failed to send transaction to grpc dispatch");
125+
}
126+
99127
if let Err(e) = self.transaction_streamer.try_send(EffectsWithInput {
100128
input: input.clone(),
101129
effects: effects.clone(),
@@ -123,4 +151,11 @@ impl SubscriptionHandler {
123151
) -> impl Stream<Item = IotaTransactionBlockEffects> {
124152
self.transaction_streamer.subscribe(filter)
125153
}
154+
155+
pub fn subscribe_grpc_transactions(
156+
&self,
157+
filter: GrpcTransactionFilter,
158+
) -> impl Stream<Item = IotaTransactionBlockEffects> {
159+
self.grpc_transaction_streamer.subscribe(filter)
160+
}
126161
}
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
// Copyright (c) Mysten Labs, Inc.
2+
// Modifications Copyright (c) 2025 IOTA Stiftung
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
use iota_json_rpc_types::{
6+
Filter, IotaTransactionBlockEffects, IotaTransactionBlockEffectsAPI,
7+
IotaTransactionBlockEvents, IotaTransactionKind, OwnedObjectRef,
8+
};
9+
use iota_metrics::monitored_scope;
10+
use iota_types::{
11+
base_types::{IotaAddress, ObjectID},
12+
object::Owner,
13+
transaction::{TransactionData, TransactionDataAPI},
14+
};
15+
use serde::{Deserialize, Serialize};
16+
17+
use crate::event_filter::GrpcEventFilter;
18+
19+
#[derive(Clone)]
20+
pub struct TransactionDataWithEffectsAndEvents {
21+
pub tx_data: TransactionData,
22+
pub effects: IotaTransactionBlockEffects,
23+
pub events: IotaTransactionBlockEvents,
24+
}
25+
26+
impl From<TransactionDataWithEffectsAndEvents> for IotaTransactionBlockEffects {
27+
fn from(e: TransactionDataWithEffectsAndEvents) -> Self {
28+
e.effects
29+
}
30+
}
31+
32+
#[derive(Clone, Debug, Serialize, Deserialize)]
33+
pub enum GrpcTransactionFilter {
34+
// Logical AND of several filters.
35+
All(Vec<GrpcTransactionFilter>),
36+
// Logical OR of several filters.
37+
Any(Vec<GrpcTransactionFilter>),
38+
// Logical NOT of a filter.
39+
Not(Box<GrpcTransactionFilter>),
40+
41+
/// Filter transactions of any given kind in the input.
42+
TransactionKind(Vec<IotaTransactionKind>),
43+
44+
/// Filter by sender address.
45+
Sender(IotaAddress),
46+
/// Filter by recipient address. The recipient is determined by
47+
/// checking the owners of mutated and unwrapped objects.
48+
Receiver(IotaAddress),
49+
50+
/// Filter by input object.
51+
InputObject(ObjectID),
52+
/// Filter by changed object, including created, mutated and unwrapped
53+
/// objects.
54+
ChangedObject(ObjectID),
55+
/// Filter transactions that wrapped or deleted the specified object.
56+
/// Includes transactions that either created and immediately wrapped
57+
/// the object or unwrapped and immediately deleted it.
58+
/// TODO: @infra: do we need that now that we have the AffectedObject
59+
/// filter?
60+
WrappedOrDeletedObject(ObjectID),
61+
/// Filter for transactions that touch this object.
62+
AffectedObject(ObjectID),
63+
64+
/// Filter by move package, module (optional) and function (optional).
65+
MoveCall {
66+
/// the Move package ID
67+
package: ObjectID,
68+
/// the module name
69+
module: Option<String>,
70+
/// the function name
71+
function: Option<String>,
72+
},
73+
74+
/// Filter transactions that contain events matching the given event filter.
75+
Events(GrpcEventFilter),
76+
}
77+
78+
impl Filter<TransactionDataWithEffectsAndEvents> for GrpcTransactionFilter {
79+
fn matches(&self, item: &TransactionDataWithEffectsAndEvents) -> bool {
80+
let _scope = monitored_scope("GrpcTransactionFilter::matches");
81+
match self {
82+
GrpcTransactionFilter::All(filters) => filters.iter().all(|f| f.matches(item)),
83+
GrpcTransactionFilter::Any(filters) => filters.iter().any(|f| f.matches(item)),
84+
GrpcTransactionFilter::Not(filter) => !filter.matches(item),
85+
86+
GrpcTransactionFilter::TransactionKind(kinds) => kinds
87+
.iter()
88+
.any(|kind| kind == &IotaTransactionKind::from(item.tx_data.kind())),
89+
90+
GrpcTransactionFilter::Sender(a) => &item.tx_data.sender() == a,
91+
GrpcTransactionFilter::Receiver(a) => {
92+
let mutated: &[OwnedObjectRef] = item.effects.mutated();
93+
mutated.iter().chain(item.effects.unwrapped().iter()).any(|oref: &OwnedObjectRef| {
94+
matches!(oref.owner, Owner::AddressOwner(owner) if owner == *a)
95+
})
96+
}
97+
98+
GrpcTransactionFilter::InputObject(o) => {
99+
let Ok(input_objects) = item.tx_data.input_objects() else {
100+
return false;
101+
};
102+
input_objects.iter().any(|object| object.object_id() == *o)
103+
}
104+
GrpcTransactionFilter::ChangedObject(o) => item
105+
.effects
106+
.mutated()
107+
.iter()
108+
.any(|oref: &OwnedObjectRef| &oref.reference.object_id == o),
109+
GrpcTransactionFilter::WrappedOrDeletedObject(o) => item
110+
.effects
111+
.wrapped()
112+
.iter()
113+
.chain(item.effects.deleted().iter())
114+
.chain(item.effects.unwrapped_then_deleted().iter())
115+
.any(|oref| &oref.object_id == o),
116+
GrpcTransactionFilter::AffectedObject(o) => item
117+
.effects
118+
.created()
119+
.iter()
120+
.chain(item.effects.mutated().iter())
121+
.chain(item.effects.unwrapped().iter())
122+
.map(|oref: &OwnedObjectRef| &oref.reference)
123+
.chain(item.effects.shared_objects().iter())
124+
.chain(item.effects.deleted().iter())
125+
.chain(item.effects.unwrapped_then_deleted().iter())
126+
.chain(item.effects.wrapped().iter())
127+
.any(|oref| &oref.object_id == o),
128+
129+
GrpcTransactionFilter::MoveCall {
130+
package,
131+
module,
132+
function,
133+
} => item.tx_data.move_calls().into_iter().any(|(p, m, f)| {
134+
p == package
135+
&& (module.is_none() || matches!(module, Some(m2) if m2 == &m.to_string()))
136+
&& (function.is_none() || matches!(function, Some(f2) if f2 == &f.to_string()))
137+
}),
138+
139+
GrpcTransactionFilter::Events(event_filter) => item
140+
.events
141+
.data
142+
.iter()
143+
.any(|event| event_filter.matches(event)),
144+
}
145+
}
146+
}

0 commit comments

Comments
 (0)