Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/iota-graphql-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tap.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread"] }
tokio-stream.workspace = true
tokio-util = { workspace = true, features = ["rt"] }
toml.workspace = true
tower.workspace = true
Expand Down
86 changes: 86 additions & 0 deletions crates/iota-graphql-rpc/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,16 @@ input EventFilter {
eventType: String
}

"""
Possible responses from a subscription.

It could be one of the following:
- A successful payload from the subscription stream.
- A notice that the subscription has been lagged behind the network with the
number of lost payloads.
"""
union EventSubscriptionPayload = Event | Lagged

"""
The result of an execution, including errors that occurred during said
execution.
Expand Down Expand Up @@ -1695,6 +1705,17 @@ Arbitrary JSON data.
"""
scalar JSON

"""
Notifies that the subscription consumer has fallen behind the live
subscription stream and missed one or more payloads.
"""
type Lagged {
"""
Number of missed payloads since the previous emitted one.
"""
count: Int!
}

"""
Information used by a package to link to a specific version of its
dependency.
Expand Down Expand Up @@ -4279,6 +4300,56 @@ type StorageFund {
nonRefundableBalance: BigInt
}

type Subscription {
"""
Subscribe to incoming transactions from the IOTA network.

If no filter is provided, all transactions will be returned.
"""
transactions(filter: SubscriptionTransactionFilter): TransactionBlockSubscriptionPayload!
"""
Subscribe to incoming events from the IOTA network.

If no filter is provided, all events will be returned.
"""
events(filter: SubscriptionEventFilter): EventSubscriptionPayload!
}

"""
Filter incoming events in a subscription.
"""
input SubscriptionEventFilter @oneOf {
"""
Filter incoming events by emitting module.

- Filter by package: "0x02"
- Filter by module: "0x02::coin"
"""
emittingModule: String
}

"""
Filter incoming transactions in a subscription.
"""
input SubscriptionTransactionFilter @oneOf {
"""
Filter incoming transactions by kind.
"""
kind: TransactionBlockKindInput
"""
Filter incoming transactions by signing address.
"""
signingAddress: IotaAddress
"""
Filter incoming transactions by package, module, or function name.

- Filter by package: "0x03"
- Filter by module: "0x03::iota_system"
- Filter by function: "0x03::iota_system::request_add_stake"
"""
function: String
}

"""
Details of the system that are decided during genesis.
"""
Expand Down Expand Up @@ -4531,6 +4602,16 @@ enum TransactionBlockKindInput {
END_OF_EPOCH_TX
}

"""
Possible responses from a subscription.

It could be one of the following:
- A successful payload from the subscription stream.
- A notice that the subscription has been lagged behind the network with the
number of lost payloads.
"""
union TransactionBlockSubscriptionPayload = TransactionBlock | Lagged

union TransactionInput = OwnedOrImmutable | SharedInput | Receiving | Pure

type TransactionInputConnection {
Expand Down Expand Up @@ -4953,10 +5034,15 @@ Directs the executor to include this field or fragment only when the `if` argume
"""
directive @include(if: Boolean!) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT
"""
Indicates that an Input Object is a OneOf Input Object (and thus requires exactly one of its field be provided)
"""
directive @oneOf on INPUT_OBJECT
"""
Directs the executor to skip this field or fragment when the `if` argument is true.
"""
directive @skip(if: Boolean!) on FIELD | FRAGMENT_SPREAD | INLINE_FRAGMENT
schema {
query: Query
mutation: Mutation
subscription: Subscription
}
88 changes: 73 additions & 15 deletions crates/iota-graphql-rpc/src/server/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,20 @@ use std::{
};

