Skip to content

Commit

Permalink
add mutex to context
Browse files Browse the repository at this point in the history
  • Loading branch information
YassinEldeeb committed Feb 14, 2024
1 parent 3d90971 commit 2537e67
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
8 changes: 6 additions & 2 deletions libs/engine/src/source/federation_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use conductor_common::graphql::GraphQLResponse;
use conductor_config::{FederationSourceConfig, SupergraphSourceConfig};
use federation_query_planner::supergraph::{parse_supergraph, Supergraph};
use federation_query_planner::FederationExecutor;
use futures::lock::Mutex;
use minitrace_reqwest::{traced_reqwest, TracedHttpClient};
use std::collections::HashMap;
use std::sync::Arc;
use std::{future::Future, pin::Pin};

#[derive(Debug)]
Expand Down Expand Up @@ -204,10 +206,12 @@ impl SourceRuntime for FederationSourceRuntime {
client: &self.client,
plugin_manager: route_data.plugin_manager.clone(),
supergraph: &self.supergraph,
execution_context: request_context,
};

match executor.execute_federation(operation).await {
match executor
.execute_federation(Arc::new(Mutex::new(request_context)), operation)
.await
{
Ok((response_data, query_plan)) => {
let mut response = serde_json::from_str::<GraphQLResponse>(&response_data).unwrap();

Expand Down
45 changes: 25 additions & 20 deletions libs/federation_query_planner/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use std::{ops::Index, sync::Arc};

use anyhow::{anyhow, Error};
use conductor_common::http::{ConductorHttpRequest, HttpHeadersMap};
use conductor_common::http::ConductorHttpRequest;
use conductor_common::{execute::RequestExecutionContext, plugin_manager::PluginManager};
use constants::CONDUCTOR_INTERNAL_SERVICE_RESOLVER;
use executor::{
dynamically_build_schema_from_supergraph, find_objects_matching_criteria, QueryResponse,
};
use futures::future::join_all;
use futures::lock::Mutex;
use graphql_parser::query::Document;
use minitrace::Span;
use query_planner::QueryStep;
Expand Down Expand Up @@ -36,8 +37,8 @@ pub struct FederationExecutor<'a> {

impl<'a> FederationExecutor<'a> {
pub async fn execute_federation(
&mut self,
request_context: &'a mut RequestExecutionContext,
&self,
request_context: Arc<Mutex<&mut RequestExecutionContext>>,
parsed_user_query: Document<'static, String>,
) -> Result<(String, QueryPlan), Error> {
// println!("parsed_user_query: {:#?}", user_query);
Expand All @@ -61,38 +62,42 @@ impl<'a> FederationExecutor<'a> {
pub async fn execute_query_plan(
&self,
query_plan: &QueryPlan,
request_context: &'a mut RequestExecutionContext,
request_context: Arc<Mutex<&mut RequestExecutionContext>>,
) -> Result<Vec<Vec<((String, String), QueryResponse)>>, Error> {
let mut all_futures = Vec::new();

for step in &query_plan.parallel_steps {
match step {
Parallel::Sequential(query_steps) => {
let future = self.execute_sequential(query_steps, request_context);
all_futures.push(future);
}
}
}
let parallel_block = query_plan.parallel_steps.get(0).unwrap();

Check warning on line 69 in libs/federation_query_planner/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

accessing first element with `query_plan.parallel_steps.get(0)`

warning: accessing first element with `query_plan.parallel_steps.get(0)` --> libs/federation_query_planner/src/lib.rs:69:26 | 69 | let parallel_block = query_plan.parallel_steps.get(0).unwrap(); | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: try: `query_plan.parallel_steps.first()` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#get_first = note: `#[warn(clippy::get_first)]` on by default

let results: Result<Vec<_>, _> = join_all(all_futures).await.into_iter().collect();
match parallel_block {
Parallel::Sequential(steps) => {
let future = self.execute_sequential(steps, request_context);
all_futures.push(future);

match results {
Ok(val) => Ok(val),
Err(e) => Err(anyhow!(e)),
let results: Result<Vec<_>, _> = join_all(all_futures).await.into_iter().collect();

match results {
Ok(val) => Ok(val),
Err(e) => Err(anyhow!(e)),
}
}
}
}

pub async fn execute_sequential(
&self,
query_steps: &Vec<QueryStep>,
request_context: &'a mut RequestExecutionContext,
request_context: Arc<Mutex<&mut RequestExecutionContext>>,
) -> Result<Vec<((String, String), QueryResponse)>, Error> {
let mut data_vec = vec![];
let mut entity_arguments: Option<SerdeValue> = None;

for (i, query_step) in query_steps.iter().enumerate() {
let data = self
.execute_query_step(query_step, entity_arguments.clone(), request_context)
.execute_query_step(
query_step,
entity_arguments.clone(),
request_context.lock().await,
)
.await;

match data {
Expand Down Expand Up @@ -158,7 +163,7 @@ impl<'a> FederationExecutor<'a> {
&self,
query_step: &QueryStep,
entity_arguments: Option<SerdeValue>,
request_context: &'a mut RequestExecutionContext,
mut request_context: futures::lock::MutexGuard<'_, &mut RequestExecutionContext>,
) -> Result<QueryResponse, Error> {
let is_introspection = query_step.service_name == CONDUCTOR_INTERNAL_SERVICE_RESOLVER;

Expand Down Expand Up @@ -221,7 +226,7 @@ impl<'a> FederationExecutor<'a> {

self
.plugin_manager
.on_upstream_http_request(request_context, &mut upstream_request)
.on_upstream_http_request(&mut *request_context, &mut upstream_request)

Check warning on line 229 in libs/federation_query_planner/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

deref which would be done by auto-deref

warning: deref which would be done by auto-deref --> libs/federation_query_planner/src/lib.rs:229:35 | 229 | .on_upstream_http_request(&mut *request_context, &mut upstream_request) | ^^^^^^^^^^^^^^^^^^^^^ help: try: `request_context` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#explicit_auto_deref = note: `#[warn(clippy::explicit_auto_deref)]` on by default

Check warning on line 229 in libs/federation_query_planner/src/lib.rs

View workflow job for this annotation

GitHub Actions / clippy

this expression creates a reference which is immediately dereferenced by the compiler

warning: this expression creates a reference which is immediately dereferenced by the compiler --> libs/federation_query_planner/src/lib.rs:229:35 | 229 | .on_upstream_http_request(&mut *request_context, &mut upstream_request) | ^^^^^^^^^^^^^^^^^^^^^ help: change this to: `(*request_context)` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_borrow = note: `#[warn(clippy::needless_borrow)]` on by default
.await;

if request_context.is_short_circuit() {
Expand Down

0 comments on commit 2537e67

Please sign in to comment.