Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(client): add SendRequest::try_send_request() method #3691

Merged
merged 1 commit into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 28 additions & 23 deletions src/client/conn/http1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_util::ready;
use http::{Request, Response};
use httparse::ParserConfig;

use super::super::dispatch;
use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
use crate::proto;

Expand Down Expand Up @@ -200,33 +200,38 @@ where
}
}

/*
pub(super) fn send_request_retryable(
/// Sends a `Request` on the associated connection.
///
/// Returns a future that if successful, yields the `Response`.
///
/// # Error
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
where
B: Send,
{
match self.dispatch.try_send(req) {
Ok(rx) => {
Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => future::ok(res),
Ok(Err(err)) => future::err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::Right(future::err((err, Some(req))))
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
*/
}

impl<B> fmt::Debug for SendRequest<B> {
Expand Down
51 changes: 28 additions & 23 deletions src/client/conn/http2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::rt::{Read, Write};
use futures_util::ready;
use http::{Request, Response};

use super::super::dispatch;
use super::super::dispatch::{self, TrySendError};
use crate::body::{Body, Incoming as IncomingBody};
use crate::common::time::Time;
use crate::proto;
Expand Down Expand Up @@ -152,33 +152,38 @@ where
}
}

/*
pub(super) fn send_request_retryable(
/// Sends a `Request` on the associated connection.
///
/// Returns a future that if successful, yields the `Response`.
///
/// # Error
///
/// If there was an error before trying to serialize the request to the
/// connection, the message will be returned as part of this error.
pub fn try_send_request(
&mut self,
req: Request<B>,
) -> impl Future<Output = Result<Response<Body>, (crate::Error, Option<Request<B>>)>> + Unpin
where
B: Send,
{
match self.dispatch.try_send(req) {
Ok(rx) => {
Either::Left(rx.then(move |res| {
match res {
Ok(Ok(res)) => future::ok(res),
Ok(Err(err)) => future::err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
}
}))
}
Err(req) => {
debug!("connection was not ready");
let err = crate::Error::new_canceled().with("connection was not ready");
Either::Right(future::err((err, Some(req))))
) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
let sent = self.dispatch.try_send(req);
async move {
match sent {
Ok(rx) => match rx.await {
Ok(Ok(res)) => Ok(res),
Ok(Err(err)) => Err(err),
// this is definite bug if it happens, but it shouldn't happen!
Err(_) => panic!("dispatch dropped without returning error"),
},
Err(req) => {
debug!("connection was not ready");
let error = crate::Error::new_canceled().with("connection was not ready");
Err(TrySendError {
error,
message: Some(req),
})
}
}
}
}
*/
}

