Remove BytesSource and refactor write_source #2230

Open. Wants to merge 2 commits into base: main.

gretchenfrage
Collaborator

@gretchenfrage gretchenfrage commented May 10, 2025

Please see individual commit messages.

Closes #2228.

Breaking because it removes a publicly exposed item. However, probably nobody is using that item because we don't provide any possible way to use it. Other than that, this is a purely internal refactor.

Collaborator

@Ralith Ralith left a comment

I love this simplification! The prospect of eventually having a pub-worthy generic write method is also very appealing; we're picking up too many variants.

/// callback pushes a total number of bytes less than or equal to the provided limit, it is
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed
/// that a prefix of the provided cumulative bytes will be written equal in length to the
/// provided limit.
Collaborator

What happens to the excess bytes? Would it be simpler to require (and even assert) that the limit is respected?
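
To make the two options in this question concrete, here is a minimal sketch of both policies. It is not quinn's implementation: `Chunk` is a `Vec<u8>` stand-in for `Bytes` so that the example needs no external crates, and `truncate_to_limit` and `assert_within_limit` are hypothetical helper names. The first keeps only a prefix of `limit` bytes and drops the rest, which is one reading of the doc comment; the second treats exceeding the limit as a caller bug.

```rust
/// Stand-in for `bytes::Bytes` (assumption, to avoid external crates).
type Chunk = Vec<u8>;

/// Hypothetical helper: keep at most `limit` cumulative bytes from `chunks`
/// and discard everything after that prefix. Returns the bytes kept.
fn truncate_to_limit(chunks: &mut Vec<Chunk>, limit: usize) -> usize {
    let mut written = 0;
    let mut keep = 0;
    for chunk in chunks.iter_mut() {
        if written == limit {
            break;
        }
        // Shorten the chunk that straddles the limit.
        let take = chunk.len().min(limit - written);
        chunk.truncate(take);
        written += take;
        keep += 1;
    }
    // Drop every chunk that lies entirely past the limit.
    chunks.truncate(keep);
    written
}

/// Hypothetical stricter policy: exceeding the limit is a caller bug.
fn assert_within_limit(chunks: &[Chunk], limit: usize) -> usize {
    let total: usize = chunks.iter().map(Vec::len).sum();
    assert!(
        total <= limit,
        "callback pushed {} bytes, limit is {}",
        total,
        limit
    );
    total
}
```

With the truncating policy the caller cannot tell from the chunk list alone how much was accepted; the asserting policy avoids that ambiguity at the cost of a panic path.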

    /// provided limit.
    fn write_source<T>(
        &mut self,
        source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
Collaborator

Straw alternative proposal: impl FnOnce(usize) -> I where I: IntoIterator<Item=Bytes>. Let callers worry about their own return values via side-effects. This makes for a less leaky abstraction, saves a Vec, and relieves us from having to plumb a T back to the caller.
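
A rough sketch of what that shape could look like, under stated assumptions: `Chunk` is a `Vec<u8>` stand-in for `Bytes`, and `write_iter` is a hypothetical free function rather than the actual `write_source` method. The callback receives the limit and returns anything iterable over chunks; the caller records its own result through a captured variable.

```rust
/// Stand-in for `bytes::Bytes` (assumption, to avoid external crates).
type Chunk = Vec<u8>;

/// Hypothetical iterator-based variant: `source` gets the byte limit and
/// returns an iterable of chunks. Returns the number of bytes queued.
fn write_iter<I>(
    limit: usize,
    queued: &mut Vec<Chunk>,
    source: impl FnOnce(usize) -> I,
) -> usize
where
    I: IntoIterator<Item = Chunk>,
{
    let mut written = 0;
    // The iterator is consumed lazily, so caller code may run inside this loop.
    for mut chunk in source(limit) {
        if written == limit {
            break;
        }
        // Keep only the part of the chunk that still fits under the limit.
        chunk.truncate(limit - written);
        written += chunk.len();
        queued.push(chunk);
    }
    written
}
```

A single-buffer caller could return `Some(prefix.to_vec())`, since `Option<T>` implements `IntoIterator<Item = T>`, and store the prefix length in a captured local before returning.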

Collaborator Author

Re: I: IntoIterator<Item=Bytes>:

I'm contemplating that, and I may experiment with it. You've mentioned the pros; the cons include:

  • More complex generics. Harder to read, adds compile time(?).
  • Probably trickier to use (I imagine I would change write_chunks to use std::iter::from_fn or something).
  • Application code able to panic in the middle of proto looping through the chunks, whereas currently it can only panic before that loop. Might be trickier to analyze.
  • May be in tension with your above suggestion to panic if the user tries to write too many bytes, because it makes it impossible to confirm up-front how many bytes the user is submitting (unless we have the user return an IntoIterator and then just collect it into a Vec, which seems like the worst of both worlds in many ways).

I'm kind of leaning against.

Collaborator Author

Re: "Let callers worry about their own return values via side-effects":

Here's the diff I would make to do that. It does not remove any lines of code from write_source/Send::write, it adds a net 2 lines of code, it forces me to think about the subtleties of closure variable capturing, and it makes the callers do this awkward .map(|()| written) thing. Conversely, plumbing it through acts as a nice type-level proof of when the callback will or won't be called, making the code easier to reason about. Especially if in the future we expose it publicly, as in #2231.

Diff
diff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs
index 70f51e39..137cf370 100644
--- a/quinn-proto/src/connection/streams/mod.rs
+++ b/quinn-proto/src/connection/streams/mod.rs
@@ -220,11 +220,13 @@ impl<'a> SendStream<'a> {
     ///
     /// Returns the number of bytes successfully written.
     pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
+        let mut written = 0;
         self.write_source(|limit, chunks| {
             let prefix = &data[..limit.min(data.len())];
             chunks.push(prefix.to_vec().into());
-            prefix.len()
+            written = prefix.len();
         })
+        .map(|()| written)
     }
 
     /// Send data on the given stream
@@ -234,8 +236,8 @@ impl<'a> SendStream<'a> {
     /// [`Written::chunks`] will not count this chunk as fully written. However
     /// the chunk will be advanced and contain only non-written data after the call.
     pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
+        let mut written = Written::default();
         self.write_source(|limit, chunks| {
-            let mut written = Written::default();
             for chunk in data {
                 let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
                 written.bytes += prefix.len();
@@ -250,8 +252,8 @@ impl<'a> SendStream<'a> {
                     break;
                 }
             }
-            written
         })
+        .map(|()| written)
     }
 
     /// Send data on the given stream
@@ -262,10 +264,10 @@ impl<'a> SendStream<'a> {
     /// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed
     /// that a prefix of the provided cumulative bytes will be written equal in length to the
     /// provided limit.
-    fn write_source<T>(
+    fn write_source(
         &mut self,
-        source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
-    ) -> Result<T, WriteError> {
+        source: impl FnOnce(usize, &mut Vec<Bytes>),
+    ) -> Result<(), WriteError> {
         if self.conn_state.is_closed() {
             trace!(%self.id, "write blocked; connection draining");
             return Err(WriteError::Blocked);
@@ -295,14 +297,14 @@ impl<'a> SendStream<'a> {
         }
 
         let was_pending = stream.is_pending();
-        let (written, source_output) = stream.write(source, limit)?;
+        let written = stream.write(source, limit)?;
         self.state.data_sent += written as u64;
         self.state.unacked_data += written as u64;
         trace!(stream = %self.id, "wrote {} bytes", written);
         if !was_pending {
             self.state.pending.push_pending(self.id, stream.priority);
         }
-        Ok(source_output)
+        Ok(())
     }
 
     /// Check if this stream was stopped, get the reason if it was
diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs
index 52a9b714..2217b988 100644
--- a/quinn-proto/src/connection/streams/send.rs
+++ b/quinn-proto/src/connection/streams/send.rs
@@ -52,11 +52,11 @@ impl Send {
         }
     }
 
-    pub(super) fn write<T>(
+    pub(super) fn write(
         &mut self,
-        source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
+        source: impl FnOnce(usize, &mut Vec<Bytes>),
         limit: u64,
-    ) -> Result<(usize, T), WriteError> {
+    ) -> Result<usize, WriteError> {
         if !self.is_writable() {
             return Err(WriteError::ClosedStream);
         }
@@ -70,7 +70,7 @@ impl Send {
         let limit = limit.min(budget) as usize;
 
         debug_assert!(self.chunks.is_empty());
-        let source_output = source(limit, &mut self.chunks);
+        source(limit, &mut self.chunks);
 
         let mut written = 0;
 
@@ -85,7 +85,7 @@ impl Send {
             }
         }
 
-        Ok((written, source_output))
+        Ok(written)
     }
 
     /// Update stream state due to a reset sent by the local application

Member

I think I prefer it with the diff. Avoiding the generic is worth the 2 extra lines to me and IMO distributes responsibility in a better way.

Collaborator

"it makes the callers do this awkward .map(|()| written) thing"

I think this will look much less weird if you phrase it as ?; Ok(written).
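
For comparison, a small self-contained sketch of the two phrasings. The `write_source` here is a hypothetical free-function stand-in (it takes an `Option<usize>` limit, with `None` meaning "blocked"), not the real method, and `Vec<u8>` stands in for `Bytes`.

```rust
#[derive(Debug, PartialEq)]
enum WriteError {
    Blocked,
}

/// Hypothetical stand-in for the non-generic `write_source`: the callback
/// returns nothing, and callers track their own results.
fn write_source(
    limit: Option<usize>,
    source: impl FnOnce(usize, &mut Vec<Vec<u8>>),
) -> Result<(), WriteError> {
    let limit = limit.ok_or(WriteError::Blocked)?;
    let mut chunks = Vec::new();
    source(limit, &mut chunks);
    Ok(())
}

/// Phrasing from the diff: map the unit result to the captured counter.
fn write_with_map(limit: Option<usize>, data: &[u8]) -> Result<usize, WriteError> {
    let mut written = 0;
    write_source(limit, |limit, chunks| {
        let prefix = &data[..limit.min(data.len())];
        chunks.push(prefix.to_vec());
        written = prefix.len();
    })
    .map(|()| written)
}

/// Suggested phrasing: propagate the error with `?`, then return the counter.
fn write_with_question_mark(limit: Option<usize>, data: &[u8]) -> Result<usize, WriteError> {
    let mut written = 0;
    write_source(limit, |limit, chunks| {
        let prefix = &data[..limit.min(data.len())];
        chunks.push(prefix.to_vec());
        written = prefix.len();
    })?;
    Ok(written)
}
```

Both functions behave identically; the second simply reads as ordinary early-return error handling.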

@Ralith
Collaborator

Ralith commented May 10, 2025

I think we should consider this non-breaking, since BytesSource wasn't intentionally exposed and wasn't actually useful. I doubt anyone's actually touched it.

@gretchenfrage gretchenfrage force-pushed the internal-write-source-refactor branch from 616a9a7 to 10b4569 Compare May 11, 2025 18:52
This commit significantly simplifies the code relating to
proto::SendStream::write_source. The BytesSource trait and its two
implementations are removed entirely. The source parameter to that
method is replaced with a callback which is given a limit and an
output Vec<Bytes> to fill with the Bytes to write.
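
As an illustration of the callback shape described above, here is a heavily simplified sketch. It is an assumption-laden model, not the quinn-proto code: `SendBuf` and its `budget` field are hypothetical, `Chunk` is a `Vec<u8>` stand-in for `Bytes`, and error handling is omitted. It shows how `write` and `write_chunks` can each keep their own logic local while sharing one generic `write_source`.

```rust
/// Stand-in for `bytes::Bytes` (assumption, to avoid external crates).
type Chunk = Vec<u8>;

/// Hypothetical, simplified send buffer.
struct SendBuf {
    queued: Vec<Chunk>,
    /// Remaining write budget in bytes (models the flow-control limit).
    budget: usize,
}

impl SendBuf {
    /// The callback gets the current limit and an output `Vec` to fill;
    /// whatever it returns is handed back to the caller unchanged.
    fn write_source<T>(&mut self, source: impl FnOnce(usize, &mut Vec<Chunk>) -> T) -> T {
        let limit = self.budget;
        let start = self.queued.len();
        let out = source(limit, &mut self.queued);
        let pushed: usize = self.queued[start..].iter().map(|c| c.len()).sum();
        self.budget -= pushed.min(limit);
        out
    }

    /// Single-buffer write: push one prefix, return its length.
    fn write(&mut self, data: &[u8]) -> usize {
        self.write_source(|limit, chunks| {
            let prefix = &data[..limit.min(data.len())];
            chunks.push(prefix.to_vec());
            prefix.len()
        })
    }

    /// Multi-chunk write: move bytes out of `data` until the limit is hit.
    fn write_chunks(&mut self, data: &mut [Chunk]) -> usize {
        self.write_source(|limit, chunks| {
            let mut written = 0;
            for chunk in data.iter_mut() {
                if written == limit {
                    break;
                }
                let n = chunk.len().min(limit - written);
                // Split off the written prefix; `chunk` keeps the remainder.
                let prefix: Chunk = chunk.drain(..n).collect();
                written += prefix.len();
                chunks.push(prefix);
            }
            written
        })
    }
}
```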

Benefits of this approach include:

- More relevant logic for write and write_chunks is local.
- write no longer has to convert back and forth as much through an API
  that fundamentally assumes multiple chunks.
- Send::write no longer has to care about Written.chunks.
- Tests for BytesSource implementations are obsolete and removed.

Conceivably, this could facilitate exposing these APIs in the future.

This is a minor optimization to avoid repeated re-allocation.
@gretchenfrage gretchenfrage force-pushed the internal-write-source-refactor branch from 10b4569 to 0bbaf4d Compare May 13, 2025 02:25
@gretchenfrage
Collaborator Author

Removing the breaking label; rebased over #2233 which took care of that complication.

Development

Successfully merging this pull request may close these issues.

Unclear why BytesSource is public
3 participants