refactor download to use tower #47
Conversation
```rust
pub async fn join(mut self) -> Result<(), crate::error::Error> {
    self.body.close();
    while let Some(join_result) = self.tasks.join_next().await {
        join_result?;
```
I'm confused whether `DownloadHandle.join()` is supposed to stop work ASAP or not. Tracing the `close()` calls down, it ultimately calls https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.close on the receiver inside:

```rust
pub(crate) struct UnorderedBody {
    chunks: Option<mpsc::Receiver<Result<ChunkResponse, crate::error::Error>>>,
```
Does this stop new parts being started? Or does it just ignore the results of all further parts?
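For reference, a minimal standalone sketch of that `Receiver::close` behavior (not this crate's code): closing the receiver stops new sends from succeeding, but messages already buffered in the channel can still be drained.

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<u32>(8);
    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();

    // close() prevents any *further* messages from being sent...
    rx.close();
    assert!(tx.send(3).await.is_err());

    // ...but messages already buffered in the channel can still be received.
    assert_eq!(rx.recv().await, Some(1));
    assert_eq!(rx.recv().await, Some(2));
    assert_eq!(rx.recv().await, None);
}
```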
> I'm confused whether `DownloadHandle.join()` is supposed to stop work ASAP or not.

Stop ASAP, no. `join` consumes the handle, so it's invalid to access it after that. The semantics are to wait for all tasks to complete their work. If we want to cancel/abort, we would want to add a method for that. I suspect we'll revisit some of this as we look at progress + cancellation.
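For illustration only, one shape such a cancellation method might take. This is a hypothetical sketch, assuming `tasks` is a `tokio::task::JoinSet` and that `crate::error::Error` converts from `tokio::task::JoinError`:

```rust
// Hypothetical sketch, not part of this PR: an abort() counterpart to join().
pub async fn abort(mut self) -> Result<(), crate::error::Error> {
    // Stop any further chunks from being delivered on the body channel.
    self.body.close();
    // Request cancellation of all spawned tasks...
    self.tasks.abort_all();
    // ...then reap them, treating the resulting cancellations as expected.
    while let Some(join_result) = self.tasks.join_next().await {
        if let Err(err) = join_result {
            if !err.is_cancelled() {
                return Err(err.into());
            }
        }
    }
    Ok(())
}
```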
> Does this stop new parts being started? Or does it just ignore the results of all further parts?

This stops any new parts from being sent on the body channel. It does not stop in-progress/scheduled work; those results will just be ignored/dropped. I don't think we want to cancel or abort when invoking `join`, as a user could still be processing results off the channel (though I suppose closing the channel may be the wrong behavior as well 🤔).

Taking a step back, what should the semantics of `join` be? If we don't close the channel, `join` can hang forever when the body isn't drained, because the channel may be full (which is why I added `close` originally). We don't know whether the caller is going to drain the body, though.
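For concreteness, a hedged sketch of the drain-then-join calling pattern that sidesteps that hang. Field and method names like `handle.body.next()` are assumptions about this crate's API, not confirmed by the PR:

```rust
// Illustrative only: `DownloadHandle`, `body.next()`, and the chunk type are assumed.
async fn drain_and_join(mut handle: DownloadHandle) -> Result<(), crate::error::Error> {
    // Drain every chunk first so the bounded channel can never fill up
    // and block the producer tasks...
    while let Some(chunk) = handle.body.next().await {
        let _chunk = chunk?;
        // ... write the chunk to its destination ...
    }
    // ...then join, which now only waits for the worker tasks to wind down.
    handle.join().await
}
```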
Accidentally submitted my one question as a standalone comment, but looks good.
LGTM!
```diff
@@ -291,10 +291,23 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError>
     Ok(())
 }

-async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
+async fn warmup(config: &SdkConfig, bucket: &str) -> Result<(), BoxError> {
```
nit: can we add an explanatory comment on what we're warming up here?
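Something along these lines, perhaps; the wording below is a guess at the intent, not the author's:

```rust
/// Prime the HTTP connection pool, DNS cache, and credentials provider by
/// issuing a cheap request against `bucket`, so the timed run measures
/// steady-state throughput rather than connection setup.
async fn warmup(config: &SdkConfig, bucket: &str) -> Result<(), BoxError> {
    // ...
}
```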
```rust
/// Consume the handle and wait for download transfer to complete
pub async fn join(mut self) -> Result<(), crate::error::Error> {
    self.body.close();
```
Curious, was there an issue before because of not closing the body?
Issue #, if available:
n/a
Description of changes:
The primary change in this PR is to refactor the download internals from a fixed-size worker pool to use tower instead. The motivation for using tower is to be able to build higher-level abstractions that would have been difficult with the simple channel/worker approach. Additionally, I cleaned some things up and added some new tests. I benchmarked this on a c5n.18xlarge and saw no regression in download times for a single 30 GB object downloaded to RAM. Peak throughput was actually consistently a bit higher (closer to 77-78 Gbps vs the previous ~72 Gbps).
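For readers unfamiliar with the pattern, a minimal sketch of the tower approach described above. This is not the PR's actual code; `ChunkRequest` and `download_chunk` are illustrative stand-ins, and it assumes tower's `util` and `limit` features:

```rust
use tower::{service_fn, Service, ServiceBuilder, ServiceExt};

struct ChunkRequest {
    part_number: u64,
}

async fn download_chunk(req: ChunkRequest) -> Result<Vec<u8>, std::io::Error> {
    // Stand-in for fetching the byte range corresponding to req.part_number.
    let _ = req.part_number;
    Ok(vec![0u8; 1024])
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Cap in-flight chunk downloads, analogous to the old fixed pool size,
    // but composable with other layers (retry, rate limiting, etc.).
    let mut svc = ServiceBuilder::new()
        .concurrency_limit(8)
        .service(service_fn(download_chunk));

    for part_number in 0..4 {
        // ready() waits until the concurrency limit admits another request.
        let resp = svc.ready().await?.call(ChunkRequest { part_number }).await?;
        println!("part {part_number}: {} bytes", resp.len());
    }
    Ok(())
}
```

The appeal of this shape is that the scheduling policy lives in reusable middleware layers rather than in hand-rolled channel/worker plumbing.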
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.