Skip to content

Commit 52214f3

Browse files
authored
fix(client): retry when pool checkout returns closed HTTP2 connection (#2585)
When http2_only is true, we never try to open a new connection if there is one open already, which means that if the existing connection that gets checked out of the pool is closed, then the request won't happen.
1 parent 5243570 commit 52214f3

File tree

3 files changed

+53
-13
lines changed

3 files changed

+53
-13
lines changed

src/client/client.rs

+34-7
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,9 @@ use http::{Method, Request, Response, Uri, Version};
1111

1212
use super::conn;
1313
use super::connect::{self, sealed::Connect, Alpn, Connected, Connection};
14-
use super::pool::{self, Key as PoolKey, Pool, Poolable, Pooled, Reservation};
14+
use super::pool::{
15+
self, CheckoutIsClosedError, Key as PoolKey, Pool, Poolable, Pooled, Reservation,
16+
};
1517
#[cfg(feature = "tcp")]
1618
use super::HttpConnector;
1719
use crate::body::{Body, HttpBody};
@@ -223,7 +225,17 @@ where
223225
mut req: Request<B>,
224226
pool_key: PoolKey,
225227
) -> Result<Response<Body>, ClientError<B>> {
226-
let mut pooled = self.connection_for(pool_key).await?;
228+
let mut pooled = match self.connection_for(pool_key).await {
229+
Ok(pooled) => pooled,
230+
Err(ClientConnectError::Normal(err)) => return Err(ClientError::Normal(err)),
231+
Err(ClientConnectError::H2CheckoutIsClosed(reason)) => {
232+
return Err(ClientError::Canceled {
233+
connection_reused: true,
234+
req,
235+
reason,
236+
})
237+
}
238+
};
227239

228240
if pooled.is_http1() {
229241
if req.version() == Version::HTTP_2 {
@@ -321,7 +333,7 @@ where
321333
async fn connection_for(
322334
&self,
323335
pool_key: PoolKey,
324-
) -> Result<Pooled<PoolClient<B>>, ClientError<B>> {
336+
) -> Result<Pooled<PoolClient<B>>, ClientConnectError> {
325337
// This actually races 2 different futures to try to get a ready
326338
// connection the fastest, and to reduce connection churn.
327339
//
@@ -337,6 +349,7 @@ where
337349
// and then be inserted into the pool as an idle connection.
338350
let checkout = self.pool.checkout(pool_key.clone());
339351
let connect = self.connect_to(pool_key);
352+
let is_ver_h2 = self.config.ver == Ver::Http2;
340353

341354
// The order of the `select` is depended on below...
342355

@@ -380,16 +393,25 @@ where
380393
// In both cases, we should just wait for the other future.
381394
Either::Left((Err(err), connecting)) => {
382395
if err.is_canceled() {
383-
connecting.await.map_err(ClientError::Normal)
396+
connecting.await.map_err(ClientConnectError::Normal)
384397
} else {
385-
Err(ClientError::Normal(err))
398+
Err(ClientConnectError::Normal(err))
386399
}
387400
}
388401
Either::Right((Err(err), checkout)) => {
389402
if err.is_canceled() {
390-
checkout.await.map_err(ClientError::Normal)
403+
checkout.await.map_err(move |err| {
404+
if is_ver_h2
405+
&& err.is_canceled()
406+
&& err.find_source::<CheckoutIsClosedError>().is_some()
407+
{
408+
ClientConnectError::H2CheckoutIsClosed(err)
409+
} else {
410+
ClientConnectError::Normal(err)
411+
}
412+
})
391413
} else {
392-
Err(ClientError::Normal(err))
414+
Err(ClientConnectError::Normal(err))
393415
}
394416
}
395417
}
@@ -722,6 +744,11 @@ impl<B> ClientError<B> {
722744
}
723745
}
724746

747+
enum ClientConnectError {
748+
Normal(crate::Error),
749+
H2CheckoutIsClosed(crate::Error),
750+
}
751+
725752
/// A marker to identify what version a pooled connection is.
726753
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
727754
pub(super) enum Ver {

src/client/pool.rs

+18-5
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::collections::{HashMap, HashSet, VecDeque};
2+
use std::error::Error as StdError;
23
use std::fmt;
34
use std::ops::{Deref, DerefMut};
45
use std::sync::{Arc, Mutex, Weak};
@@ -560,28 +561,40 @@ pub(super) struct Checkout<T> {
560561
waiter: Option<oneshot::Receiver<T>>,
561562
}
562563

564+
#[derive(Debug)]
565+
pub(super) struct CheckoutIsClosedError;
566+
567+
impl StdError for CheckoutIsClosedError {}
568+
569+
impl fmt::Display for CheckoutIsClosedError {
570+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
571+
f.write_str("checked out connection was closed")
572+
}
573+
}
574+
563575
impl<T: Poolable> Checkout<T> {
564576
fn poll_waiter(
565577
&mut self,
566578
cx: &mut task::Context<'_>,
567579
) -> Poll<Option<crate::Result<Pooled<T>>>> {
568-
static CANCELED: &str = "pool checkout failed";
569580
if let Some(mut rx) = self.waiter.take() {
570581
match Pin::new(&mut rx).poll(cx) {
571582
Poll::Ready(Ok(value)) => {
572583
if value.is_open() {
573584
Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value))))
574585
} else {
575-
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
586+
Poll::Ready(Some(Err(
587+
crate::Error::new_canceled().with(CheckoutIsClosedError)
588+
)))
576589
}
577590
}
578591
Poll::Pending => {
579592
self.waiter = Some(rx);
580593
Poll::Pending
581594
}
582-
Poll::Ready(Err(_canceled)) => {
583-
Poll::Ready(Some(Err(crate::Error::new_canceled().with(CANCELED))))
584-
}
595+
Poll::Ready(Err(_canceled)) => Poll::Ready(Some(Err(
596+
crate::Error::new_canceled().with("request has been canceled")
597+
))),
585598
}
586599
} else {
587600
Poll::Ready(None)

src/error.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ impl Error {
214214
&self.inner.kind
215215
}
216216

217-
fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
217+
pub(crate) fn find_source<E: StdError + 'static>(&self) -> Option<&E> {
218218
let mut cause = self.source();
219219
while let Some(err) = cause {
220220
if let Some(ref typed) = err.downcast_ref() {

0 commit comments

Comments
 (0)