Skip to content

Commit e24fcb5

Browse files
committed
Implement close notify in server close
Signed-off-by: Eloi DEMOLIS <eloi.demolis@clever-cloud.com>
1 parent ad8fb54 commit e24fcb5

File tree

3 files changed

+27
-6
lines changed

3 files changed

+27
-6
lines changed

lib/src/protocol/mux/h1.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
141141
kawa.prepare(&mut kawa::h1::BlockConverter);
142142
debug_kawa(kawa);
143143
let bufs = kawa.as_io_slice();
144-
if bufs.is_empty() {
144+
if bufs.is_empty() && !self.socket.socket_wants_write() {
145145
self.readiness.interest.remove(Ready::WRITABLE);
146146
return MuxResult::Continue;
147147
}
@@ -157,7 +157,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
157157
parts.metrics.bout += size;
158158
}
159159
}
160-
if update_readiness_after_write(size, status, &mut self.readiness) {
160+
if update_readiness_after_write(size, status, &mut self.readiness)
161+
|| self.socket.socket_wants_write()
162+
{
161163
return MuxResult::Continue;
162164
}
163165

@@ -272,7 +274,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
272274
}
273275
Position::Client(_, _, BackendStatus::Connecting(_))
274276
| Position::Client(_, _, BackendStatus::Connected) => {}
275-
Position::Server => unreachable!(),
277+
Position::Server => {
278+
println_!("H1 SENDING CLOSE NOTIFY");
279+
self.socket.socket_close();
280+
let _ = self.socket.socket_write_vectored(&[]);
281+
return;
282+
}
276283
}
277284
// reconnection is handled by the server
278285
let StreamState::Linked(token) = context.streams[self.stream].state else {

lib/src/protocol/mux/h2.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
605605
priorities.sort();
606606

607607
println_!("PRIORITIES: {priorities:?}");
608+
let mut socket_write = false;
608609
'outer: for stream_id in priorities {
609610
let global_stream_id = *self.streams.get(stream_id).unwrap();
610611
let stream = &mut context.streams[global_stream_id];
@@ -621,6 +622,7 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
621622
debug_kawa(kawa);
622623
}
623624
while !kawa.out.is_empty() {
625+
socket_write = true;
624626
let bufs = kawa.as_io_slice();
625627
let (size, status) = self.socket.socket_write_vectored(&bufs);
626628
kawa.consume(size);
@@ -670,7 +672,11 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
670672
self.streams.remove(&stream_id).unwrap();
671673
}
672674

673-
if self.expect_write.is_none() {
675+
if self.socket.socket_wants_write() {
676+
if !socket_write {
677+
self.socket.socket_write(&[]);
678+
}
679+
} else if self.expect_write.is_none() {
674680
// We wrote everything
675681
self.readiness.interest.remove(Ready::WRITABLE);
676682
}
@@ -1045,7 +1051,12 @@ impl<Front: SocketHandler> ConnectionH2<Front> {
10451051
match self.position {
10461052
Position::Client(_, _, BackendStatus::KeepAlive) => unreachable!(),
10471053
Position::Client(..) => {}
1048-
Position::Server => unreachable!(),
1054+
Position::Server => {
1055+
println_!("H2 SENDING CLOSE NOTIFY");
1056+
self.socket.socket_close();
1057+
let _ = self.socket.socket_write_vectored(&[]);
1058+
return;
1059+
}
10491060
}
10501061
// reconnection is handled by the server for each stream separately
10511062
for global_stream_id in self.streams.values() {

lib/src/protocol/mux/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ fn set_default_answer(stream: &mut Stream, readiness: &mut Readiness, code: u16)
151151
fill_default_301_answer(kawa, host, uri);
152152
} else {
153153
fill_default_answer(kawa, code);
154+
context.keep_alive_frontend = false;
154155
}
155156
context.status = Some(code);
156157
stream.state = StreamState::Unlinked;
@@ -450,7 +451,7 @@ impl<Front: SocketHandler> Connection<Front> {
450451
);
451452
println_!("--------------- CONNECTION CLOSE: {backend_borrow:#?}");
452453
}
453-
Position::Server => todo!(),
454+
Position::Server => {}
454455
}
455456
match self {
456457
Connection::H1(c) => c.close(context, endpoint),
@@ -1586,6 +1587,8 @@ impl<Front: SocketHandler + std::fmt::Debug, L: ListenerHandler + L7ListenerHand
15861587
println_!("FRONTEND: {:#?}", self.frontend);
15871588
println_!("BACKENDS: {:#?}", self.router.backends);
15881589

1590+
self.frontend.close(&mut self.context, EndpointClient(&mut self.router));
1591+
15891592
for (token, client) in &mut self.router.backends {
15901593
let proxy_borrow = proxy.borrow();
15911594
client.timeout_container().cancel();

0 commit comments

Comments
 (0)