Refactor Upload to use Tower #50
Conversation
A couple of minor things. I also think it would be nice to add some integration tests with a fake server for upload at this point, like we did for download. Yuki or I can help with structuring that if needed.
```rust
+ Send {
    let svc = service_fn(upload_part_handler);
    ServiceBuilder::new()
        .concurrency_limit(ctx.handle.num_workers())
```
maybe add a TODO that this needs to be "globalized"
Thanks, I have added a FIXME.
```rust
};
let svc = svc.clone();
let task = async move { svc.oneshot(req).await }
    .instrument(tracing::trace_span!("upload_part", worker = part_number));
```
fix: `worker = part_number` doesn't make sense here; either drop it or change it to `part_number = part_number`.
(I'm not sure whether knowing the part number in the logs is helpful or not.)
Thanks, I have updated it to `part_number = part_number`, since I think it will be useful to know the part number in the logs.
```rust
use tokio::task;

/// Response type for a single upload object request.
#[derive(Debug)]
#[non_exhaustive]
pub struct UploadHandle {
    /// All child multipart upload tasks spawned for this upload
    pub(crate) tasks: task::JoinSet<Result<Vec<CompletedPart>, crate::error::Error>>,
    pub(crate) upload_tasks: Arc<Mutex<task::JoinSet<Result<CompletedPart, crate::error::Error>>>>,
```
This can probably be a regular mutex from stdlib. See https://doc.servo.org/tokio/sync/struct.Mutex.html#which-kind-of-mutex-should-you-use
I am not sure we can make it a regular mutex: we hold this lock across await points at https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/50/files#diff-a98b4945e17362a1dcad7da7e15d7ef7af38ff5f88ae751261823a5f23bb3652R135.
Ahh I missed that.
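A minimal sketch of the constraint, assuming tokio (the helper `fetch_next_part` is hypothetical, and the payload type is simplified to `u64`): because the lock guard stays live across an `.await`, the async-aware `tokio::sync::Mutex` is needed; a `std::sync::MutexGuard` held across that await point would make the future `!Send`, so it could not be spawned onto the runtime.

```rust
use std::sync::Arc;
use tokio::{sync::Mutex, task::JoinSet};

// Sketch only: the JoinSet sits behind tokio's async Mutex because the
// guard is still held at the `.await` below. With std::sync::Mutex, the
// guard across that await point would make this future !Send.
async fn spawn_with_lock(tasks: Arc<Mutex<JoinSet<u64>>>) {
    let mut tasks = tasks.lock().await; // async-aware lock
    let part = fetch_next_part().await; // guard is still held here
    tasks.spawn(async move { part });
}

// Hypothetical stand-in for reading the next part body.
async fn fetch_next_part() -> u64 {
    42
}
```

This matches the guidance in the linked tokio docs: the std mutex is preferred only when the guard is never held across an `.await`.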
```rust
for i in 0..n_workers {
    let worker = read_body(
        part_reader.clone(),
        handle.ctx.clone(),
        svc.clone(),
        handle.upload_tasks.clone(),
    )
    .instrument(tracing::debug_span!("read_body", worker = i));
    handle.read_tasks.spawn(worker);
}
```
Just to better understand, from the PR description:

> This also refactors the upload pipeline to distinguish between the `read_body` and `upload_part` phases. `read_body` still uses a fixed pool of workers because we need to support unknown content length, and I couldn't figure out how to implement an unknown amount of work in tower.

Do these sentences imply that `upload_part` is NOT restricted by the fixed pool of workers, since it only refers to `read_body` still using a fixed pool of workers?
`upload_part` is still bounded, but instead of us explicitly managing a pool of workers where each worker reads a part body and then uploads a part, we let tower manage it using the `concurrency_limit` layer at https://github.com/awslabs/aws-s3-transfer-manager-rs/pull/50/files#diff-a6c023261dc31765237ede4502d30ba640bca1ef9be58cb92e48ccdf69c4768cR72. The pool of `read_body` workers simply spawns N `upload_part` tasks.
Description of changes:

This is my first Rust PR, so feel free to provide lots of feedback.

- Moved the `upload_part` implementation from a fixed pool of workers to `tower`. This will allow us to easily implement and test higher-level abstractions like hedging for slow parts. I saw no performance difference with this.
- This also refactors the upload pipeline to distinguish between the `read_body` and `upload_part` phases. `read_body` still uses a fixed pool of workers because we need to support unknown content length, and I couldn't figure out how to implement an unknown amount of work in `tower`. The `upload_part` phase uses tower's `concurrency_limit` layer to manage the concurrency.
- Used two separate `JoinSet`s because they have different return types, and `upload_tasks` requires a lock. If I combine them into one `JoinSet`, we run into a deadlock where we are still trying to `read_body` while also joining all the tasks: both the `join` function and `read_body` need the lock, either to join tasks or to read the body and spawn `upload_tasks`. Please feel free to suggest a better way to accomplish this.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.