impl<B> fmt::Debug for SendRequest<B> {
Expand Down
2 changes: 2 additions & 0 deletions src/client/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
pub mod http1;
#[cfg(feature = "http2")]
pub mod http2;

pub use super::dispatch::TrySendError;
61 changes: 45 additions & 16 deletions src/client/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,21 @@ use tokio::sync::{mpsc, oneshot};
#[cfg(feature = "http2")]
use crate::{body::Incoming, proto::h2::client::ResponseFutMap};

#[cfg(test)]
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, (crate::Error, Option<T>)>>;
pub(crate) type RetryPromise<T, U> = oneshot::Receiver<Result<U, TrySendError<T>>>;
pub(crate) type Promise<T> = oneshot::Receiver<Result<T, crate::Error>>;

/// An error when calling `try_send_request`.
///
/// There is a possibility of an error occuring on a connection in-between the
/// time that a request is queued and when it is actually written to the IO
/// transport. If that happens, it is safe to return the request back to the
/// caller, as it was never fully sent.
#[derive(Debug)]
pub struct TrySendError<T> {
pub(crate) error: crate::Error,
pub(crate) message: Option<T>,
}

pub(crate) fn channel<T, U>() -> (Sender<T, U>, Receiver<T, U>) {
let (tx, rx) = mpsc::unbounded_channel();
let (giver, taker) = want::new();
Expand Down Expand Up @@ -92,7 +103,7 @@ impl<T, U> Sender<T, U> {
}
}

#[cfg(test)]
#[cfg(feature = "http1")]
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
if !self.can_send() {
return Err(val);
Expand Down Expand Up @@ -135,7 +146,6 @@ impl<T, U> UnboundedSender<T, U> {
self.giver.is_canceled()
}

#[cfg(test)]
pub(crate) fn try_send(&mut self, val: T) -> Result<RetryPromise<T, U>, T> {
let (tx, rx) = oneshot::channel();
self.inner
Expand Down Expand Up @@ -210,17 +220,17 @@ struct Envelope<T, U>(Option<(T, Callback<T, U>)>);
impl<T, U> Drop for Envelope<T, U> {
fn drop(&mut self) {
if let Some((val, cb)) = self.0.take() {
cb.send(Err((
crate::Error::new_canceled().with("connection closed"),
Some(val),
)));
cb.send(Err(TrySendError {
error: crate::Error::new_canceled().with("connection closed"),
message: Some(val),
}));
}
}
}

pub(crate) enum Callback<T, U> {
#[allow(unused)]
Retry(Option<oneshot::Sender<Result<U, (crate::Error, Option<T>)>>>),
Retry(Option<oneshot::Sender<Result<U, TrySendError<T>>>>),
NoRetry(Option<oneshot::Sender<Result<U, crate::Error>>>),
}

Expand All @@ -229,7 +239,10 @@ impl<T, U> Drop for Callback<T, U> {
match self {
Callback::Retry(tx) => {
if let Some(tx) = tx.take() {
let _ = tx.send(Err((dispatch_gone(), None)));
let _ = tx.send(Err(TrySendError {
error: dispatch_gone(),
message: None,
}));
}
}
Callback::NoRetry(tx) => {
Expand Down Expand Up @@ -269,18 +282,34 @@ impl<T, U> Callback<T, U> {
}
}

pub(crate) fn send(mut self, val: Result<U, (crate::Error, Option<T>)>) {
pub(crate) fn send(mut self, val: Result<U, TrySendError<T>>) {
match self {
Callback::Retry(ref mut tx) => {
let _ = tx.take().unwrap().send(val);
}
Callback::NoRetry(ref mut tx) => {
let _ = tx.take().unwrap().send(val.map_err(|e| e.0));
let _ = tx.take().unwrap().send(val.map_err(|e| e.error));
}
}
}
}

impl<T> TrySendError<T> {
/// Take the message from this error.
///
/// The message will not always have been recovered. If an error occurs
/// after the message has been serialized onto the connection, it will not
/// be available here.
pub fn take_message(&mut self) -> Option<T> {
self.message.take()
}

/// Consumes this to return the inner error.
pub fn into_error(self) -> crate::Error {
self.error
}
}

#[cfg(feature = "http2")]
pin_project! {
pub struct SendWhen<B>
Expand Down Expand Up @@ -325,8 +354,8 @@ where
trace!("send_when canceled");
Poll::Ready(())
}
Poll::Ready(Err(err)) => {
call_back.send(Err(err));
Poll::Ready(Err((error, message))) => {
call_back.send(Err(TrySendError { error, message }));
Poll::Ready(())
}
}
Expand Down Expand Up @@ -389,8 +418,8 @@ mod tests {
let err = fulfilled
.expect("fulfilled")
.expect_err("promise should error");
match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
match (err.error.is_canceled(), err.message) {
(true, Some(_)) => (),
e => panic!("expected Error::Cancel(_), found {:?}", e),
}
}
Expand Down
18 changes: 13 additions & 5 deletions src/proto/h1/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ use http::Request;

use super::{Http1Transaction, Wants};
use crate::body::{Body, DecodedLength, Incoming as IncomingBody};
#[cfg(feature = "client")]
use crate::client::dispatch::TrySendError;
use crate::common::task;
use crate::proto::{BodyLength, Conn, Dispatched, MessageHead, RequestHead};
use crate::upgrade::OnUpgrade;
Expand Down Expand Up @@ -655,15 +657,21 @@ cfg_client! {
}
Err(err) => {
if let Some(cb) = self.callback.take() {
cb.send(Err((err, None)));
cb.send(Err(TrySendError {
error: err,
message: None,
}));
Ok(())
} else if !self.rx_closed {
self.rx.close();
if let Some((req, cb)) = self.rx.try_recv() {
trace!("canceling queued request with connection error: {}", err);
// in this case, the message was never even started, so it's safe to tell
// the user that the request was completely canceled
cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
cb.send(Err(TrySendError {
error: crate::Error::new_canceled().with(err),
message: Some(req),
}));
Ok(())
} else {
Err(err)
Expand Down Expand Up @@ -729,9 +737,9 @@ mod tests {
let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
.expect_err("callback should send error");

match (err.0.kind(), err.1) {
(&crate::error::Kind::Canceled, Some(_)) => (),
other => panic!("expected Canceled, got {:?}", other),
match (err.error.is_canceled(), err.message.as_ref()) {
(true, Some(_)) => (),
_ => panic!("expected Canceled, got {:?}", err),
}
});
}
Expand Down
20 changes: 13 additions & 7 deletions src/proto/h2/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use pin_project_lite::pin_project;
use super::ping::{Ponger, Recorder};
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::client::dispatch::{Callback, SendWhen};
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
use crate::common::io::Compat;
use crate::common::time::Time;
use crate::ext::Protocol;
Expand Down Expand Up @@ -662,10 +662,10 @@ where
.map_or(false, |len| len != 0)
{
warn!("h2 connect request with non-zero body not supported");
cb.send(Err((
crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
None,
)));
cb.send(Err(TrySendError {
error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
message: None,
}));
continue;
}

Expand All @@ -677,7 +677,10 @@ where
Ok(ok) => ok,
Err(err) => {
debug!("client send request error: {}", err);
cb.send(Err((crate::Error::new_h2(err), None)));
cb.send(Err(TrySendError {
error: crate::Error::new_h2(err),
message: None,
}));
continue;
}
};
Expand All @@ -702,7 +705,10 @@ where
}
Poll::Ready(Ok(())) => (),
Poll::Ready(Err(err)) => {
f.cb.send(Err((crate::Error::new_h2(err), None)));
f.cb.send(Err(TrySendError {
error: crate::Error::new_h2(err),
message: None,
}));
continue;
}
}
Expand Down
Loading
Loading