|  | 
|  | 1 | +use std::{ | 
|  | 2 | +    sync::Arc, | 
|  | 3 | +    time::{Duration, Instant, SystemTime, UNIX_EPOCH}, | 
|  | 4 | +}; | 
|  | 5 | + | 
|  | 6 | +use async_trait::async_trait; | 
|  | 7 | +use graphql_parser::schema::Document; | 
|  | 8 | +use hive_console_sdk::agent::{ExecutionReport, UsageAgent}; | 
|  | 9 | +use hive_router_config::{usage_reporting::UsageReportingConfig, HiveRouterConfig}; | 
|  | 10 | +use hive_router_plan_executor::execution::plan::{ClientRequestDetails, PlanExecutionOutput}; | 
|  | 11 | +use ntex::web::HttpRequest; | 
|  | 12 | +use rand::Rng; | 
|  | 13 | +use tokio_util::sync::CancellationToken; | 
|  | 14 | + | 
|  | 15 | +use crate::background_tasks::BackgroundTask; | 
|  | 16 | + | 
|  | 17 | +pub fn from_config(router_config: &HiveRouterConfig) -> Option<UsageAgent> { | 
|  | 18 | +    router_config.usage_reporting.as_ref().map(|usage_config| { | 
|  | 19 | +        let flush_interval = Duration::from_secs(usage_config.flush_interval); | 
|  | 20 | +        hive_console_sdk::agent::UsageAgent::new( | 
|  | 21 | +            usage_config.token.clone(), | 
|  | 22 | +            usage_config.endpoint.clone(), | 
|  | 23 | +            usage_config.target_id.clone(), | 
|  | 24 | +            usage_config.buffer_size, | 
|  | 25 | +            usage_config.connect_timeout, | 
|  | 26 | +            usage_config.request_timeout, | 
|  | 27 | +            usage_config.accept_invalid_certs, | 
|  | 28 | +            flush_interval, | 
|  | 29 | +            "hive-router".to_string(), | 
|  | 30 | +        ) | 
|  | 31 | +    }) | 
|  | 32 | +} | 
|  | 33 | + | 
|  | 34 | +pub fn send_usage_report( | 
|  | 35 | +    schema: Arc<Document<'static, String>>, | 
|  | 36 | +    start: Instant, | 
|  | 37 | +    req: &HttpRequest, | 
|  | 38 | +    client_request_details: &ClientRequestDetails, | 
|  | 39 | +    usage_agent: &UsageAgent, | 
|  | 40 | +    usage_config: &UsageReportingConfig, | 
|  | 41 | +    execution_result: &PlanExecutionOutput, | 
|  | 42 | +) { | 
|  | 43 | +    let mut rng = rand::rng(); | 
|  | 44 | +    let sampled = rng.random::<f64>() < usage_config.sample_rate; | 
|  | 45 | +    if !sampled { | 
|  | 46 | +        return; | 
|  | 47 | +    } | 
|  | 48 | +    if client_request_details | 
|  | 49 | +        .operation | 
|  | 50 | +        .name | 
|  | 51 | +        .is_some_and(|op_name| usage_config.exclude.contains(&op_name.to_string())) | 
|  | 52 | +    { | 
|  | 53 | +        return; | 
|  | 54 | +    } | 
|  | 55 | +    let client_name = get_header_value(req, &usage_config.client_name_header); | 
|  | 56 | +    let client_version = get_header_value(req, &usage_config.client_version_header); | 
|  | 57 | +    let timestamp = SystemTime::now() | 
|  | 58 | +        .duration_since(UNIX_EPOCH) | 
|  | 59 | +        .unwrap() | 
|  | 60 | +        .as_secs() | 
|  | 61 | +        * 1000; | 
|  | 62 | +    let duration = start.elapsed(); | 
|  | 63 | +    let execution_report = ExecutionReport { | 
|  | 64 | +        schema, | 
|  | 65 | +        client_name, | 
|  | 66 | +        client_version, | 
|  | 67 | +        timestamp, | 
|  | 68 | +        duration, | 
|  | 69 | +        ok: execution_result.error_count == 0, | 
|  | 70 | +        errors: execution_result.error_count, | 
|  | 71 | +        operation_body: client_request_details.operation.query.to_owned(), | 
|  | 72 | +        operation_name: client_request_details | 
|  | 73 | +            .operation | 
|  | 74 | +            .name | 
|  | 75 | +            .map(|op_name| op_name.to_owned()), | 
|  | 76 | +        persisted_document_hash: None, | 
|  | 77 | +    }; | 
|  | 78 | +    usage_agent | 
|  | 79 | +        .add_report(execution_report) | 
|  | 80 | +        .unwrap_or_else(|err| tracing::error!("Failed to send usage report: {}", err)); | 
|  | 81 | +} | 
|  | 82 | + | 
|  | 83 | +fn get_header_value(req: &HttpRequest, header_name: &str) -> Option<String> { | 
|  | 84 | +    req.headers() | 
|  | 85 | +        .get(header_name) | 
|  | 86 | +        .and_then(|v| v.to_str().ok()) | 
|  | 87 | +        .map(|s| s.to_string()) | 
|  | 88 | +} | 
|  | 89 | + | 
|  | 90 | +#[async_trait] | 
|  | 91 | +impl BackgroundTask for UsageAgent { | 
|  | 92 | +    fn id(&self) -> &str { | 
|  | 93 | +        "usage_report_flush_interval" | 
|  | 94 | +    } | 
|  | 95 | + | 
|  | 96 | +    async fn run(&self, token: CancellationToken) { | 
|  | 97 | +        self.start_flush_interval(Some(token)).await | 
|  | 98 | +    } | 
|  | 99 | +} | 
0 commit comments