Skip to content

Commit 69241af

Browse files
committed
h2spec ETA: 135/147
- implement reset_stream (in a very bad way) - change forcefully_terminate_stream to be quicker - more strict h2 parsing note: it bacomes apparent that: - streams lack the RFC definition of state - remultiplexing lacks a frame queue - priorizer may have to be shared among connections of a session Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
1 parent 8976796 commit 69241af

File tree

7 files changed

+348
-123
lines changed

7 files changed

+348
-123
lines changed

lib/src/protocol/mux/converter.rs

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
use std::str::from_utf8_unchecked;
22

3-
use kawa::{AsBuffer, Block, BlockConverter, Chunk, Flags, Kawa, Pair, StatusLine, Store};
4-
5-
use crate::protocol::http::parser::compare_no_case;
3+
use kawa::{
4+
AsBuffer, Block, BlockConverter, Chunk, Flags, Kawa, Pair, ParsingErrorKind, ParsingPhase,
5+
StatusLine, Store,
6+
};
67

7-
use super::{
8-
parser::{FrameHeader, FrameType, H2Error},
9-
serializer::{gen_frame_header, gen_rst_stream},
10-
StreamId,
8+
use crate::protocol::{
9+
http::parser::compare_no_case,
10+
mux::{
11+
parser::{str_to_error_code, FrameHeader, FrameType, H2Error},
12+
serializer::{gen_frame_header, gen_rst_stream},
13+
StreamId,
14+
},
1115
};
1216

1317
pub struct H2BlockConverter<'a> {
@@ -17,6 +21,26 @@ pub struct H2BlockConverter<'a> {
1721
}
1822

1923
impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
24+
fn initialize(&mut self, kawa: &mut Kawa<T>) {
25+
// This is very ugly... we may add a h2 variant in kawa::ParsingErrorKind
26+
match kawa.parsing_phase {
27+
ParsingPhase::Error {
28+
kind: ParsingErrorKind::Processing { message },
29+
..
30+
} => {
31+
let error = str_to_error_code(message);
32+
let mut frame = [0; 13];
33+
gen_rst_stream(&mut frame, self.stream_id, error).unwrap();
34+
kawa.push_out(Store::from_slice(&frame));
35+
}
36+
ParsingPhase::Error { .. } => {
37+
let mut frame = [0; 13];
38+
gen_rst_stream(&mut frame, self.stream_id, H2Error::InternalError).unwrap();
39+
kawa.push_out(Store::from_slice(&frame));
40+
}
41+
_ => {}
42+
}
43+
}
2044
fn call(&mut self, block: Block, kawa: &mut Kawa<T>) {
2145
let buffer = kawa.storage.buffer();
2246
match block {
@@ -140,24 +164,18 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
140164
kawa.push_out(Store::from_slice(&header));
141165
kawa.push_out(Store::Alloc(payload.into_boxed_slice(), 0));
142166
} else if end_stream {
143-
if kawa.is_error() {
144-
let mut frame = [0; 13];
145-
gen_rst_stream(&mut frame, self.stream_id, H2Error::InternalError).unwrap();
146-
kawa.push_out(Store::from_slice(&frame));
147-
} else {
148-
let mut header = [0; 9];
149-
gen_frame_header(
150-
&mut header,
151-
&FrameHeader {
152-
payload_len: 0,
153-
frame_type: FrameType::Data,
154-
flags: 1,
155-
stream_id: self.stream_id,
156-
},
157-
)
158-
.unwrap();
159-
kawa.push_out(Store::from_slice(&header));
160-
}
167+
let mut header = [0; 9];
168+
gen_frame_header(
169+
&mut header,
170+
&FrameHeader {
171+
payload_len: 0,
172+
frame_type: FrameType::Data,
173+
flags: 1,
174+
stream_id: self.stream_id,
175+
},
176+
)
177+
.unwrap();
178+
kawa.push_out(Store::from_slice(&header));
161179
}
162180
if end_header || end_stream {
163181
kawa.push_delimiter()

lib/src/protocol/mux/h1.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ use sozu_command::ready::Ready;
33
use crate::{
44
println_,
55
protocol::mux::{
6-
debug_kawa, forcefully_terminate_answer, set_default_answer, update_readiness_after_read,
7-
update_readiness_after_write, BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult,
8-
Position, StreamState,
6+
debug_kawa, forcefully_terminate_answer, parser::H2Error, set_default_answer, update_readiness_after_read, update_readiness_after_write, BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult, Position, StreamState
97
},
108
socket::SocketHandler,
119
timer::TimeoutContainer,
@@ -226,6 +224,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
226224
Position::Client(BackendStatus::Connected(cluster_id))
227225
| Position::Client(BackendStatus::Connecting(cluster_id)) => {
228226
self.stream = usize::MAX;
227+
// keep alive should probably be used only if the http context is fully reset
228+
// in case end_stream occurs due to an error the connection state is probably
229+
// unrecoverable and should be terminated
229230
if stream_context.keep_alive_backend {
230231
self.position =
231232
Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id)))
@@ -241,7 +242,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
241242
// if the answer is not terminated we send an RstStream to properly clean the stream
242243
// if it is terminated, we finish the transfer, the backend is not necessary anymore
243244
if !stream.back.is_terminated() {
244-
forcefully_terminate_answer(stream, &mut self.readiness);
245+
forcefully_terminate_answer(stream, &mut self.readiness, H2Error::InternalError);
245246
} else {
246247
stream.state = StreamState::Unlinked;
247248
self.readiness.interest.insert(Ready::WRITABLE);

0 commit comments

Comments
 (0)