Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lib/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ ahash = "0.8.12"
regex-automata = "0.4.10"
vrl = { version = "0.27.0", features = ["compiler", "parser", "value", "diagnostic", "stdlib", "core"] }

ntex = { version = "2", features = ["tokio"] }
ntex-http = "0.1.15"
hyper-tls = { version = "0.6.0", features = ["vendored"] }
hyper-util = { version = "0.1.16", features = [
Expand All @@ -47,6 +48,7 @@ itoa = "1.0.15"
ryu = "1.0.20"
indexmap = "2.10.0"
bumpalo = "3.19.0"
redis = "0.32.7"

[dev-dependencies]
subgraphs = { path = "../../bench/subgraphs" }
Expand Down
2 changes: 2 additions & 0 deletions lib/executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ pub mod executors;
pub mod headers;
pub mod introspection;
pub mod json_writer;
pub mod plugins;
pub mod projection;
pub mod response;
pub mod utils;
pub mod variables;

pub use execution::plan::execute_query_plan;
pub use executors::map::SubgraphExecutorMap;
pub use plugins::*;
2 changes: 2 additions & 0 deletions lib/executor/src/plugins/examples/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod response_cache;

Check warning on line 1 in lib/executor/src/plugins/examples/mod.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/examples/mod.rs
pub mod subgraph_response_cache;
114 changes: 114 additions & 0 deletions lib/executor/src/plugins/examples/response_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use dashmap::DashMap;
use ntex::web::HttpResponse;
use redis::Commands;

Check warning on line 3 in lib/executor/src/plugins/examples/response_cache.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/examples/response_cache.rs

use crate::{
hooks::{on_execute::OnExecutePayload, on_schema_reload::OnSchemaReloadPayload}, plugins::plugin_trait::{
ControlFlow, RouterPlugin
}, utils::consts::TYPENAME_FIELD_NAME
};

pub struct ResponseCachePlugin {
redis_client: redis::Client,
ttl_per_type: DashMap<String, u64>,
}

impl ResponseCachePlugin {
pub fn try_new(redis_url: &str) -> Result<Self, redis::RedisError> {
let redis_client = redis::Client::open(redis_url)?;
Ok(Self {
redis_client,
ttl_per_type: DashMap::new(),
})
}
}

impl RouterPlugin for ResponseCachePlugin {
fn on_execute<'exec>(
&self,
payload: OnExecutePayload<'exec>,
) -> ControlFlow<'exec, OnExecutePayload<'exec>> {
let key = format!(
"response_cache:{}:{:?}",
payload.query_plan, payload.variable_values
);
if let Ok(mut conn) = self.redis_client.get_connection() {
let cached_response: Option<Vec<u8>> = conn.get(&key).ok();
if let Some(cached_response) = cached_response {
return ControlFlow::Break(
HttpResponse::Ok()
.header("Content-Type", "application/json")
.body(cached_response),
);
}
ControlFlow::OnEnd(Box::new(move |payload: OnExecutePayload| {

Check failure on line 44 in lib/executor/src/plugins/examples/response_cache.rs

View workflow job for this annotation

GitHub Actions / clippy

unnecessary operation
// Do not cache if there are errors
if !payload.errors.is_empty() {
return ControlFlow::Continue;
}

if let Ok(serialized) = sonic_rs::to_vec(&payload.data) {
// Decide on the ttl somehow
// Get the type names
let mut max_ttl = 0;

// Imagine this code is traversing the response data to find type names
if let Some(obj) = payload.data.as_object() {
if let Some(typename) = obj
.iter()
.position(|(k, _)| k == &TYPENAME_FIELD_NAME)
.and_then(|idx| obj[idx].1.as_str())
{
if let Some(ttl) = self.ttl_per_type.get(typename).map(|v| *v) {
max_ttl = max_ttl.max(ttl);
}
}
}

// If no ttl found, default to 60 seconds
if max_ttl == 0 {
max_ttl = 60;
}

// Insert the ttl into extensions for client awareness
payload
.extensions
.insert("response_cache_ttl".to_string(), sonic_rs::json!(max_ttl));

// Set the cache with the decided ttl
let _: () = conn.set_ex(key, serialized, max_ttl).unwrap_or(());
}
ControlFlow::Continue
}));
}
ControlFlow::Continue
}
fn on_schema_reload(&self, payload: OnSchemaReloadPayload) {
// Visit the schema and update ttl_per_type based on some directive
payload
.new_schema
.document
.definitions
.iter()
.for_each(|def| {
if let graphql_parser::schema::Definition::TypeDefinition(type_def) = def {
if let graphql_parser::schema::TypeDefinition::Object(obj_type) = type_def {

Check failure on line 95 in lib/executor/src/plugins/examples/response_cache.rs

View workflow job for this annotation

GitHub Actions / clippy

this `if let` can be collapsed into the outer `if let`
for directive in &obj_type.directives {
if directive.name == "cacheControl" {
for arg in &directive.arguments {
if arg.0 == "maxAge" {
if let graphql_parser::query::Value::Int(max_age) = &arg.1 {
if let Some(max_age) = max_age.as_i64() {
self.ttl_per_type
.insert(obj_type.name.clone(), max_age as u64);
}
}
}
}
}
}
}
}
});
}
}
30 changes: 30 additions & 0 deletions lib/executor/src/plugins/examples/subgraph_response_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use dashmap::DashMap;

Check warning on line 1 in lib/executor/src/plugins/examples/subgraph_response_cache.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/examples/subgraph_response_cache.rs

use crate::{executors::dedupe::SharedResponse, hooks::on_subgraph_http_request::{OnSubgraphHttpRequestPayload, OnSubgraphHttpResponsePayload}, plugin_trait::{ControlFlow, RouterPlugin}};

pub struct SubgraphResponseCachePlugin {
cache: DashMap<String, SharedResponse>,
}

Check warning on line 8 in lib/executor/src/plugins/examples/subgraph_response_cache.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/examples/subgraph_response_cache.rs
impl RouterPlugin for SubgraphResponseCachePlugin {
fn on_subgraph_http_request<'exec>(
&'static self,
payload: OnSubgraphHttpRequestPayload<'exec>,
) -> ControlFlow<'exec, OnSubgraphHttpResponsePayload<'exec>> {
let key = format!(
"subgraph_response_cache:{}:{:?}",
payload.execution_request.query, payload.execution_request.variables
);
if let Some(cached_response) = self.cache.get(&key) {
// Here payload.response is Option
// So it is bypassing the actual subgraph request
*payload.response = Some(cached_response.clone());
return ControlFlow::Continue;
}
ControlFlow::OnEnd(Box::new(move |payload: OnSubgraphHttpResponsePayload| {
// Here payload.response is not Option
self.cache.insert(key, payload.response.clone());
ControlFlow::Continue
}))

Check warning on line 28 in lib/executor/src/plugins/examples/subgraph_response_cache.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/examples/subgraph_response_cache.rs
}
}
3 changes: 3 additions & 0 deletions lib/executor/src/plugins/hooks/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod on_execute;

