Skip to content

Commit f61332e

Browse files
committed
refactor: change local reset counter to use type system more
1 parent 3f1a8e3 commit f61332e

File tree

3 files changed

+96
-32
lines changed

3 files changed

+96
-32
lines changed

src/proto/connection.rs

Lines changed: 28 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -428,24 +428,7 @@ where
428428
// error. This is handled by setting a GOAWAY frame followed by
429429
// terminating the connection.
430430
Err(Error::GoAway(debug_data, reason, initiator)) => {
431-
let e = Error::GoAway(debug_data.clone(), reason, initiator);
432-
tracing::debug!(error = ?e, "Connection::poll; connection error");
433-
434-
// We may have already sent a GOAWAY for this error,
435-
// if so, don't send another, just flush and close up.
436-
if self
437-
.go_away
438-
.going_away()
439-
.map_or(false, |frame| frame.reason() == reason)
440-
{
441-
tracing::trace!(" -> already going away");
442-
*self.state = State::Closing(reason, initiator);
443-
return Ok(());
444-
}
445-
446-
// Reset all active streams
447-
self.streams.handle_error(e);
448-
self.go_away_now_data(reason, debug_data);
431+
self.handle_go_away(reason, debug_data, initiator);
449432
Ok(())
450433
}
451434
// Attempting to read a frame resulted in a stream level error.
@@ -454,7 +437,12 @@ where
454437
Err(Error::Reset(id, reason, initiator)) => {
455438
debug_assert_eq!(initiator, Initiator::Library);
456439
tracing::trace!(?id, ?reason, "stream error");
457-
self.streams.send_reset(id, reason);
440+
match self.streams.send_reset(id, reason) {
441+
Ok(()) => (),
442+
Err(crate::proto::error::GoAway { debug_data, reason }) => {
443+
self.handle_go_away(reason, debug_data, Initiator::Library);
444+
}
445+
}
458446
Ok(())
459447
}
460448
// Attempting to read a frame resulted in an I/O error. All
@@ -474,6 +462,27 @@ where
474462
}
475463
}
476464

