61 changes: 53 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default.

6 changes: 4 additions & 2 deletions Cargo.toml
@@ -26,8 +26,10 @@ lto = true
codegen-units = 1

[workspace.dependencies]
graphql-tools = "0.4.0"
graphql-parser = "0.4.1"
graphql-parser = { version = "0.5.0", package = "graphql-parser-hive-fork" }
@dotansimha (Member) commented on Oct 29, 2025:

why use the custom one? we dropped it on purpose.

Member Author replied:

SDK uses this one.

Member replied:

So we need to fix that in SDK. I don't think the SDK has any reason to still use it now. Router shouldn't use this custom one.

graphql-tools = { version = "0.4.0", features = [
"graphql_parser_fork",
], default-features = false }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.142"
sonic-rs = "0.5.3"
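For context on the rename above: Cargo's `package` key swaps which registry crate backs a dependency while keeping the import path stable, so consuming Rust code is untouched. A minimal sketch of the consumer side, assuming the fork preserves upstream's API:

```rust
// The crate is still imported as `graphql_parser` even though Cargo
// resolves it to the `graphql-parser-hive-fork` package; callers never
// see the rename.
use graphql_parser::parse_schema;

fn main() {
    let doc = parse_schema::<String>("type Query { ok: Boolean }").unwrap();
    println!("{doc}");
}
```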
1 change: 1 addition & 0 deletions bin/router/Cargo.toml
@@ -19,6 +19,7 @@ path = "src/main.rs"
hive-router-query-planner = { path = "../../lib/query-planner", version = "2.0.2" }
hive-router-plan-executor = { path = "../../lib/executor", version = "6.0.0" }
hive-router-config = { path = "../../lib/router-config", version = "0.0.10" }
hive-console-sdk = "0.0.0"

tokio = { workspace = true }
futures = { workspace = true }
16 changes: 14 additions & 2 deletions bin/router/src/lib.rs
@@ -19,7 +19,7 @@ use crate::{
},
jwt::JwtAuthRuntime,
logger::configure_logging,
pipeline::graphql_request_handler,
pipeline::{graphql_request_handler, usage_reporting::create_hive_user_agent},
};

pub use crate::{schema_state::SchemaState, shared_state::RouterSharedState};
@@ -110,12 +110,24 @@ pub async fn configure_app_from_config(
true => Some(JwtAuthRuntime::init(bg_tasks_manager, &router_config.jwt).await?),
false => None,
};
let usage_agent = router_config
.usage_reporting
.as_ref()
.map(|usage_config| Arc::new(create_hive_user_agent(usage_config)));

if let Some(usage_agent) = &usage_agent {
bg_tasks_manager.register_task(usage_agent.clone());
}

let router_config_arc = Arc::new(router_config);
let schema_state =
SchemaState::new_from_config(bg_tasks_manager, router_config_arc.clone()).await?;
let schema_state_arc = Arc::new(schema_state);
let shared_state = Arc::new(RouterSharedState::new(router_config_arc, jwt_runtime)?);
let shared_state = Arc::new(RouterSharedState::new(
router_config_arc,
jwt_runtime,
usage_agent,
)?);

Ok((shared_state, schema_state_arc))
}
25 changes: 24 additions & 1 deletion bin/router/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{sync::Arc, time::Instant};

use hive_router_plan_executor::execution::{
client_request_details::{ClientRequestDetails, JwtRequestDetails, OperationDetails},
@@ -46,6 +46,7 @@ pub mod normalize;
pub mod parser;
pub mod progressive_override;
pub mod query_plan;
pub mod usage_reporting;
pub mod validation;

static GRAPHIQL_HTML: &str = include_str!("../../static/graphiql.html");
@@ -111,6 +112,7 @@ pub async fn execute_pipeline(
shared_state: &Arc<RouterSharedState>,
schema_state: &Arc<SchemaState>,
) -> Result<PlanExecutionOutput, PipelineError> {
let start = Instant::now();
perform_csrf_prevention(req, &shared_state.router_config.csrf)?;

let mut execution_request = get_execution_request(req, body_bytes).await?;
@@ -190,5 +192,26 @@ pub async fn execute_pipeline(
)
.await?;

shared_state
.hive_usage_agent
.as_ref()
.and_then(|usage_agent| {
shared_state
.router_config
.usage_reporting
.as_ref()
.map(|usage_config| {
usage_reporting::collect_usage_report(
supergraph.supergraph_schema.clone(),
start.elapsed(),
req,
&client_request_details,
usage_agent,
usage_config,
&execution_result,
)
})
});

Ok(execution_result)
}
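The `and_then`/`map` chain in the hunk above runs purely for its side effect: a report is collected only when both the agent and the usage-reporting config are present. For readers tracing the control flow, an equivalent guard, reusing the names from this diff, would look like the sketch below (not the committed code):

```rust
// Sketch of an equivalent, flatter guard over the same two Options.
if let (Some(usage_agent), Some(usage_config)) = (
    shared_state.hive_usage_agent.as_ref(),
    shared_state.router_config.usage_reporting.as_ref(),
) {
    usage_reporting::collect_usage_report(
        supergraph.supergraph_schema.clone(),
        start.elapsed(),
        req,
        &client_request_details,
        usage_agent,
        usage_config,
        &execution_result,
    );
}
```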
96 changes: 96 additions & 0 deletions bin/router/src/pipeline/usage_reporting.rs
@@ -0,0 +1,96 @@
use std::{
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
};

use async_trait::async_trait;
use graphql_parser::schema::Document;
use hive_console_sdk::agent::{ExecutionReport, UsageAgent};
use hive_router_config::usage_reporting::UsageReportingConfig;
use hive_router_plan_executor::execution::{
client_request_details::ClientRequestDetails, plan::PlanExecutionOutput,
};
use ntex::web::HttpRequest;
use rand::Rng;
use tokio_util::sync::CancellationToken;

use crate::{background_tasks::BackgroundTask, consts::ROUTER_VERSION};

pub fn create_hive_user_agent(usage_config: &UsageReportingConfig) -> UsageAgent {
let user_agent = format!("hive-router/{}", ROUTER_VERSION);
hive_console_sdk::agent::UsageAgent::new(
usage_config.access_token.clone(),
usage_config.endpoint.clone(),
usage_config.target_id.clone(),
usage_config.buffer_size,
usage_config.connect_timeout.as_secs(),
usage_config.request_timeout.as_secs(),
usage_config.accept_invalid_certs,
usage_config.flush_interval,
user_agent,
)
}

#[inline]
pub fn collect_usage_report(
schema: Arc<Document<'static, String>>,
duration: Duration,
req: &HttpRequest,
client_request_details: &ClientRequestDetails,
usage_agent: &UsageAgent,
usage_config: &UsageReportingConfig,
execution_result: &PlanExecutionOutput,
) {
let mut rng = rand::rng();
let sampled = rng.random::<f64>() < usage_config.sample_rate;
if !sampled {
return;
}
if client_request_details
.operation
.name
.is_some_and(|op_name| usage_config.exclude.contains(&op_name.to_string()))
{
return;
}
let client_name = get_header_value(req, &usage_config.client_name_header);
let client_version = get_header_value(req, &usage_config.client_version_header);
let timestamp = SystemTime::now()
Member commented on this line:

can be as_millis instead of sec*1000

.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as u64;
let execution_report = ExecutionReport {
schema,
client_name: client_name.map(|s| s.to_owned()),
client_version: client_version.map(|s| s.to_owned()),
timestamp,
duration,
ok: execution_result.error_count == 0,
errors: execution_result.error_count,
operation_body: client_request_details.operation.query.to_owned(),
operation_name: client_request_details
.operation
.name
.map(|op_name| op_name.to_owned()),
persisted_document_hash: None,
};

if let Err(err) = usage_agent.add_report(execution_report) {
tracing::error!("Failed to send usage report: {}", err);
}
}

fn get_header_value<'req>(req: &'req HttpRequest, header_name: &str) -> Option<&'req str> {
req.headers().get(header_name).and_then(|v| v.to_str().ok())
}

#[async_trait]
impl BackgroundTask for UsageAgent {
fn id(&self) -> &str {
"hive_console_usage_report_task"
}

async fn run(&self, token: CancellationToken) {
self.start_flush_interval(Some(token)).await
}
}
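The sampling guard at the top of `collect_usage_report` admits a report with probability `sample_rate` by comparing a uniform draw in [0, 1) against the configured rate. A small self-contained check of that behavior, using the same rand 0.9 calls as the diff (`rand::rng()`, `Rng::random`):

```rust
use rand::Rng;

// Returns true with probability `sample_rate`: the draw is in [0, 1),
// so a rate of 0.0 never samples and a rate of 1.0 always does.
fn is_sampled(sample_rate: f64) -> bool {
    rand::rng().random::<f64>() < sample_rate
}

fn main() {
    let total = 100_000;
    let hits = (0..total).filter(|_| is_sampled(0.1)).count();
    // Expect roughly 10_000 hits; the exact count varies run to run.
    println!("sampled {hits} of {total}");
}
```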
3 changes: 3 additions & 0 deletions bin/router/src/schema_state.rs
@@ -1,5 +1,6 @@
use arc_swap::{ArcSwap, Guard};
use async_trait::async_trait;
use graphql_parser::schema::Document;
use graphql_tools::validation::utils::ValidationError;
use hive_router_config::{supergraph::SupergraphSource, HiveRouterConfig};
use hive_router_plan_executor::{
@@ -39,6 +40,7 @@ pub struct SupergraphData {
pub metadata: SchemaMetadata,
pub planner: Planner,
pub subgraph_executor_map: SubgraphExecutorMap,
pub supergraph_schema: Arc<Document<'static, String>>,
}

#[derive(Debug, thiserror::Error)]
@@ -124,6 +126,7 @@ impl SchemaState {
)?;

Ok(SupergraphData {
supergraph_schema: Arc::new(parsed_supergraph_sdl),
metadata,
planner,
subgraph_executor_map,
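Storing the parsed SDL as `Arc<Document<'static, String>>` works because a `String`-backed document owns all of its text, with no borrows into the source buffer. A minimal sketch of producing such a document, assuming the graphql-parser fork keeps upstream's `into_static` helper:

```rust
use std::sync::Arc;

use graphql_parser::schema::{parse_schema, Document};

fn parse_owned(sdl: &str) -> Arc<Document<'static, String>> {
    // With T = String the document owns its identifiers, so the borrow
    // on `sdl` can be dropped via `into_static` (upstream API; assumed
    // to be preserved by the fork).
    let doc = parse_schema::<String>(sdl).expect("valid SDL").into_static();
    Arc::new(doc)
}

fn main() {
    let schema = parse_owned("type Query { hello: String }");
    println!("{schema}");
}
```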