Skip to content

Commit 489e590

Browse files
committed
Proxying enhancements:
- Primitive GoAway and connection error handling for H2 (todo: stream error handling) - Add `force_disconnect` method to H2 that triggers the clean up routine by faking socket disconnection (experimental) - Reading parts of a message properly inserts WRITABLE in the opposite endpoint readiness interest - Fix stream_id numbering - `set_default_answer` and `forcefully_terminate_answer` update StreamState to Unlinked - Replace std::mem::swap with more appropriate variants
1 parent fe61422 commit 489e590

File tree

6 files changed

+190
-132
lines changed

6 files changed

+190
-132
lines changed

lib/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -539,9 +539,7 @@ macro_rules! StateMachineBuilder {
539539
/// leaving a FailedUpgrade in its place.
540540
/// The FailedUpgrade retains the marker of the previous State.
541541
fn take(&mut self) -> $state_name {
542-
let mut owned_state = $state_name::FailedUpgrade(self.marker());
543-
std::mem::swap(&mut owned_state, self);
544-
owned_state
542+
std::mem::replace(self, $state_name::FailedUpgrade(self.marker()))
545543
}
546544
_fn_impl!{front_socket(&, self) -> &mio::net::TcpStream}
547545
}

lib/src/protocol/mux/converter.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ impl<'a, T: AsBuffer> BlockConverter<T> for H2BlockConverter<'a> {
125125
..
126126
}) => {
127127
if end_header {
128-
let mut payload = Vec::new();
129-
std::mem::swap(&mut self.out, &mut payload);
128+
let payload = std::mem::replace(&mut self.out, Vec::new());
130129
let mut header = [0; 9];
131130
let flags = if end_stream { 1 } else { 0 } | if end_header { 4 } else { 0 };
132131
gen_frame_header(

lib/src/protocol/mux/h1.rs

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ use sozu_command::ready::Ready;
33
use crate::{
44
println_,
55
protocol::mux::{
6-
debug_kawa, set_default_answer, update_readiness_after_read, update_readiness_after_write,
7-
BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult, Position, StreamState, forcefully_terminate_answer,
6+
debug_kawa, forcefully_terminate_answer, set_default_answer, update_readiness_after_read,
7+
update_readiness_after_write, BackendStatus, Context, Endpoint, GlobalStreamId, MuxResult,
8+
Position, StreamState,
89
},
910
socket::SocketHandler,
1011
Readiness,
@@ -58,16 +59,15 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
5859
endpoint.end_stream(token, global_stream_id, context);
5960
}
6061
Position::Server => {
61-
set_default_answer(&mut stream.back, &mut self.readiness, 400);
62-
stream.state = StreamState::Unlinked;
62+
set_default_answer(stream, &mut self.readiness, 400);
6363
}
6464
}
6565
return MuxResult::Continue;
6666
}
6767
if kawa.is_terminated() {
6868
self.readiness.interest.remove(Ready::READABLE);
6969
}
70-
if was_initial && kawa.is_main_phase() {
70+
if kawa.is_main_phase() {
7171
match self.position {
7272
Position::Client(_) => {
7373
let StreamState::Linked(token) = stream.state else { unreachable!() };
@@ -77,12 +77,14 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
7777
.insert(Ready::WRITABLE)
7878
}
7979
Position::Server => {
80-
self.requests += 1;
81-
println_!("REQUESTS: {}", self.requests);
82-
stream.state = StreamState::Link
80+
if was_initial {
81+
self.requests += 1;
82+
println_!("REQUESTS: {}", self.requests);
83+
stream.state = StreamState::Link
84+
}
8385
}
84-
};
85-
}
86+
}
87+
};
8688
MuxResult::Continue
8789
}
8890

@@ -115,7 +117,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
115117
stream.back.storage.clear();
116118
stream.front.clear();
117119
// do not clear stream.front.storage because of H1 pipelining
118-
if let StreamState::Linked(token) = stream.state {
120+
stream.attempts = 0;
121+
let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
122+
if let StreamState::Linked(token) = old_state {
119123
endpoint.end_stream(token, self.stream, context);
120124
}
121125
}
@@ -148,14 +152,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
148152
let stream = &mut context.streams[stream];
149153
let stream_context = &mut stream.context;
150154
println_!("end H1 stream {}: {stream_context:#?}", self.stream);
151-
self.stream = usize::MAX;
152-
let mut owned_position = Position::Server;
153-
std::mem::swap(&mut owned_position, &mut self.position);
154-
match owned_position {
155+
match &mut self.position {
155156
Position::Client(BackendStatus::Connected(cluster_id))
156157
| Position::Client(BackendStatus::Connecting(cluster_id)) => {
158+
self.stream = usize::MAX;
157159
self.position = if stream_context.keep_alive_backend {
158-
Position::Client(BackendStatus::KeepAlive(cluster_id))
160+
Position::Client(BackendStatus::KeepAlive(std::mem::take(cluster_id)))
159161
} else {
160162
Position::Client(BackendStatus::Disconnecting)
161163
}
@@ -168,13 +170,14 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
168170
// if the answer is not terminated we send an RstStream to properly clean the stream
169171
// if it is terminated, we finish the transfer, the backend is not necessary anymore
170172
if !stream.back.is_terminated() {
171-
forcefully_terminate_answer(&mut stream.back, &mut self.readiness);
173+
forcefully_terminate_answer(stream, &mut self.readiness);
174+
} else {
175+
stream.state = StreamState::Unlinked;
176+
self.readiness.interest.insert(Ready::WRITABLE);
172177
}
173-
stream.state = StreamState::Unlinked;
174178
}
175179
(true, false) => {
176-
set_default_answer(&mut stream.back, &mut self.readiness, 502);
177-
stream.state = StreamState::Unlinked;
180+
set_default_answer(stream, &mut self.readiness, 502);
178181
}
179182
(false, false) => {
180183
// we do not have an answer, but the request is untouched so we can retry
@@ -189,16 +192,13 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
189192
pub fn start_stream(&mut self, stream: GlobalStreamId, context: &mut Context) {
190193
println_!("start H1 stream {stream} {:?}", self.readiness);
191194
self.stream = stream;
192-
let mut owned_position = Position::Server;
193-
std::mem::swap(&mut owned_position, &mut self.position);
194-
match owned_position {
195+
match &mut self.position {
195196
Position::Client(BackendStatus::KeepAlive(cluster_id)) => {
196-
self.position = Position::Client(BackendStatus::Connecting(cluster_id))
197+
self.position =
198+
Position::Client(BackendStatus::Connecting(std::mem::take(cluster_id)))
197199
}
200+
Position::Client(_) => {}
198201
Position::Server => unreachable!(),
199-
_ => {
200-
self.position = owned_position;
201-
}
202202
}
203203
}
204204
}

0 commit comments

Comments
 (0)