Skip to content

Commit

Permalink
fix some unwraps
Browse files Browse the repository at this point in the history
  • Loading branch information
Geal committed Jan 7, 2022
1 parent c0c115e commit bc14a79
Showing 1 changed file with 36 additions and 28 deletions.
64 changes: 36 additions & 28 deletions apollo-router/src/http_subgraph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,39 +96,47 @@ impl HttpSubgraphFetcher {
&self,
request: graphql::Request,
) -> Result<graphql::Response, graphql::FetchError> {
let hashed_request = serde_json::to_string(&request).unwrap();

let mut locked_wait_map = self.wait_map.lock().await;
match locked_wait_map.get_mut(&hashed_request) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);
let recv_value = receiver.recv().await.expect("FIXME");
recv_value
}
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(hashed_request.clone(), tx.clone());
drop(locked_wait_map);

let res = self.fetch(request).await;

{
let mut locked_wait_map = self.wait_map.lock().await;
locked_wait_map.remove(&hashed_request);
let hashed_request = serde_json::to_string(&request).expect(
"the serializer for Request cannot panic and its fields are always strings; qed",
);

loop {
let mut locked_wait_map = self.wait_map.lock().await;
match locked_wait_map.get_mut(&hashed_request) {
Some(waiter) => {
// Register interest in key
let mut receiver = waiter.subscribe();
drop(locked_wait_map);

match receiver.recv().await {
Ok(value) => return value,
// there was an issue with the broadcast channel, retry fetching
Err(_) => continue,
}
}

// Let our waiters know
let broadcast_value = res.clone();
// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
None => {
let (tx, _rx) = broadcast::channel(1);
locked_wait_map.insert(hashed_request.clone(), tx.clone());
drop(locked_wait_map);

let res = self.fetch(request).await;

{
let mut locked_wait_map = self.wait_map.lock().await;
locked_wait_map.remove(&hashed_request);
}

// Let our waiters know
let broadcast_value = res.clone();
// Our use case is very specific, so we are sure that
// we won't get any errors here.
tokio::task::spawn_blocking(move || {
tx.send(broadcast_value)
.expect("there is always at least one receiver alive, the _rx guard; qed")
}).await
.expect("can only fail if the task is aborted or if the internal code panics, neither is possible here; qed");
res
return res;
}
}
}
}
Expand Down

0 comments on commit bc14a79

Please sign in to comment.