-
-
Notifications
You must be signed in to change notification settings - Fork 431
Remove BytesSource
and refactor write_source
#2230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,9 +19,8 @@ use recv::Recv; | |
pub use recv::{Chunks, ReadError, ReadableError}; | ||
|
||
mod send; | ||
pub(crate) use send::{ByteSlice, BytesArray}; | ||
use send::{BytesSource, Send, SendState}; | ||
pub use send::{FinishError, WriteError, Written}; | ||
use send::{Send, SendState}; | ||
|
||
mod state; | ||
#[allow(unreachable_pub)] // fuzzing only | ||
|
@@ -221,7 +220,11 @@ impl<'a> SendStream<'a> { | |
/// | ||
/// Returns the number of bytes successfully written. | ||
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> { | ||
Ok(self.write_source(&mut ByteSlice::from_slice(data))?.bytes) | ||
self.write_source(|limit, chunks| { | ||
let prefix = &data[..limit.min(data.len())]; | ||
chunks.push(prefix.to_vec().into()); | ||
prefix.len() | ||
}) | ||
} | ||
|
||
/// Send data on the given stream | ||
|
@@ -231,10 +234,38 @@ impl<'a> SendStream<'a> { | |
/// [`Written::chunks`] will not count this chunk as fully written. However | ||
/// the chunk will be advanced and contain only non-written data after the call. | ||
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> { | ||
self.write_source(&mut BytesArray::from_chunks(data)) | ||
self.write_source(|limit, chunks| { | ||
let mut written = Written::default(); | ||
for chunk in data { | ||
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes)); | ||
written.bytes += prefix.len(); | ||
chunks.push(prefix); | ||
|
||
if chunk.is_empty() { | ||
written.chunks += 1; | ||
} | ||
|
||
debug_assert!(written.bytes <= limit); | ||
if written.bytes == limit { | ||
break; | ||
} | ||
} | ||
written | ||
}) | ||
} | ||
|
||
fn write_source<B: BytesSource>(&mut self, source: &mut B) -> Result<Written, WriteError> { | ||
/// Send data on the given stream | ||
/// | ||
/// The `source` callback is invoked with the number of bytes that can be written immediately, | ||
/// as well as an initially empty `&mut Vec<Bytes>` to which it can push bytes to write. If the | ||
/// callback pushes a total number of bytes less than or equal to the provided limit, it is | ||
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed | ||
/// that a prefix of the provided cumulative bytes will be written equal in length to the | ||
/// provided limit. | ||
fn write_source<T>( | ||
&mut self, | ||
source: impl FnOnce(usize, &mut Vec<Bytes>) -> T, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Straw alternative proposal: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re: I'm contemplating that, and I may experiment with it. You've mentioned the pros, the cons include:
I'm kind of leaning against. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Re: "Let callers worry about their own return values via side-effects": Here's the diff I would make to do that. It does not remove any lines of code from Diffdiff --git a/quinn-proto/src/connection/streams/mod.rs b/quinn-proto/src/connection/streams/mod.rs
index 70f51e39..137cf370 100644
--- a/quinn-proto/src/connection/streams/mod.rs
+++ b/quinn-proto/src/connection/streams/mod.rs
@@ -220,11 +220,13 @@ impl<'a> SendStream<'a> {
///
/// Returns the number of bytes successfully written.
pub fn write(&mut self, data: &[u8]) -> Result<usize, WriteError> {
+ let mut written = 0;
self.write_source(|limit, chunks| {
let prefix = &data[..limit.min(data.len())];
chunks.push(prefix.to_vec().into());
- prefix.len()
+ written = prefix.len();
})
+ .map(|()| written)
}
/// Send data on the given stream
@@ -234,8 +236,8 @@ impl<'a> SendStream<'a> {
/// [`Written::chunks`] will not count this chunk as fully written. However
/// the chunk will be advanced and contain only non-written data after the call.
pub fn write_chunks(&mut self, data: &mut [Bytes]) -> Result<Written, WriteError> {
+ let mut written = Written::default();
self.write_source(|limit, chunks| {
- let mut written = Written::default();
for chunk in data {
let prefix = chunk.split_to(chunk.len().min(limit - written.bytes));
written.bytes += prefix.len();
@@ -250,8 +252,8 @@ impl<'a> SendStream<'a> {
break;
}
}
- written
})
+ .map(|()| written)
}
/// Send data on the given stream
@@ -262,10 +264,10 @@ impl<'a> SendStream<'a> {
/// guaranteed they will all be written. If it provides more bytes than this, it is guaranteed
/// that a prefix of the provided cumulative bytes will be written equal in length to the
/// provided limit.
- fn write_source<T>(
+ fn write_source(
&mut self,
- source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
- ) -> Result<T, WriteError> {
+ source: impl FnOnce(usize, &mut Vec<Bytes>),
+ ) -> Result<(), WriteError> {
if self.conn_state.is_closed() {
trace!(%self.id, "write blocked; connection draining");
return Err(WriteError::Blocked);
@@ -295,14 +297,14 @@ impl<'a> SendStream<'a> {
}
let was_pending = stream.is_pending();
- let (written, source_output) = stream.write(source, limit)?;
+ let written = stream.write(source, limit)?;
self.state.data_sent += written as u64;
self.state.unacked_data += written as u64;
trace!(stream = %self.id, "wrote {} bytes", written);
if !was_pending {
self.state.pending.push_pending(self.id, stream.priority);
}
- Ok(source_output)
+ Ok(())
}
/// Check if this stream was stopped, get the reason if it was
diff --git a/quinn-proto/src/connection/streams/send.rs b/quinn-proto/src/connection/streams/send.rs
index 52a9b714..2217b988 100644
--- a/quinn-proto/src/connection/streams/send.rs
+++ b/quinn-proto/src/connection/streams/send.rs
@@ -52,11 +52,11 @@ impl Send {
}
}
- pub(super) fn write<T>(
+ pub(super) fn write(
&mut self,
- source: impl FnOnce(usize, &mut Vec<Bytes>) -> T,
+ source: impl FnOnce(usize, &mut Vec<Bytes>),
limit: u64,
- ) -> Result<(usize, T), WriteError> {
+ ) -> Result<usize, WriteError> {
if !self.is_writable() {
return Err(WriteError::ClosedStream);
}
@@ -70,7 +70,7 @@ impl Send {
let limit = limit.min(budget) as usize;
debug_assert!(self.chunks.is_empty());
- let source_output = source(limit, &mut self.chunks);
+ source(limit, &mut self.chunks);
let mut written = 0;
@@ -85,7 +85,7 @@ impl Send {
}
}
- Ok((written, source_output))
+ Ok(written)
}
/// Update stream state due to a reset sent by the local application
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I prefer it with the diff. Avoiding the generic is worth the 2 extra lines to me and IMO distributes responsibility in a better way. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I think this will look much less weird if you phrase it as |
||
) -> Result<T, WriteError> { | ||
if self.conn_state.is_closed() { | ||
trace!(%self.id, "write blocked; connection draining"); | ||
return Err(WriteError::Blocked); | ||
|
@@ -264,14 +295,14 @@ impl<'a> SendStream<'a> { | |
} | ||
|
||
let was_pending = stream.is_pending(); | ||
let written = stream.write(source, limit)?; | ||
self.state.data_sent += written.bytes as u64; | ||
self.state.unacked_data += written.bytes as u64; | ||
trace!(stream = %self.id, "wrote {} bytes", written.bytes); | ||
let (written, source_output) = stream.write(source, limit)?; | ||
self.state.data_sent += written as u64; | ||
self.state.unacked_data += written as u64; | ||
trace!(stream = %self.id, "wrote {} bytes", written); | ||
if !was_pending { | ||
self.state.pending.push_pending(self.id, stream.priority); | ||
} | ||
Ok(written) | ||
Ok(source_output) | ||
} | ||
|
||
/// Check if this stream was stopped, get the reason if it was | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens to the excess bytes? Would it be simpler to require (and even
assert
) that the limit is respected?