use async_graphql::{
EmptySubscription, Schema, SchemaBuilder,
Data, Schema, SchemaBuilder,
extensions::{ApolloTracing, ExtensionFactory, Tracing},
http::ALL_WEBSOCKET_PROTOCOLS,
};
use async_graphql_axum::{GraphQLRequest, GraphQLResponse};
use async_graphql_axum::{GraphQLProtocol, GraphQLRequest, GraphQLResponse, GraphQLWebSocket};
use axum::{
Extension, Router,
body::Body,
extract::{ConnectInfo, FromRef, Query as AxumQuery, State},
extract::{
ConnectInfo, FromRef, FromRequestParts, Query as AxumQuery, State, ws::WebSocketUpgrade,
},
http::{HeaderMap, StatusCode},
middleware::{self},
response::IntoResponse,
response::{IntoResponse, Response as AxumResponse},
routing::{MethodRouter, Route, get, post},
};
use chrono::Utc;
Expand Down Expand Up @@ -71,6 +74,7 @@ use crate::{
object::IObject,
owner::IOwner,
query::{IotaGraphQLSchema, Query},
subscription::{GraphQLStream, Subscription},
},
};

Expand Down Expand Up @@ -166,7 +170,7 @@ impl Server {

pub(crate) struct ServerBuilder {
state: AppState,
schema: SchemaBuilder<Query, Mutation, EmptySubscription>,
schema: SchemaBuilder<Query, Mutation, Subscription>,
router: Option<Router>,
db_reader: Option<Db>,
resolver: Option<PackageResolver>,
Expand Down Expand Up @@ -239,7 +243,7 @@ impl ServerBuilder {
self
}

fn build_schema(self) -> Schema<Query, Mutation, EmptySubscription> {
fn build_schema(self) -> Schema<Query, Mutation, Subscription> {
self.schema.finish()
}

Expand All @@ -249,7 +253,7 @@ impl ServerBuilder {
self,
) -> (
String,
Schema<Query, Mutation, EmptySubscription>,
Schema<Query, Mutation, Subscription>,
Db,
PackageResolver,
Router,
Expand All @@ -275,9 +279,16 @@ impl ServerBuilder {
if self.router.is_none() {
let router: Router = Router::new()
.route("/", post(graphql_handler))
.route("/subscriptions", get(subscription_handler))
.route("/{version}", post(graphql_handler))
.route("/{version}/subscriptions", get(subscription_handler))
.route("/graphql", post(graphql_handler))
.route("/graphql/subscriptions", get(subscription_handler))
.route("/graphql/{version}", post(graphql_handler))
.route(
"/graphql/{version}/subscriptions",
get(subscription_handler),
)
.route("/health", get(health_check))
.route("/graphql/health", get(health_check))
.route("/graphql/{version}/health", get(health_check))
Expand Down Expand Up @@ -327,8 +338,8 @@ impl ServerBuilder {
info!("Access control allow origin set to: {acl:?}");

let cors = CorsLayer::new()
// Allow `POST` when accessing the resource
.allow_methods([Method::POST])
// Allow `POST` & `GET` when accessing the resource
.allow_methods([Method::POST, Method::GET])
// Allow requests from any origin
.allow_origin(acl)
.allow_headers([hyper::header::CONTENT_TYPE, LIMITS_HEADER.clone()]);
Expand Down Expand Up @@ -479,6 +490,8 @@ impl ServerBuilder {
None
};

let graphql_streams = GraphQLStream::new(&config.connection.db_url, reader).await?;

builder = builder
.context_data(config.service.clone())
.context_data(loader)
Expand All @@ -489,7 +502,8 @@ impl ServerBuilder {
.context_data(iota_names_config)
.context_data(zklogin_config)
.context_data(metrics.clone())
.context_data(config.clone());
.context_data(config.clone())
.context_data(graphql_streams);

if config.internal_features.feature_gate {
builder = builder.extension(FeatureGate);
Expand Down Expand Up @@ -526,8 +540,8 @@ impl ServerBuilder {
}
}

fn schema_builder() -> SchemaBuilder<Query, Mutation, EmptySubscription> {
async_graphql::Schema::build(Query, Mutation, EmptySubscription)
fn schema_builder() -> SchemaBuilder<Query, Mutation, Subscription> {
async_graphql::Schema::build(Query, Mutation, Subscription)
.register_output_type::<IMoveObject>()
.register_output_type::<IObject>()
.register_output_type::<IOwner>()
Expand All @@ -539,9 +553,10 @@ pub fn export_schema() -> String {
schema_builder().finish().sdl()
}

/// Entry point for graphql requests. Each request is stamped with a unique ID,
/// a `ShowUsage` flag if set in the request headers, and the watermark as set
/// by the background task.
/// Entry point for graphql requests.
///
/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
/// request headers, and the watermark as set by the background task.
async fn graphql_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
schema: Extension<IotaGraphQLSchema>,
Expand Down Expand Up @@ -572,6 +587,49 @@ async fn graphql_handler(
(extensions, result.into())
}

/// Entry point for graphql streaming requests.
///
/// Each request is stamped with a unique ID, a `ShowUsage` flag if set in the
/// request headers and tracks the connection information produced by the
/// client.
async fn subscription_handler(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
Extension(schema): Extension<IotaGraphQLSchema>,
req: http::Request<Body>,
) -> AxumResponse {
let headers_contains_show_usage = req.headers().contains_key(ShowUsage::name());
let (mut parts, _body) = req.into_parts();

// extract GraphQL protocol
let protocol = match GraphQLProtocol::from_request_parts(&mut parts, &()).await {
Ok(protocol) => protocol,
Err(err) => return err.into_response(),
};

// extract WebSocket upgrade from request
let upgrade = match WebSocketUpgrade::from_request_parts(&mut parts, &()).await {
Ok(upgrade) => upgrade,
Err(err) => return err.into_response(),
};

let resp = upgrade
.protocols(ALL_WEBSOCKET_PROTOCOLS)
.on_upgrade(move |stream| async move {
// create connection data with per-connection values
let mut connection_data = Data::default();
connection_data.insert(Uuid::new_v4());
if headers_contains_show_usage {
connection_data.insert(ShowUsage)
}
connection_data.insert(addr);

let connection =
GraphQLWebSocket::new(stream, schema, protocol).with_data(connection_data);
connection.serve().await;
});
resp
}

#[derive(Clone)]
struct MetricsMakeCallbackHandler {
metrics: Metrics,
Expand Down
4 changes: 3 additions & 1 deletion crates/iota-graphql-rpc/src/server/graphiql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ async fn graphiql(
} else {
"/graphql".to_string()
};
let gq = async_graphql::http::GraphiQLSource::build().endpoint(&endpoint);
let gq = async_graphql::http::GraphiQLSource::build()
.endpoint(&endpoint)
.subscription_endpoint("/subscriptions");
if let axum::Extension(Some(title)) = ide_title {
axum::response::Html(gq.title(&title).finish())
} else {
Expand Down
2 changes: 1 addition & 1 deletion crates/iota-graphql-rpc/src/types/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl Event {
})
}

fn try_from_stored_event(
pub(crate) fn try_from_stored_event(
stored: StoredEvent,
checkpoint_viewed_at: u64,
) -> Result<Self, Error> {
Expand Down
1 change: 1 addition & 0 deletions crates/iota-graphql-rpc/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub(crate) mod safe_mode;
pub(crate) mod stake;
pub(crate) mod storage_fund;
pub(crate) mod string_input;
pub(crate) mod subscription;
pub(crate) mod system_parameters;
pub(crate) mod system_state_summary;
pub(crate) mod transaction_block;
Expand Down
3 changes: 2 additions & 1 deletion crates/iota-graphql-rpc/src/types/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::{
object::{self, Object, ObjectFilter},
owner::Owner,
protocol_config::ProtocolConfigs,
subscription::Subscription,
transaction_block::{self, TransactionBlock, TransactionBlockFilter},
transaction_metadata::TransactionMetadata,
type_filter::ExactTypeFilter,
Expand All @@ -53,7 +54,7 @@ use crate::{
};

pub(crate) struct Query;
pub(crate) type IotaGraphQLSchema = async_graphql::Schema<Query, Mutation, EmptySubscription>;
pub(crate) type IotaGraphQLSchema = async_graphql::Schema<Query, Mutation, Subscription>;

#[Object]
impl Query {
Expand Down
Loading
Loading