Skip to content

Commit 7f7b7ab

Browse files
committed
fix(server/http): improve stream / unary error handling
1 parent 8d5a6c0 commit 7f7b7ab

File tree

1 file changed

+80
-82
lines changed
  • crates/hrpc/src/server/transport/http

1 file changed

+80
-82
lines changed

crates/hrpc/src/server/transport/http/impl.rs

Lines changed: 80 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -70,85 +70,66 @@ impl Service<HttpRequest> for HrpcServiceToHttp {
7070
}
7171

7272
fn call(&mut self, mut req: HttpRequest) -> Self::Future {
73-
let (ws_upgrade, hrpc_req) = match WebSocketUpgrade::from_request(&mut req) {
73+
let (ws_upgrade, maybe_hrpc_req) = match WebSocketUpgrade::from_request(&mut req) {
7474
Ok(mut upgrade) => {
7575
upgrade = upgrade.protocols([ws_version()]);
7676

77-
let (parts, body) = req.into_parts();
78-
79-
let endpoint = Cow::Owned(parts.uri.path().to_string());
80-
81-
let mut extensions = Extensions::new();
82-
extensions.insert(parts.extensions);
83-
extensions.insert(parts.headers);
84-
extensions.insert(parts.method);
85-
extensions.insert(parts.version);
86-
extensions.insert(parts.uri);
87-
88-
let req = Request::from(request::Parts {
89-
body: body.into(),
90-
extensions,
91-
endpoint,
92-
});
93-
94-
(Some(upgrade), Ok(req))
77+
(Ok(upgrade), Ok(from_http_request(req)))
9578
}
9679
Err(err) => {
97-
// TODO: this is not good, find a way to properly get if a path is socket or unary
98-
if let WebSocketUpgradeError::MethodNotGet = err {
99-
(None, from_unary_request(req))
100-
} else {
101-
let message = err.to_string();
102-
let mut resp = err_into_unary_response(
103-
HrpcError::default()
104-
.with_identifier("hrpc.http.bad-streaming-request")
105-
.with_message(message),
106-
);
107-
108-
*resp.status_mut() = match err {
109-
WebSocketUpgradeError::MethodNotGet => StatusCode::METHOD_NOT_ALLOWED,
110-
_ => StatusCode::BAD_REQUEST,
111-
};
112-
113-
return Box::pin(futures_util::future::ready(Ok(resp)));
114-
}
80+
let hrpc_err = HrpcError::default()
81+
.with_identifier("hrpc.http.bad-streaming-request")
82+
.with_message(err.to_string());
83+
let status = match err {
84+
WebSocketUpgradeError::MethodNotGet => StatusCode::METHOD_NOT_ALLOWED,
85+
_ => StatusCode::BAD_REQUEST,
86+
};
87+
88+
(Err((status, hrpc_err)), from_unary_request(req))
11589
}
11690
};
11791

118-
match hrpc_req {
119-
Ok(mut req) => {
120-
if let Some(socket_addr) = self.socket_addr {
121-
req.extensions_mut().insert(socket_addr);
122-
}
123-
Box::pin(Service::call(&mut self.inner, req).map(|res| {
124-
let mut resp = res.unwrap();
125-
126-
if let (Some(socket_handler), Some(ws_upgrade)) =
127-
(resp.extensions_mut().remove::<SocketHandler>(), ws_upgrade)
128-
{
92+
let (mut req, maybe_unary_err) = match maybe_hrpc_req {
93+
Ok(req) => (req, None),
94+
Err((req, err_info)) => (req, Some(err_info)),
95+
};
96+
if let Some(socket_addr) = self.socket_addr {
97+
req.extensions_mut().insert(socket_addr);
98+
}
99+
Box::pin(Service::call(&mut self.inner, req).map(|res| {
100+
let mut resp = res.unwrap();
101+
102+
if let Some(sock_handler) = resp.extensions_mut().remove::<SocketHandler>() {
103+
let resp = match ws_upgrade {
104+
Ok(ws_upgrade) => {
129105
let mut ws_resp = ws_upgrade
130106
.on_upgrade(|stream| {
131107
let (ws_tx, ws_rx) = stream.split();
132-
(socket_handler.inner)(Box::pin(ws_rx), Box::pin(ws_tx))
108+
(sock_handler.inner)(Box::pin(ws_rx), Box::pin(ws_tx))
133109
})
134110
.into_response();
135111

136112
let parts: response::Parts = resp.into();
137113

138114
set_http_extensions(parts.extensions, &mut ws_resp);
139115

140-
return Ok(ws_resp);
116+
ws_resp
141117
}
142-
143-
Ok(into_unary_response(resp))
144-
}))
145-
}
146-
Err((status, err)) => {
118+
Err((status, err)) => {
119+
let mut resp = err_into_unary_response(err);
120+
*resp.status_mut() = status;
121+
resp
122+
}
123+
};
124+
Ok(resp)
125+
} else if let Some((status, err)) = maybe_unary_err {
147126
let mut resp = err_into_unary_response(err);
148127
*resp.status_mut() = status;
149-
Box::pin(futures_util::future::ready(Ok(resp)))
128+
Ok(resp)
129+
} else {
130+
Ok(into_unary_response(resp))
150131
}
151-
}
132+
}))
152133
}
153134
}
154135

@@ -260,33 +241,9 @@ pub(crate) fn into_unary_response<T>(resp: Response<T>) -> HttpResponse {
260241
resp
261242
}
262243

263-
/// Try to create a [`Request`] from a unary [`HttpRequest`].
264-
pub(crate) fn from_unary_request<T>(
265-
req: HttpRequest,
266-
) -> Result<Request<T>, (StatusCode, HrpcError)> {
244+
pub(crate) fn from_http_request<T>(req: HttpRequest) -> Request<T> {
267245
let (parts, body) = req.into_parts();
268246

269-
if parts.method != Method::POST {
270-
return Err((
271-
StatusCode::METHOD_NOT_ALLOWED,
272-
("hrpc.http.bad-unary-request", "method must be POST").into(),
273-
));
274-
}
275-
276-
if !parts
277-
.headers
278-
.header_eq(&header::CONTENT_TYPE, HRPC_CONTENT_MIMETYPE.as_bytes())
279-
{
280-
return Err((
281-
StatusCode::BAD_REQUEST,
282-
(
283-
"hrpc.http.bad-unary-request",
284-
"request content type not supported",
285-
)
286-
.into(),
287-
));
288-
}
289-
290247
let endpoint = Cow::Owned(parts.uri.path().to_string());
291248

292249
let mut extensions = Extensions::new();
@@ -302,6 +259,47 @@ pub(crate) fn from_unary_request<T>(
302259
endpoint,
303260
});
304261

262+
req
263+
}
264+
265+
/// Try to create a [`Request`] from a unary [`HttpRequest`].
266+
pub(crate) fn from_unary_request<T>(
267+
req: HttpRequest,
268+
) -> Result<Request<T>, (Request<T>, (StatusCode, HrpcError))> {
269+
let req = from_http_request(req);
270+
271+
if req
272+
.http_method()
273+
.expect("must have http method -- this is a bug")
274+
!= Method::POST
275+
{
276+
return Err((
277+
req,
278+
(
279+
StatusCode::METHOD_NOT_ALLOWED,
280+
("hrpc.http.bad-unary-request", "method must be POST").into(),
281+
),
282+
));
283+
}
284+
285+
if !req
286+
.header_map()
287+
.expect("must have http header map -- this is a bug")
288+
.header_eq(&header::CONTENT_TYPE, HRPC_CONTENT_MIMETYPE.as_bytes())
289+
{
290+
return Err((
291+
req,
292+
(
293+
StatusCode::BAD_REQUEST,
294+
(
295+
"hrpc.http.bad-unary-request",
296+
"request content type not supported",
297+
)
298+
.into(),
299+
),
300+
));
301+
}
302+
305303
Ok(req)
306304
}
307305

0 commit comments

Comments
 (0)