-
-
Notifications
You must be signed in to change notification settings - Fork 431
Remove BytesSource
and refactor write_source
#2230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Remove BytesSource
and refactor write_source
#2230
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I love this simplification! The prospect of eventually having a pub
-worthy generic write
method is also very appealing; we're picking up too many variants.
/// callback pushes a total number of bytes less than or equal to the provided limit, it is | ||
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed | ||
/// that a prefix of the provided cumulative bytes will be written equal in length to the | ||
/// provided limit. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens to the excess bytes? Would it be simpler to require (and even assert
) that the limit is respected?
/// provided limit. | ||
fn write_source<T>( | ||
&mut self, | ||
source: impl FnOnce(usize, &mut Vec<Bytes>) -> T, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Straw alternative proposal: impl FnOnce(usize) -> I
where I: IntoIter<Item=Bytes>
. Let callers worry about their own return values via side-effects. This makes for a less leaky abstraction, saves a Vec
, and relieves us from having to plumb a T
back to the caller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: I: IntoIter<Item=Bytes>
:
I'm contemplating that, and I may experiment with it. You've mentioned the pros, the cons include:
- More complex generics. Harder to read, adds compile time(?).
- Probably trickier to use (I imagine I would change
write_chunks
to usestd::iter::from_fn
or something). - Application code able to panic in the middle of proto looping through the chunks, whereas currently it can only panic before that loop. Might be trickier to analyze.
- May be in tension with your above suggestion to panic if the user tries to write too many bytes, because it makes it impossible to confirm up-front how many bytes the user is submitting (unless we have the user return an
IntoIterator
and then just collect it into aVec
, which seems like the worst of both worlds in many ways).
I'm kind of leaning against.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Re: "Let callers worry about their own return values via side-effects":
Here's the diff I would make to do that. It does not remove any lines of code from write_source
/Send::write
, it adds 2 lines of code in net, it forces me to think about the subtleties of closure variable capturing stuff, and it makes the callers do this awkward .map(|()| written)
thing. Conversely, plumbing it through acts as a nice type-level proof of when the callback will or won't be called, making the code easier to reason about. Especially if in the future we expose it publicly, as in #2231.
Diff
diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs
index 70f51e39..137cf370 100644
--- a/quinn-proto/src/connection/streams/mod.rs
+++ b/quinn-proto/src/connection/streams/mod.rs
@@ -220,11 +220,13 @@ impl<'a> SendStream<'a> {
///
/// Returns the number of bytes successfully written.
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
+ let mut written = 0;
self.write_source(|limit, chunks| {
let prefix = &data[..limit.min(data.len())];
chunks.push(prefix.to_vec().into());
- prefix.len()
+ written = prefix.len();
})
+ .map(|()| written)
}
/// Send data on the given stream
@@ -234,8 +236,8 @@ impl<'a> SendStream<'a> {
/// [`Written::chunks`] will not count this chunk as fully written. However
/// the chunk will be advanced and contain only non-written data after the call.
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
+ let mut written = Written::default();
self.write_source(|limit, chunks| {
- let mut written = Written::default();
for chunk in data {
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
written.bytes += prefix.len();
@@ -250,8 +252,8 @@ impl<'a> SendStream<'a> {
break;
}
}
- written
})
+ .map(|()| written)
}
/// Send data on the given stream
@@ -262,10 +264,10 @@ impl<'a> SendStream<'a> {
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed
/// that a prefix of the provided cumulative bytes will be written equal in length to the
/// provided limit.
- fn write_source<T>(
+ fn write_source(
&mut self,
- source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
- ) -> Result<T, WriteError> {
+ source: impl FnOnce(usize, &mut Vec<Bytes>),
+ ) -> Result<(), WriteError> {
if self.conn_state.is_closed() {
trace!(%self.id, "write blocked; connection draining");
return Err(WriteError::Blocked);
@@ -295,14 +297,14 @@ impl<'a> SendStream<'a> {
}
let was_pending = stream.is_pending();
- let (written, source_output) = stream.write(source, limit)?;
+ let written = stream.write(source, limit)?;
self.state.data_sent += written as u64;
self.state.unacked_data += written as u64;
trace!(stream = %self.id, "wrote {} bytes", written);
if !was_pending {
self.state.pending.push_pending(self.id, stream.priority);
}
- Ok(source_output)
+ Ok(())
}
/// Check if this stream was stopped, get the reason if it was
diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs
index 52a9b714..2217b988 100644
--- a/quinn-proto/src/connection/streams/send.rs
+++ b/quinn-proto/src/connection/streams/send.rs
@@ -52,11 +52,11 @@ impl Send {
}
}
- pub(super) fn write<T>(
+ pub(super) fn write(
&mut self,
- source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
+ source: impl FnOnce(usize, &mut Vec<Bytes>),
limit: u64,
- ) -> Result<(usize, T), WriteError> {
+ ) -> Result<usize, WriteError> {
if !self.is_writable() {
return Err(WriteError::ClosedStream);
}
@@ -70,7 +70,7 @@ impl Send {
let limit = limit.min(budget) as usize;
debug_assert!(self.chunks.is_empty());
- let source_output = source(limit, &mut self.chunks);
+ source(limit, &mut self.chunks);
let mut written = 0;
@@ -85,7 +85,7 @@ impl Send {
}
}
- Ok((written, source_output))
+ Ok(written)
}
/// Update stream state due to a reset sent by the local application
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I prefer it with the diff. Avoiding the generic is worth the 2 extra lines to me and IMO distributes responsibility in a better way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it makes the callers do this awkward .map(|()| written) thing
I think this will look much less weird if you phrase it as ?; Ok(written)
.
I think we should consider this non-breaking, since |
616a9a7
to
10b4569
Compare
This commit is able to achieve significant simplification of code relating to proto::SendStream::write_source. The BytesSource trait and its two implementations are removed entirely. The source parameter to that method is replaced with a callback which is given a limit, and an output variable Vec<Bytes> with which to fill with Bytes to write. Benefits of this approach include: - More relevant logic for write and write_chunks is local. - write no longer has to convert back and forth as much through an API that fundamentally assumes multiple chunks. - Send::write no longer has to care about Written.chunks. - Tests for BytesSource implementations are obselete and removed. Conceivably, this could facilitate exposing these APIs in the future.
This is a minor optimization to avoid repeated re-allocation.
10b4569
to
0bbaf4d
Compare
Removing the breaking label; rebased over #2233 which took care of that complication. |
Please see individual commit messages.
Closes #2228.
Breaking because it removes a publicly exposed item. However, probably nobody is using that item because we don't provide any possible way to use it. Other than that, this is a purely internal refactors.