Skip to content

Commit 06763f6

Browse files
committed
split client.rs
1 parent 4a649fd commit 06763f6

File tree

4 files changed

+261
-242
lines changed

4 files changed

+261
-242
lines changed

src/client/decode.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
use async_std::io::{BufReader, Read};
2+
use async_std::prelude::*;
3+
use http_types::{ensure, ensure_eq, format_err};
4+
use http_types::{
5+
headers::{HeaderName, HeaderValue, CONTENT_LENGTH, DATE, TRANSFER_ENCODING},
6+
Body, Response, StatusCode,
7+
};
8+
9+
use std::convert::TryFrom;
10+
use std::str::FromStr;
11+
12+
use crate::chunked::ChunkedDecoder;
13+
use crate::date::fmt_http_date;
14+
use crate::{MAX_HEADERS, MAX_HEAD_LENGTH};
15+
16+
/// Decode an HTTP response on the client.
17+
#[doc(hidden)]
18+
pub async fn decode<R>(reader: R) -> http_types::Result<Response>
19+
where
20+
R: Read + Unpin + Send + Sync + 'static,
21+
{
22+
let mut reader = BufReader::new(reader);
23+
let mut buf = Vec::new();
24+
let mut headers = [httparse::EMPTY_HEADER; MAX_HEADERS];
25+
let mut httparse_res = httparse::Response::new(&mut headers);
26+
27+
// Keep reading bytes from the stream until we hit the end of the stream.
28+
loop {
29+
let bytes_read = reader.read_until(b'\n', &mut buf).await?;
30+
// No more bytes are yielded from the stream.
31+
assert!(bytes_read != 0, "Empty response"); // TODO: ensure?
32+
33+
// Prevent CWE-400 DDOS with large HTTP Headers.
34+
ensure!(
35+
buf.len() < MAX_HEAD_LENGTH,
36+
"Head byte length should be less than 8kb"
37+
);
38+
39+
// We've hit the end delimiter of the stream.
40+
let idx = buf.len() - 1;
41+
if idx >= 3 && &buf[idx - 3..=idx] == b"\r\n\r\n" {
42+
break;
43+
}
44+
}
45+
46+
// Convert our header buf into an httparse instance, and validate.
47+
let status = httparse_res.parse(&buf)?;
48+
ensure!(!status.is_partial(), "Malformed HTTP head");
49+
50+
let code = httparse_res.code;
51+
let code = code.ok_or_else(|| format_err!("No status code found"))?;
52+
53+
// Convert httparse headers + body into a `http_types::Response` type.
54+
let version = httparse_res.version;
55+
let version = version.ok_or_else(|| format_err!("No version found"))?;
56+
ensure_eq!(version, 1, "Unsupported HTTP version");
57+
58+
let mut res = Response::new(StatusCode::try_from(code)?);
59+
for header in httparse_res.headers.iter() {
60+
let name = HeaderName::from_str(header.name)?;
61+
let value = HeaderValue::from_str(std::str::from_utf8(header.value)?)?;
62+
res.append_header(name, value)?;
63+
}
64+
65+
if res.header(&DATE).is_none() {
66+
let date = fmt_http_date(std::time::SystemTime::now());
67+
res.insert_header(DATE, &format!("date: {}\r\n", date)[..])?;
68+
}
69+
70+
let content_length = res.header(&CONTENT_LENGTH);
71+
let transfer_encoding = res.header(&TRANSFER_ENCODING);
72+
73+
ensure!(
74+
content_length.is_none() || transfer_encoding.is_none(),
75+
"Unexpected Content-Length header"
76+
);
77+
78+
// Check for Transfer-Encoding
79+
match transfer_encoding {
80+
Some(encoding) if !encoding.is_empty() => {
81+
if encoding.last().unwrap().as_str() == "chunked" {
82+
let trailers_sender = res.send_trailers();
83+
let reader = BufReader::new(ChunkedDecoder::new(reader, trailers_sender));
84+
res.set_body(Body::from_reader(reader, None));
85+
return Ok(res);
86+
}
87+
// Fall through to Content-Length
88+
}
89+
_ => {
90+
// Fall through to Content-Length
91+
}
92+
}
93+
94+
// Check for Content-Length.
95+
match content_length {
96+
Some(len) => {
97+
let len = len.last().unwrap().as_str().parse::<usize>()?;
98+
res.set_body(Body::from_reader(reader.take(len as u64), Some(len)));
99+
}
100+
None => {}
101+
}
102+
103+
// Return the response.
104+
Ok(res)
105+
}

src/client/encode.rs

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use async_std::io::{self, Read};
2+
use async_std::prelude::*;
3+
use async_std::task::{Context, Poll};
4+
use http_types::format_err;
5+
use http_types::Request;
6+
7+
use std::pin::Pin;
8+
9+
use crate::date::fmt_http_date;
10+
11+
/// Encode an HTTP request on the client.
12+
#[doc(hidden)]
13+
pub async fn encode(req: Request) -> http_types::Result<Encoder> {
14+
let mut buf: Vec<u8> = vec![];
15+
16+
let mut url = req.url().path().to_owned();
17+
if let Some(fragment) = req.url().fragment() {
18+
url.push('#');
19+
url.push_str(fragment);
20+
}
21+
if let Some(query) = req.url().query() {
22+
url.push('?');
23+
url.push_str(query);
24+
}
25+
26+
let val = format!("{} {} HTTP/1.1\r\n", req.method(), url);
27+
log::trace!("> {}", &val);
28+
buf.write_all(val.as_bytes()).await?;
29+
30+
// Insert Host header
31+
// Insert host
32+
let host = req.url().host_str();
33+
let host = host.ok_or_else(|| format_err!("Missing hostname"))?;
34+
let val = if let Some(port) = req.url().port() {
35+
format!("host: {}:{}\r\n", host, port)
36+
} else {
37+
format!("host: {}\r\n", host)
38+
};
39+
40+
log::trace!("> {}", &val);
41+
buf.write_all(val.as_bytes()).await?;
42+
43+
// If the body isn't streaming, we can set the content-length ahead of time. Else we need to
44+
// send all items in chunks.
45+
if let Some(len) = req.len() {
46+
let val = format!("content-length: {}\r\n", len);
47+
log::trace!("> {}", &val);
48+
buf.write_all(val.as_bytes()).await?;
49+
} else {
50+
// write!(&mut buf, "Transfer-Encoding: chunked\r\n")?;
51+
panic!("chunked encoding is not implemented yet");
52+
// See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Transfer-Encoding
53+
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Trailer
54+
}
55+
56+
let date = fmt_http_date(std::time::SystemTime::now());
57+
buf.write_all(b"date: ").await?;
58+
buf.write_all(date.as_bytes()).await?;
59+
buf.write_all(b"\r\n").await?;
60+
61+
for (header, values) in req.iter() {
62+
for value in values.iter() {
63+
let val = format!("{}: {}\r\n", header, value);
64+
log::trace!("> {}", &val);
65+
buf.write_all(val.as_bytes()).await?;
66+
}
67+
}
68+
69+
buf.write_all(b"\r\n").await?;
70+
71+
Ok(Encoder::new(buf, req))
72+
}
73+
74+
/// An HTTP encoder.
75+
#[doc(hidden)]
76+
#[derive(Debug)]
77+
pub struct Encoder {
78+
/// Keep track how far we've indexed into the headers + body.
79+
cursor: usize,
80+
/// HTTP headers to be sent.
81+
headers: Vec<u8>,
82+
/// Check whether we're done sending headers.
83+
headers_done: bool,
84+
/// Request with the HTTP body to be sent.
85+
request: Request,
86+
/// Check whether we're done with the body.
87+
body_done: bool,
88+
/// Keep track of how many bytes have been read from the body stream.
89+
body_bytes_read: usize,
90+
}
91+
92+
impl Encoder {
93+
/// Create a new instance.
94+
pub(crate) fn new(headers: Vec<u8>, request: Request) -> Self {
95+
Self {
96+
request,
97+
headers,
98+
cursor: 0,
99+
headers_done: false,
100+
body_done: false,
101+
body_bytes_read: 0,
102+
}
103+
}
104+
}
105+
106+
impl Read for Encoder {
107+
fn poll_read(
108+
mut self: Pin<&mut Self>,
109+
cx: &mut Context<'_>,
110+
buf: &mut [u8],
111+
) -> Poll<io::Result<usize>> {
112+
// Send the headers. As long as the headers aren't fully sent yet we
113+
// keep sending more of the headers.
114+
let mut bytes_read = 0;
115+
if !self.headers_done {
116+
let len = std::cmp::min(self.headers.len() - self.cursor, buf.len());
117+
let range = self.cursor..self.cursor + len;
118+
buf[0..len].copy_from_slice(&mut self.headers[range]);
119+
self.cursor += len;
120+
if self.cursor == self.headers.len() {
121+
self.headers_done = true;
122+
}
123+
bytes_read += len;
124+
}
125+
126+
if !self.body_done {
127+
let inner_poll_result =
128+
Pin::new(&mut self.request).poll_read(cx, &mut buf[bytes_read..]);
129+
let n = match inner_poll_result {
130+
Poll::Ready(Ok(n)) => n,
131+
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
132+
Poll::Pending => {
133+
if bytes_read == 0 {
134+
return Poll::Pending;
135+
} else {
136+
return Poll::Ready(Ok(bytes_read as usize));
137+
}
138+
}
139+
};
140+
bytes_read += n;
141+
self.body_bytes_read += n;
142+
if bytes_read == 0 {
143+
self.body_done = true;
144+
}
145+
}
146+
147+
Poll::Ready(Ok(bytes_read as usize))
148+
}
149+
}

0 commit comments

Comments
 (0)