Skip to content

Commit

Permalink
Return the new object ETag in PutObjectResult (#1057)
Browse files Browse the repository at this point in the history
* Return the ETag in PutObjectResult

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Simplify handling of response headers

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Move ETag to a separate module

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

* Add comments

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>

---------

Signed-off-by: Alessandro Passaro <alexpax@amazon.co.uk>
  • Loading branch information
passaro authored Oct 15, 2024
1 parent 6acbd20 commit e98a5c2
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 95 deletions.
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,10 @@ impl ObjectClient for MockClient {

let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
let etag = object.etag.clone();
add_object(&self.objects, key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
Expand Down Expand Up @@ -874,8 +876,10 @@ impl MockPutObjectRequest {
} else {
object.parts = Some(MockObjectParts::Count(parts.len()));
}
let etag = object.etag.clone();
add_object(&self.objects, &self.key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
Expand Down
75 changes: 16 additions & 59 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,20 @@
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use std::fmt::{self, Debug};
use std::ops::Range;
use std::pin::Pin;
use std::time::SystemTime;

use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
use std::{
fmt::{self, Debug},
ops::Range,
string::ParseError,
};
use thiserror::Error;
use time::OffsetDateTime;

/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
/// offset within the object and the bytes starting at that offset.
pub type GetBodyPart = (u64, Box<[u8]>);

/// An ETag (entity tag) is a unique identifier for a HTTP object.
///
/// New ETags can be created with the [`FromStr`] implementation.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ETag(String);

impl ETag {
/// Get the ETag as a string
pub fn as_str(&self) -> &str {
&self.0
}

/// Unpack the [String] contained by the [ETag] wrapper
pub fn into_inner(self) -> String {
self.0
}

/// Creating default etag for tests
#[doc(hidden)]
pub fn for_tests() -> Self {
Self("test_etag".to_string())
}

/// Creating unique etag from bytes
#[doc(hidden)]
#[cfg(feature = "mock")]
pub fn from_object_bytes(data: &[u8]) -> Self {
use md5::Digest as _;

let mut hasher = md5::Md5::new();
hasher.update(data);

let hash = hasher.finalize();
let result = format!("{:x}", hash);
Self(result)
}
}
use crate::checksums;
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};

impl FromStr for ETag {
type Err = ParseError;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let etag = value.to_string();
Ok(ETag(etag))
}
}
mod etag;
pub use etag::ETag;

/// A generic interface to S3-like object storage services.
///
Expand Down Expand Up @@ -436,7 +388,7 @@ impl PutObjectSingleParams {
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UploadChecksum {
Crc32c(crate::checksums::Crc32c),
Crc32c(checksums::Crc32c),
}

impl UploadChecksum {
Expand Down Expand Up @@ -480,6 +432,10 @@ pub trait GetObjectRequest:
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
/// offset within the object and the bytes starting at that offset.
pub type GetBodyPart = (u64, Box<[u8]>);

/// A streaming put request which allows callers to asynchronously write the body of the request.
///
/// You can call the [`write`](Self::write) method to write data to the object, and then call
Expand Down Expand Up @@ -508,10 +464,11 @@ pub trait PutObjectRequest: Send {
}

/// Result of a [ObjectClient::put_object] request
// TODO: Populate this struct with return fields from the S3 API, e.g., etag.
#[derive(Debug)]
#[non_exhaustive]
pub struct PutObjectResult {
/// ETag of the uploaded object
pub etag: ETag,
/// Server-side encryption type that was used to store new object (reported by S3)
pub sse_type: Option<String>,
/// Server-side encryption KMS key ID that was used to store new object (reported by S3)
Expand Down
54 changes: 54 additions & 0 deletions mountpoint-s3-client/src/object_client/etag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::str::FromStr;
use std::string::ParseError;

/// An ETag (entity tag) is a unique identifier for a HTTP object.
///
/// New ETags can be created with the [`FromStr`] implementation.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ETag(String);

impl ETag {
/// Get the ETag as a string
pub fn as_str(&self) -> &str {
&self.0
}

/// Unpack the [String] contained by the [ETag] wrapper
pub fn into_inner(self) -> String {
self.0
}

/// Creating default etag for tests
#[doc(hidden)]
pub fn for_tests() -> Self {
Self("test_etag".to_string())
}

/// Creating unique etag from bytes
#[doc(hidden)]
#[cfg(feature = "mock")]
pub fn from_object_bytes(data: &[u8]) -> Self {
use md5::Digest as _;

let mut hasher = md5::Md5::new();
hasher.update(data);

let hash = hasher.finalize();
let result = format!("{:x}", hash);
Self(result)
}
}

impl FromStr for ETag {
type Err = ParseError;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let etag = value.to_string();
Ok(ETag(etag))
}
}

impl<S: AsRef<str>> From<S> for ETag {
fn from(value: S) -> Self {
Self(value.as_ref().to_string())
}
}
91 changes: 61 additions & 30 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use std::ffi::OsString;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::object_client::{
ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
};
use async_trait::async_trait;
use futures::channel::oneshot;
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use futures::channel::oneshot::{self, Receiver};
use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
use mountpoint_s3_crt::io::stream::InputStream;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, RequestType, UploadReview};
use thiserror::Error;
use tracing::error;

use super::{
emit_throughput_metric, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Message,
emit_throughput_metric, ETag, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Message,
S3Operation, S3RequestError,
};

const ETAG_HEADER_NAME: &str = "ETag";
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";

Expand Down Expand Up @@ -52,14 +55,8 @@ impl S3CrtClient {
let review_callback = ReviewCallbackBox::default();
let callback = review_callback.clone();

// Variable `response_headers` will be accessed from different threads: from CRT thread which executes `on_headers` callback
// and from our thread which executes `review_and_complete`. Callback `on_headers` is guaranteed to finish before this
// variable is accessed in `review_and_complete` (see `S3HttpRequest::poll` implementation).
let response_headers: Arc<Mutex<Option<Headers>>> = Default::default();
let response_headers_writer = response_headers.clone();
let on_headers = move |headers: &Headers, _: i32| {
*response_headers_writer.lock().unwrap() = Some(headers.clone());
};
let (on_headers, response_headers) = response_headers_handler();

let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
Expand Down Expand Up @@ -114,9 +111,7 @@ impl S3CrtClient {
let span = request_span!(self.inner, "put_object_single", bucket, key);
let start_time = Instant::now();

// `response_headers` will be populated in the `on_headers` callback (on CRT event loop) and accessed in `extract_result` executing
// on a different thread after request completion.
let response_headers: Arc<Mutex<Option<Headers>>> = Default::default();
let (on_headers, response_headers) = response_headers_handler();
let slice = contents.as_ref();
let content_length = slice.len();
let body = {
Expand Down Expand Up @@ -147,10 +142,6 @@ impl S3CrtClient {
message.set_body_stream(Some(body_input_stream));

let options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObjectSingle);
let response_headers_writer = response_headers.clone();
let on_headers = move |headers: &Headers, _: i32| {
*response_headers_writer.lock().unwrap() = Some(headers.clone());
};
self.inner
.make_simple_http_request_from_options(options, span, |_| {}, |_| None, on_headers)?
};
Expand All @@ -160,7 +151,9 @@ impl S3CrtClient {
let elapsed = start_time.elapsed();
emit_throughput_metric(content_length as u64, elapsed, "put_object_single");

Ok(extract_result(&response_headers))
Ok(extract_result(response_headers.await.expect(
"headers should be available since the request completed successfully",
))?)
}

fn new_put_request(
Expand Down Expand Up @@ -245,8 +238,9 @@ pub struct S3PutObjectRequest {
review_callback: ReviewCallbackBox,
start_time: Instant,
total_bytes: u64,
/// Headers of the CompleteMultipartUpload response, available after the request was finished
response_headers: Arc<Mutex<Option<Headers>>>,
/// Future for the headers of the CompleteMultipartUpload response.
/// Guaranteed to be available after the request finishes successfully.
response_headers: Receiver<Headers>,
state: S3PutObjectRequestState,
}

Expand All @@ -267,16 +261,51 @@ fn try_get_header_value(headers: &Headers, key: &str) -> Option<String> {
headers.get(key).ok()?.value().clone().into_string().ok()
}

fn extract_result(response_headers: &Mutex<Option<Headers>>) -> PutObjectResult {
let response_headers = response_headers
.lock()
.expect("must be able to acquire headers lock")
.take()
.expect("PUT response headers must be available at this point");
PutObjectResult {
fn get_etag(response_headers: &Headers) -> Result<ETag, ParseError> {
Ok(response_headers
.get(ETAG_HEADER_NAME)?
.value()
.clone()
.into_string()
.map_err(ParseError::Invalid)?
.into())
}

#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ParseError {
#[error("Header response error: {0}")]
Header(#[from] HeadersError),

#[error("Header string was not valid: {0:?}")]
Invalid(OsString),
}

fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
let etag = get_etag(&response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))?;

Ok(PutObjectResult {
etag,
sse_type: try_get_header_value(&response_headers, SSE_TYPE_HEADER_NAME),
sse_kms_key_id: try_get_header_value(&response_headers, SSE_KEY_ID_HEADER_NAME),
}
})
}

/// Creates `on_headers` callback that will send the response headers to the matching `Receiver`.
fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
let (response_headers_sender, response_headers) = oneshot::channel();
// The callback signature (`FnMut`) allows for it to be invoked multiple times,
// but for PUT requests it will only be called once (on CompleteMultipartUpload
// or on PutObject).
// Wrapping the `oneshot::Sender` in an `Option` allows it to be consumed
// on the first (and only!) invocation.
let mut response_headers_sender = Some(response_headers_sender);
let on_headers = move |headers: &Headers, _: i32| {
if let Some(sender) = response_headers_sender.take() {
let _ = sender.send(headers.clone());
}
};
(on_headers, response_headers)
}

#[cfg_attr(not(docsrs), async_trait)]
Expand Down Expand Up @@ -345,7 +374,9 @@ impl PutObjectRequest for S3PutObjectRequest {
let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");

Ok(extract_result(&self.response_headers))
Ok(extract_result(self.response_headers.await.expect(
"headers should be available since the request completed successfully",
))?)
}
}

Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_put_object(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object should succeed");
check_get_result(result, None, &contents[..]).await;
Expand All @@ -65,7 +65,7 @@ async fn test_put_object_empty(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object should succeed");
check_get_result(result, None, &[]).await;
Expand Down Expand Up @@ -100,7 +100,7 @@ async fn test_put_object_multi_part(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object failed");
check_get_result(result, None, &contents[..]).await;
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn test_put_object_large(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object failed");
check_get_result(result, None, &contents[..]).await;
Expand Down
Loading

0 comments on commit e98a5c2

Please sign in to comment.