465+
fn handle_go_away(&mut self, reason: Reason, debug_data: Bytes, initiator: Initiator) {
466+
let e = Error::GoAway(debug_data.clone(), reason, initiator);
467+
tracing::debug!(error = ?e, "Connection::poll; connection error");
468+
469+
// We may have already sent a GOAWAY for this error,
470+
// if so, don't send another, just flush and close up.
471+
if self
472+
.go_away
473+
.going_away()
474+
.map_or(false, |frame| frame.reason() == reason)
475+
{
476+
tracing::trace!(" -> already going away");
477+
*self.state = State::Closing(reason, initiator);
478+
return;
479+
}
480+
481+
// Reset all active streams
482+
self.streams.handle_error(e);
483+
self.go_away_now_data(reason, debug_data);
484+
}
485+
477486
fn recv_frame(&mut self, frame: Option<Frame>) -> Result<ReceivedFrame, Error> {
478487
use crate::frame::Frame::*;
479488
match frame {

src/proto/error.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,11 @@ pub enum Error {
1313
Io(io::ErrorKind, Option<String>),
1414
}
1515

16+
pub struct GoAway {
17+
pub debug_data: Bytes,
18+
pub reason: Reason,
19+
}
20+
1621
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1722
pub enum Initiator {
1823
User,
@@ -60,6 +65,10 @@ impl Initiator {
6065
Self::Remote => false,
6166
}
6267
}
68+
69+
pub(crate) fn is_library(&self) -> bool {
70+
matches!(self, Self::Library)
71+
}
6372
}
6473

6574
impl fmt::Display for Error {

src/proto/streams/streams.rs

Lines changed: 59 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -366,7 +366,11 @@ impl<B> DynStreams<'_, B> {
366366
me.recv_eof(self.send_buffer, clear_pending_accept)
367367
}
368368

369-
pub fn send_reset(&mut self, id: StreamId, reason: Reason) {
369+
pub fn send_reset(
370+
&mut self,
371+
id: StreamId,
372+
reason: Reason,
373+
) -> Result<(), crate::proto::error::GoAway> {
370374
let mut me = self.inner.lock().unwrap();
371375
me.send_reset(self.send_buffer, id, reason)
372376
}
@@ -637,15 +641,23 @@ impl Inner {
637641
// The remote may send window updates for streams that the local now
638642
// considers closed. It's ok...
639643
if let Some(mut stream) = self.store.find_mut(&id) {
640-
// This result is ignored as there is nothing to do when there
641-
// is an error. The stream is reset by the function on error and
642-
// the error is informational.
643-
let _ = self.actions.send.recv_stream_window_update(
644-
frame.size_increment(),
644+
let res = self
645+
.actions
646+
.send
647+
.recv_stream_window_update(
648+
frame.size_increment(),
649+
send_buffer,
650+
&mut stream,
651+
&mut self.counts,
652+
&mut self.actions.task,
653+
)
654+
.map_err(|reason| Error::library_reset(id, reason));
655+
656+
return self.actions.reset_on_recv_stream_err(
645657
send_buffer,
646658
&mut stream,
647659
&mut self.counts,
648-
&mut self.actions.task,
660+
res,
649661
);
650662
} else {
651663
self.actions
@@ -882,7 +894,12 @@ impl Inner {
882894
Poll::Ready(Ok(()))
883895
}
884896

885-
fn send_reset<B>(&mut self, send_buffer: &SendBuffer<B>, id: StreamId, reason: Reason) {
897+
fn send_reset<B>(
898+
&mut self,
899+
send_buffer: &SendBuffer<B>,
900+
id: StreamId,
901+
reason: Reason,
902+
) -> Result<(), crate::proto::error::GoAway> {
886903
let key = match self.store.find_entry(id) {
887904
Entry::Occupied(e) => e.key(),
888905
Entry::Vacant(e) => {
@@ -923,7 +940,7 @@ impl Inner {
923940
Initiator::Library,
924941
&mut self.counts,
925942
send_buffer,
926-
);
943+
)
927944
}
928945
}
929946

@@ -1095,8 +1112,20 @@ impl<B> StreamRef<B> {
10951112
let mut send_buffer = self.send_buffer.inner.lock().unwrap();
10961113
let send_buffer = &mut *send_buffer;
10971114

1098-
me.actions
1099-
.send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer);
1115+
match me
1116+
.actions
1117+
.send_reset(stream, reason, Initiator::User, &mut me.counts, send_buffer)
1118+
{
1119+
Ok(()) => (),
1120+
Err(crate::proto::error::GoAway { .. }) => {
1121+
// this should never happen, because Initiator::User resets do
1122+
// not count toward the local limit.
1123+
// we could perhaps make this state impossible, if we made the
1124+
// initiator argument a generic, and so this could return
1125+
// Infallible instead of an impossible GoAway, but oh well.
1126+
unreachable!("Initiator::User should not error sending reset");
1127+
}
1128+
}
11001129
}
11011130

11021131
pub fn send_response(
@@ -1517,8 +1546,23 @@ impl Actions {
15171546
initiator: Initiator,
15181547
counts: &mut Counts,
15191548
send_buffer: &mut Buffer<Frame<B>>,
1520-
) {
1549+
) -> Result<(), crate::proto::error::GoAway> {
15211550
counts.transition(stream, |counts, stream| {
1551+
if initiator.is_library() {
1552+
if counts.can_inc_num_local_error_resets() {
1553+
counts.inc_num_local_error_resets();
1554+
} else {
1555+
tracing::warn!(
1556+
"locally-reset streams reached limit ({:?})",
1557+
counts.max_local_error_resets().unwrap(),
1558+
);
1559+
return Err(crate::proto::error::GoAway {
1560+
reason: Reason::ENHANCE_YOUR_CALM,
1561+
debug_data: "too_many_internal_resets".into(),
1562+
});
1563+
}
1564+
}
1565+
15221566
self.send.send_reset(
15231567
reason,
15241568
initiator,
@@ -1530,7 +1574,9 @@ impl Actions {
15301574
self.recv.enqueue_reset_expiration(stream, counts);
15311575
// if a RecvStream is parked, ensure it's notified
15321576
stream.notify_recv();
1533-
});
1577+
1578+
Ok(())
1579+
})
15341580
}
15351581

15361582
fn reset_on_recv_stream_err<B>(

0 commit comments

Comments
 (0)