Check warning on line 1 in lib/executor/src/plugins/hooks/mod.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/hooks/mod.rs
pub mod on_schema_reload;
pub mod on_subgraph_http_request;
33 changes: 33 additions & 0 deletions lib/executor/src/plugins/hooks/on_execute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::collections::HashMap;
use std::sync::Arc;

use hive_router_query_planner::planner::plan_nodes::QueryPlan;

Check warning on line 4 in lib/executor/src/plugins/hooks/on_execute.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/hooks/on_execute.rs
use ntex::web::HttpRequest;

use crate::response::{value::Value};
use crate::response::graphql_error::GraphQLError;

pub struct OnExecutePayload<'exec> {
pub router_http_request: &'exec HttpRequest,
pub query_plan: Arc<QueryPlan>,

pub data: &'exec mut Value<'exec>,
pub errors: &'exec mut Vec<GraphQLError>,
pub extensions: &'exec mut HashMap<String, sonic_rs::Value>,

pub skip_execution: bool,

pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
}

pub struct OnExecuteEndPayload<'exec> {
pub router_http_request: &'exec HttpRequest,
pub query_plan: Arc<QueryPlan>,

pub data: &'exec Value<'exec>,
pub errors: &'exec Vec<GraphQLError>,
pub extensions: &'exec mut HashMap<String, sonic_rs::Value>,

Check warning on line 30 in lib/executor/src/plugins/hooks/on_execute.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/hooks/on_execute.rs
pub variable_values: &'exec Option<HashMap<String, sonic_rs::Value>>,
}

6 changes: 6 additions & 0 deletions lib/executor/src/plugins/hooks/on_schema_reload.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use hive_router_query_planner::consumer_schema::ConsumerSchema;

pub struct OnSchemaReloadPayload {
pub old_schema: &'static ConsumerSchema,
pub new_schema: &'static mut ConsumerSchema,
}
47 changes: 47 additions & 0 deletions lib/executor/src/plugins/hooks/on_subgraph_http_request.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use std::collections::HashMap;

use hive_router_query_planner::ast::operation::SubgraphFetchOperation;
use http::{HeaderMap, Uri};

Check warning on line 4 in lib/executor/src/plugins/hooks/on_subgraph_http_request.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/hooks/on_subgraph_http_request.rs
use ntex::web::HttpRequest;

use crate::
executors::dedupe::SharedResponse
;

pub struct OnSubgraphHttpRequestPayload<'exec> {
pub router_http_request: &'exec HttpRequest,
pub subgraph_name: &'exec str,
// At this point, there is no point of mutating this
pub execution_request: &'exec SubgraphExecutionRequest<'exec>,

pub endpoint: &'exec mut Uri,
// By default, it is POST
pub method: &'exec mut http::Method,
pub headers: &'exec mut HeaderMap,
pub request_body: &'exec mut Vec<u8>,

// Early response
pub response: &'exec mut Option<SharedResponse>,
}

pub struct SubgraphExecutionRequest<'exec> {
pub query: &'exec str,
// We can add the original operation here too
pub operation: &'exec SubgraphFetchOperation,

pub dedupe: bool,
pub operation_name: Option<&'exec str>,
pub variables: Option<HashMap<&'exec str, &'exec sonic_rs::Value>>,
pub extensions: Option<HashMap<String, sonic_rs::Value>>,
pub representations: Option<Vec<u8>>,
}

pub struct OnSubgraphHttpResponsePayload<'exec> {
pub router_http_request: &'exec HttpRequest,
pub subgraph_name: &'exec str,
// The node that initiates this subgraph execution
pub execution_request: &'exec SubgraphExecutionRequest<'exec>,

Check warning on line 43 in lib/executor/src/plugins/hooks/on_subgraph_http_request.rs

View workflow job for this annotation

GitHub Actions / fmt

Diff in /home/runner/work/router/router/lib/executor/src/plugins/hooks/on_subgraph_http_request.rs
// This will be tricky to implement with the current structure,
// but I'm sure we'll figure it out
pub response: &'exec mut SharedResponse,
}
3 changes: 3 additions & 0 deletions lib/executor/src/plugins/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod examples;
pub mod plugin_trait;
pub mod hooks;
27 changes: 27 additions & 0 deletions lib/executor/src/plugins/plugin_trait.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use ntex::web::HttpResponse;

use crate::hooks::on_execute::OnExecutePayload;
use crate::hooks::on_schema_reload::OnSchemaReloadPayload;
use crate::hooks::on_subgraph_http_request::{OnSubgraphHttpRequestPayload, OnSubgraphHttpResponsePayload};

pub enum ControlFlow<'exec, TPayload> {
Continue,
Break(HttpResponse),
OnEnd(Box<dyn FnOnce(TPayload) -> ControlFlow<'exec, ()> + 'exec>),
}

pub trait RouterPlugin {
fn on_execute<'exec>(
&self,
_payload: OnExecutePayload<'exec>,
) -> ControlFlow<'exec, OnExecutePayload<'exec>> {
ControlFlow::Continue
}
fn on_subgraph_http_request<'exec>(
&'static self,
_payload: OnSubgraphHttpRequestPayload<'exec>,
) -> ControlFlow<'exec, OnSubgraphHttpResponsePayload<'exec>> {
ControlFlow::Continue
}
fn on_schema_reload(&self, _payload: OnSchemaReloadPayload) {}
}
Loading