Fix race condition in GetObject that could result in empty responses #1334
// Guaranteed when select_biased! executes the CreateMPU branch.
assert!(!request.is_terminated());
// Wait for CreateMultipartUpload to complete, or return error.
mpu_created.await.unwrap()?;
Would we wait indefinitely for `mpu_created` here? Is there any timeout under the hood, or knowledge about `CreateMultipartUpload`, that assures us that we won't wait forever?
See from line 93 above: if we get a result instead of the CreateMultipartUpload completing, it will be an error and we send it to this channel.
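A rough sketch of why the wait resolves either way, using only the standard library. The names `resolve`, `wait_for_mpu`, `on_created` and `on_finished` are hypothetical stand-ins, not the actual client code: whichever callback fires first takes the sender and resolves the receiver, so a request that finishes before CreateMultipartUpload completes surfaces as an error rather than a hang.

```rust
use std::sync::{mpsc, Arc, Mutex};

// Hypothetical shared slot holding the sender until one callback uses it.
type SharedSender = Arc<Mutex<Option<mpsc::Sender<Result<(), String>>>>>;

/// Sends `outcome` if nobody has resolved the channel yet; otherwise does nothing.
fn resolve(shared: &SharedSender, outcome: Result<(), String>) {
    if let Some(tx) = shared.lock().unwrap().take() {
        let _ = tx.send(outcome);
    }
}

/// Simulates the two paths: the upload is created, or the request finishes
/// with an error first. In both cases the receiver gets exactly one message.
fn wait_for_mpu(create_succeeds: bool) -> Result<(), String> {
    let (tx, rx) = mpsc::channel();
    let shared: SharedSender = Arc::new(Mutex::new(Some(tx)));

    // Stand-in for the callback invoked once the upload has been created.
    let on_created = {
        let shared = shared.clone();
        move || resolve(&shared, Ok(()))
    };
    // Stand-in for the completion callback; it only resolves on failure.
    let on_finished = {
        let shared = shared.clone();
        move |result: Result<(), String>| {
            if let Err(e) = result {
                resolve(&shared, Err(e));
            }
        }
    };

    if create_succeeds {
        on_created();
    } else {
        on_finished(Err("CreateMultipartUpload failed".to_string()));
    }
    rx.recv().unwrap()
}
```

This does not add a timeout; it only illustrates that every terminal outcome reaches the channel.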
Some(S3GetObjectEvent::Error(e)) => {
return Err(e);
}
_ => {
Would it be useful to `trace!` the body here for debugging purposes?
*this.next_offset = offset + part.len() as u64;
Poll::Ready(Some(Ok((offset, part))))
}
Poll::Ready(Some(S3GetObjectEvent::Headers(_))) => Poll::Pending,
This shouldn't ever happen, right? Maybe we can emit an error log or assert.
In fact, it could happen before. I've now updated the `on_headers` callback to only send the headers once and added an assertion here.
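One way to picture a "send headers only once" guard is sketched below. It is an illustration under assumptions, not the code in this PR: `HeadersOnce` and `headers_forwarded` are hypothetical names, headers are modelled as a plain string, and an `AtomicBool` is just one possible way to remember that the first send already happened.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;

/// Hypothetical guard that forwards headers to a channel only the first time.
struct HeadersOnce {
    sent: AtomicBool,
    sender: mpsc::Sender<String>,
}

impl HeadersOnce {
    fn new(sender: mpsc::Sender<String>) -> Self {
        Self { sent: AtomicBool::new(false), sender }
    }

    /// Stand-in for a headers callback that may be invoked more than once.
    fn on_headers(&self, headers: &str) {
        // `swap` returns the previous value, so only the first call sees `false`.
        if !self.sent.swap(true, Ordering::SeqCst) {
            let _ = self.sender.send(headers.to_string());
        }
    }
}

/// Invokes the callback `invocations` times and counts the forwarded events.
fn headers_forwarded(invocations: usize) -> usize {
    let (tx, rx) = mpsc::channel();
    let guard = HeadersOnce::new(tx);
    for _ in 0..invocations {
        guard.on_headers("content-length: 11");
    }
    // Dropping the guard drops the sender, so the iterator below terminates.
    drop(guard);
    rx.iter().count()
}
```

With a guard like this on the sending side, a consumer can treat a second headers event as a bug and assert on it.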
|_, _| {},
|_| None,
move |result| {
if let Some(sender) = on_failure_sender.lock().unwrap().take() {
Maybe using an atomic bool instead of `Arc<Mutex<Option<...>>>` could be simpler here
Not sure what you have in mind.
Another approach I considered was to introduce an "event" channel like in `get_object`, but for `MpuCreated` and `Result`. However, it would require more changes to `S3PutObjectRequest` and we are considering removing the wait for CreateMultipartUpload anyway, so I decided to go for the less intrusive change.
I was thinking of having only one channel for all results and keeping track of errors in an `AtomicBool`, but I think that way the initial `CreateMultipartUploadFailed` error would be emitted once the caller first polls this method rather than initially from this method.
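For readers unfamiliar with the `lock().unwrap().take()` pattern under discussion, here is a small standalone sketch. `OnceSender`, `make_failure_callback` and `reported_failures` are hypothetical names, and `std::sync::mpsc` stands in for whatever channel the client uses. The point it tries to show: when sending consumes the sender, a callback that can be called repeatedly needs something like `Option::take` to move the sender out exactly once, which a bare flag cannot do by itself.

```rust
use std::sync::{mpsc, Arc, Mutex};

/// Hypothetical one-shot sender: `send` consumes `self`, so it fires at most once.
struct OnceSender<T>(mpsc::Sender<T>);

impl<T> OnceSender<T> {
    fn send(self, value: T) {
        let _ = self.0.send(value);
    }
}

/// Builds a callback that may run many times but reports only the first failure.
fn make_failure_callback(tx: mpsc::Sender<String>) -> impl Fn(Result<(), String>) {
    let on_failure_sender = Arc::new(Mutex::new(Some(OnceSender(tx))));
    move |result| {
        if let Err(e) = result {
            // `take()` leaves `None` behind, so later calls find nothing to send.
            if let Some(sender) = on_failure_sender.lock().unwrap().take() {
                sender.send(e);
            }
        }
    }
}

/// Calls the callback with one success and two failures, then collects
/// everything that reached the channel.
fn reported_failures() -> Vec<String> {
    let (tx, rx) = mpsc::channel();
    let callback = make_failure_callback(tx);
    callback(Ok(()));
    callback(Err("first".to_string()));
    callback(Err("second".to_string()));
    // Dropping the callback releases any remaining sender so `iter` terminates.
    drop(callback);
    rx.iter().collect()
}
```

Only the first error is delivered; the second call finds the slot already empty.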
#[ignore = "Stress-test to run many concurrent GetObjects. To be run manually."]
#[tokio::test(flavor = "multi_thread", worker_threads = 100)]
async fn stress_test_get_object() {
Nice!
Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
I have rebased and taken out the |
lgtm, one minor comment about an inaccurate comment that can be addressed separately.
) -> Result<S3HttpRequest<Vec<u8>, E>, S3RequestError> {
let options = Self::new_meta_request_options(message, operation);
self.make_simple_http_request_from_options(options, request_span, |_| {}, on_error, |_, _| ())
self.make_simple_http_request_from_options(options, request_span, |_| {}, parse_meta_request_error, |_, _| ())
}
/// Make an HTTP request using this S3 client that returns the body on success or invokes the
Please can we update this comment? Looks like the interface of this function has changed slightly (we seem to always send the body to the channel, and parse errors through the caller-provided function).
(sounds like this is to be addressed in a later PR already in progress..)
Address an issue in the `Stream` implementation for `S3GetObjectResponse` that could immediately return `None` (i.e. terminate the stream) when detecting that the meta request had completed, before returning previously received parts. Reported in #1331.
The fix changes the mechanism used to extract the response body parts and the request completion from the meta request callbacks. Instead of multiple independent channels, it introduces a single channel that supports multiple `S3GetObjectEvent`s. The events in the new channel match the order in which the callbacks are invoked, which is guaranteed by the CRT. The events channel also includes the `Headers` event, avoiding the need for a separate channel to await the headers.
When using Mountpoint, an occurrence of this issue would result in a read request failing with an `Input/output error`, with a warning entry in the logs containing this message:
Note however that we were not able to trigger the issue in our tests.
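A minimal sketch of the single-channel idea, assuming a simplified `Event` enum and a synchronous `std::sync::mpsc` channel in place of the async channel and the real `S3GetObjectEvent` type. The helper names `collect_parts` and `simulate_request` are hypothetical. Because completion travels through the same ordered channel as the body parts, a consumer cannot observe it before the parts that preceded it.

```rust
use std::sync::mpsc;

/// Hypothetical, simplified stand-in for the events carried by the channel.
#[derive(Debug)]
enum Event {
    Headers,
    BodyPart(u64, Vec<u8>),
    Finished,
}

/// Drains the ordered event channel and returns every body part that was
/// sent before the `Finished` event.
fn collect_parts(events: mpsc::Receiver<Event>) -> Vec<(u64, Vec<u8>)> {
    let mut parts = Vec::new();
    for event in events {
        match event {
            Event::Headers => {}
            Event::BodyPart(offset, data) => parts.push((offset, data)),
            // Completion is only seen after all earlier parts were consumed.
            Event::Finished => break,
        }
    }
    parts
}

/// Simulates callbacks firing in order: headers, two parts, then completion.
fn simulate_request() -> Vec<(u64, Vec<u8>)> {
    let (tx, rx) = mpsc::channel();
    tx.send(Event::Headers).unwrap();
    tx.send(Event::BodyPart(0, b"hello ".to_vec())).unwrap();
    tx.send(Event::BodyPart(6, b"world".to_vec())).unwrap();
    tx.send(Event::Finished).unwrap();
    collect_parts(rx)
}
```

With separate channels for parts and completion, a consumer that checks the completion channel first could end the stream while parts are still queued, which is the kind of race described above.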
Does this change impact existing behavior?
No.
Does this change need a changelog entry? Does it require a version change?
Bug fix entry and increase patch version.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license and I agree to the terms of the Developer Certificate of Origin (DCO).