Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor download to use tower #47

Merged
merged 10 commits into from
Sep 6, 2024
Merged

refactor download to use tower #47

merged 10 commits into from
Sep 6, 2024

Conversation

aajtodd
Copy link
Contributor

@aajtodd aajtodd commented Aug 30, 2024

Issue #, if available:
n/a

Description of changes:

The primary change in this PR is to refactor download internals from a fixed worker pool size to use tower instead. The motivation for using tower is to be able to build higher level abstractions that would have been difficult with the simple channel/worker approach. Additionally I cleaned some things up and added some new tests. I benchmarked this on a c5n.18xlarge and saw no regression in download times for a single 30 GB object downloaded to RAM. The peak throughput was actually consistently a bit higher (closer to 77-78 Gbps vs the previous ~72 Gbps).

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@aajtodd aajtodd requested a review from a team as a code owner August 30, 2024 15:11
Comment on lines 45 to 48
pub async fn join(mut self) -> Result<(), crate::error::Error> {
self.body.close();
while let Some(join_result) = self.tasks.join_next().await {
join_result?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused whether DownloadHandle.join() is supposed to stop work ASAP or not.

Tracing the close() calls down, it ultimately calls https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.close

pub(crate) struct UnorderedBody {
    chunks: Option<mpsc::Receiver<Result<ChunkResponse, crate::error::Error>>>,

Does this stop new parts being started? Or does it just ignore the results of all further parts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused whether DownloadHandle.join() is supposed to stop work ASAP or not.

Stop ASAP, no. join consumes the handle so it's invalid to access it after that. The semantics are to wait for all tasks to complete their work. If we want to cancel/abort we would want to add a method for that. I suspect we'll revisit some of this as we look at progress + cancellation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this stop new parts being started? Or does it just ignore the results of all further parts?

This stops any new parts being sent on the body channel. It does not stop in-progress/scheduled work, the results will just be ignored/dropped. I don't think we want to cancel or abort when invoking join as a user could still be processing results off the channel (though I suppose closing the channel may be the wrong behavior as well 🤔 ).

Taking a step back what should the semantics of join be? If we don't close the channel we can result in join hanging forever if the body isn't drained because the channel could be full (which is why I added close originally). We don't know if the caller is going to drain the body though.

Copy link
Contributor

@graebm graebm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

accidentally submitted my one questions as a standalone comment, but looks good.

Copy link

@ysaito1001 ysaito1001 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM!

@@ -291,10 +291,23 @@ async fn write_body(mut body: Body, mut dest: fs::File) -> Result<(), BoxError>
Ok(())
}

async fn warmup(config: &SdkConfig) -> Result<(), BoxError> {
async fn warmup(config: &SdkConfig, bucket: &str) -> Result<(), BoxError> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we add an explanatory comment on what we're warming up here?

/// Consume the handle and wait for download transfer to complete
pub async fn join(mut self) -> Result<(), crate::error::Error> {
self.body.close();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, was there an issue before because of not closing the body?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aajtodd aajtodd merged commit f57573c into main Sep 6, 2024
14 checks passed
@aajtodd aajtodd deleted the atodd/tower branch September 6, 2024 13:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants