Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Upload to use Tower #50

Merged
merged 24 commits into from
Sep 24, 2024
Merged

Refactor Upload to use Tower #50

merged 24 commits into from
Sep 24, 2024

Conversation

waahm7
Copy link
Contributor

@waahm7 waahm7 commented Sep 13, 2024

Description of changes:
This is my first Rust PR, so feel free to provide lots of feedback.

  • Similar to refactor download to use tower #47, this PR refactors the upload_part implementation from a fixed pool of workers to tower. This will allow us to easily implement and test higher-level abstractions like hedging for slow parts. I saw no performance difference with this.
  • This also refactors the upload pipeline to distinguish between the read_body and upload_part phases. read_body still uses a fixed pool of workers because we need to support unknown content length, and I couldn't figure out how to implement an unknown amount of work in tower. The upload_part phase uses tower's concurrency_limit layer to manage the concurrency.
  • I used two different JoinSets because they both have different return types, and upload_tasks requires a lock. If I combine them into one JoinSet, we run into a deadlock where we are still trying to read_body and join all the tasks. Both the join function and read_body need a lock to join or read the body and spawn upload_tasks. Please feel free to suggest if there is a better way to accomplish this.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@waahm7 waahm7 requested a review from a team as a code owner September 13, 2024 16:23
Copy link
Contributor

@aajtodd aajtodd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couple minor things. I also think it would be nice to add some integration tests with a fake server at this point for upload like we did for download. Myself or Yuki can help with structuring that if needed.

+ Send {
let svc = service_fn(upload_part_handler);
ServiceBuilder::new()
.concurrency_limit(ctx.handle.num_workers())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a TODO that this needs "globalized"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have added a FIXME.

aws-s3-transfer-manager/src/operation/upload/service.rs Outdated Show resolved Hide resolved
aws-s3-transfer-manager/src/operation/upload/service.rs Outdated Show resolved Hide resolved
};
let svc = svc.clone();
let task = async move { svc.oneshot(req).await }
.instrument(tracing::trace_span!("upload_part", worker = part_number));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix: worker = part_number doesn't make sense here, either drop it or change it to part_number = part_number (I'm not sure if knowing the part number in the logs is helpful or not)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I have updated it to part_number = part_number since I think it will be useful to know the part_number in logs.

use tokio::task;

/// Response type for a single upload object request.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
/// All child multipart upload tasks spawned for this upload
pub(crate) tasks: task::JoinSet<Result<Vec<CompletedPart>, crate::error::Error>>,
pub(crate) upload_tasks: Arc<Mutex<task::JoinSet<Result<CompletedPart, crate::error::Error>>>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if we can make it a regular mutex. We do keep this lock across await points at https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/50/files#diff-a98b4945e17362a1dcad7da7e15d7ef7af38ff5f88ae751261823a5f23bb3652R135.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh I missed that.

Comment on lines +96 to +105
for i in 0..n_workers {
let worker = read_body(
part_reader.clone(),
handle.ctx.clone(),
svc.clone(),
handle.upload_tasks.clone(),
)
.instrument(tracing::debug_span!("read_body", worker = i));
handle.read_tasks.spawn(worker);
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to better understand, from the PR description:

This also refactors the upload pipeline to distinguish between the read_body and upload_part phases. read_body still uses a fixed pool of workers because we need to support unknown content length, and I couldn't figure out how to implement an unknown amount of work in tower.

Do these sentences imply that upload_part is NOT restricted by the fixed pool of workers, since it only refers to read_body still using a fixed pool of workers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upload part is restricted by a pool of workers, but instead of us explicitly managing a pool of workers where each worker reads a part_body and then uploads a part, we let tower manage it using the concurrency_limit layer at https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/50/files#diff-a6c023261dc31765237ede4502d30ba640bca1ef9be58cb92e48ccdf69c4768cR72, and the pool of read_body workers simply spawns N upload_part tasks.

@waahm7 waahm7 merged commit e929fac into main Sep 24, 2024
13 checks passed
@waahm7 waahm7 deleted the upload-tower-multi-thread-body branch September 24, 2024 19:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants