From 2537e676f60c210c6b83734324ecc4d77a7444b4 Mon Sep 17 00:00:00 2001 From: YassinEldeeb Date: Wed, 14 Feb 2024 10:04:43 +0200 Subject: [PATCH] add mutex to context --- libs/engine/src/source/federation_source.rs | 8 +++- libs/federation_query_planner/src/lib.rs | 45 ++++++++++++--------- 2 files changed, 31 insertions(+), 22 deletions(-) diff --git a/libs/engine/src/source/federation_source.rs b/libs/engine/src/source/federation_source.rs index 7bf625b9..93256666 100644 --- a/libs/engine/src/source/federation_source.rs +++ b/libs/engine/src/source/federation_source.rs @@ -6,8 +6,10 @@ use conductor_common::graphql::GraphQLResponse; use conductor_config::{FederationSourceConfig, SupergraphSourceConfig}; use federation_query_planner::supergraph::{parse_supergraph, Supergraph}; use federation_query_planner::FederationExecutor; +use futures::lock::Mutex; use minitrace_reqwest::{traced_reqwest, TracedHttpClient}; use std::collections::HashMap; +use std::sync::Arc; use std::{future::Future, pin::Pin}; #[derive(Debug)] @@ -204,10 +206,12 @@ impl SourceRuntime for FederationSourceRuntime { client: &self.client, plugin_manager: route_data.plugin_manager.clone(), supergraph: &self.supergraph, - execution_context: request_context, }; - match executor.execute_federation(operation).await { + match executor + .execute_federation(Arc::new(Mutex::new(request_context)), operation) + .await + { Ok((response_data, query_plan)) => { let mut response = serde_json::from_str::(&response_data).unwrap(); diff --git a/libs/federation_query_planner/src/lib.rs b/libs/federation_query_planner/src/lib.rs index df699711..e15dd503 100644 --- a/libs/federation_query_planner/src/lib.rs +++ b/libs/federation_query_planner/src/lib.rs @@ -1,13 +1,14 @@ use std::{ops::Index, sync::Arc}; use anyhow::{anyhow, Error}; -use conductor_common::http::{ConductorHttpRequest, HttpHeadersMap}; +use conductor_common::http::ConductorHttpRequest; use conductor_common::{execute::RequestExecutionContext, plugin_manager::PluginManager}; use constants::CONDUCTOR_INTERNAL_SERVICE_RESOLVER; use executor::{ dynamically_build_schema_from_supergraph, find_objects_matching_criteria, QueryResponse, }; use futures::future::join_all; +use futures::lock::Mutex; use graphql_parser::query::Document; use minitrace::Span; use query_planner::QueryStep; @@ -36,8 +37,8 @@ pub struct FederationExecutor<'a> { impl<'a> FederationExecutor<'a> { pub async fn execute_federation( - &mut self, - request_context: &'a mut RequestExecutionContext, + &self, + request_context: Arc>, parsed_user_query: Document<'static, String>, ) -> Result<(String, QueryPlan), Error> { // println!("parsed_user_query: {:#?}", user_query); @@ -61,38 +62,42 @@ impl<'a> FederationExecutor<'a> { pub async fn execute_query_plan( &self, query_plan: &QueryPlan, - request_context: &'a mut RequestExecutionContext, + request_context: Arc>, ) -> Result>, Error> { let mut all_futures = Vec::new(); - for step in &query_plan.parallel_steps { - match step { - Parallel::Sequential(query_steps) => { - let future = self.execute_sequential(query_steps, request_context); - all_futures.push(future); - } - } - } + let parallel_block = query_plan.parallel_steps.get(0).unwrap(); - let results: Result, _> = join_all(all_futures).await.into_iter().collect(); + match parallel_block { + Parallel::Sequential(steps) => { + let future = self.execute_sequential(steps, request_context); + all_futures.push(future); - match results { - Ok(val) => Ok(val), - Err(e) => Err(anyhow!(e)), + let results: Result, _> = join_all(all_futures).await.into_iter().collect(); + + match results { + Ok(val) => Ok(val), + Err(e) => Err(anyhow!(e)), + } + } } } pub async fn execute_sequential( &self, query_steps: &Vec, - request_context: &'a mut RequestExecutionContext, + request_context: Arc>, ) -> Result, Error> { let mut data_vec = vec![]; let mut entity_arguments: Option = None; for (i, query_step) in query_steps.iter().enumerate() { let data = self - .execute_query_step(query_step, entity_arguments.clone(), request_context) + .execute_query_step( + query_step, + entity_arguments.clone(), + request_context.lock().await, + ) .await; match data { @@ -158,7 +163,7 @@ impl<'a> FederationExecutor<'a> { &self, query_step: &QueryStep, entity_arguments: Option, - request_context: &'a mut RequestExecutionContext, + mut request_context: futures::lock::MutexGuard<'_, &mut RequestExecutionContext>, ) -> Result { let is_introspection = query_step.service_name == CONDUCTOR_INTERNAL_SERVICE_RESOLVER; @@ -221,7 +226,7 @@ impl<'a> FederationExecutor<'a> { self .plugin_manager - .on_upstream_http_request(request_context, &mut upstream_request) + .on_upstream_http_request(&mut *request_context, &mut upstream_request) .await; if request_context.is_short_circuit() {