Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return the new object ETag in PutObjectResult #1057

Merged
merged 4 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,8 +731,10 @@ impl ObjectClient for MockClient {

let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
let etag = object.etag.clone();
add_object(&self.objects, key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
Expand Down Expand Up @@ -874,8 +876,10 @@ impl MockPutObjectRequest {
} else {
object.parts = Some(MockObjectParts::Count(parts.len()));
}
let etag = object.etag.clone();
add_object(&self.objects, &self.key, object);
Ok(PutObjectResult {
etag,
sse_type: None,
sse_kms_key_id: None,
})
Expand Down
75 changes: 16 additions & 59 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
@@ -1,68 +1,20 @@
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};
use std::fmt::{self, Debug};
use std::ops::Range;
use std::pin::Pin;
use std::time::SystemTime;

use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
use std::{
fmt::{self, Debug},
ops::Range,
string::ParseError,
};
use thiserror::Error;
use time::OffsetDateTime;

/// A single element of a [`get_object`](ObjectClient::get_object) response stream is a pair of
/// offset within the object and the bytes starting at that offset.
pub type GetBodyPart = (u64, Box<[u8]>);

/// An ETag (entity tag) is a unique identifier for a HTTP object.
///
/// New ETags can be created with the [`FromStr`] implementation.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ETag(String);

impl ETag {
/// Get the ETag as a string
pub fn as_str(&self) -> &str {
&self.0
}

/// Unpack the [String] contained by the [ETag] wrapper
pub fn into_inner(self) -> String {
self.0
}

/// Creating default etag for tests
#[doc(hidden)]
pub fn for_tests() -> Self {
Self("test_etag".to_string())
}

/// Creating unique etag from bytes
#[doc(hidden)]
#[cfg(feature = "mock")]
pub fn from_object_bytes(data: &[u8]) -> Self {
use md5::Digest as _;

let mut hasher = md5::Md5::new();
hasher.update(data);

let hash = hasher.finalize();
let result = format!("{:x}", hash);
Self(result)
}
}
use crate::checksums;
use crate::error_metadata::{ClientErrorMetadata, ProvideErrorMetadata};

impl FromStr for ETag {
type Err = ParseError;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let etag = value.to_string();
Ok(ETag(etag))
}
}
mod etag;
pub use etag::ETag;

/// A generic interface to S3-like object storage services.
///
Expand Down Expand Up @@ -420,7 +372,7 @@ impl PutObjectSingleParams {
#[derive(Debug, Clone)]
#[non_exhaustive]
pub enum UploadChecksum {
Crc32c(crate::checksums::Crc32c),
Crc32c(checksums::Crc32c),
}

impl UploadChecksum {
Expand Down Expand Up @@ -464,6 +416,10 @@ pub trait GetObjectRequest:
fn read_window_end_offset(self: Pin<&Self>) -> u64;
}

/// A single element of a [`get_object`](ObjectClient::get_object) response stream.
///
/// The first item of the pair is the offset within the object, and the second item is the
/// bytes starting at that offset.
pub type GetBodyPart = (u64, Box<[u8]>);

/// A streaming put request which allows callers to asynchronously write the body of the request.
///
/// You can call the [`write`](Self::write) method to write data to the object, and then call
Expand Down Expand Up @@ -492,10 +448,11 @@ pub trait PutObjectRequest: Send {
}

/// Result of a [ObjectClient::put_object] request
// TODO: Populate this struct with return fields from the S3 API, e.g., etag.
#[derive(Debug)]
#[non_exhaustive]
pub struct PutObjectResult {
/// ETag of the uploaded object
pub etag: ETag,
/// Server-side encryption type that was used to store new object (reported by S3)
pub sse_type: Option<String>,
/// Server-side encryption KMS key ID that was used to store new object (reported by S3)
Expand Down
54 changes: 54 additions & 0 deletions mountpoint-s3-client/src/object_client/etag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::str::FromStr;
use std::string::ParseError;

/// An entity tag (ETag), a unique identifier for an HTTP object.
///
/// An [`ETag`] is a thin wrapper around a [`String`]. Create one by parsing a string through
/// the [`FromStr`] implementation, or by converting any `AsRef<str>` value with [`From`].
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct ETag(String);

impl ETag {
    /// Borrows the ETag value as a string slice.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }

    /// Consumes the [`ETag`] and hands back the [`String`] it wraps.
    pub fn into_inner(self) -> String {
        let ETag(inner) = self;
        inner
    }

    /// Returns a fixed placeholder ETag for use in tests.
    #[doc(hidden)]
    pub fn for_tests() -> Self {
        ETag(String::from("test_etag"))
    }

    /// Derives an ETag from object contents: the lowercase hexadecimal MD5 digest of `data`.
    #[doc(hidden)]
    #[cfg(feature = "mock")]
    pub fn from_object_bytes(data: &[u8]) -> Self {
        use md5::Digest as _;

        let mut state = md5::Md5::new();
        state.update(data);
        ETag(format!("{:x}", state.finalize()))
    }
}

impl FromStr for ETag {
    type Err = ParseError;

    /// Wraps a copy of `value` in an [`ETag`] without modification; always returns `Ok`.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        Ok(Self(value.to_owned()))
    }
}

impl<S: AsRef<str>> From<S> for ETag {
    /// Copies the string contents of `value` into a new [`ETag`].
    fn from(value: S) -> Self {
        let text: &str = value.as_ref();
        Self(text.to_owned())
    }
}
91 changes: 61 additions & 30 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use std::ffi::OsString;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::object_client::{
ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
};
use async_trait::async_trait;
use futures::channel::oneshot;
use mountpoint_s3_crt::http::request_response::{Header, Headers};
use futures::channel::oneshot::{self, Receiver};
use mountpoint_s3_crt::http::request_response::{Header, Headers, HeadersError};
use mountpoint_s3_crt::io::stream::InputStream;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, RequestType, UploadReview};
use thiserror::Error;
use tracing::error;

use super::{
emit_throughput_metric, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Message,
emit_throughput_metric, ETag, PutObjectTrailingChecksums, S3CrtClient, S3CrtClientInner, S3HttpRequest, S3Message,
S3Operation, S3RequestError,
};

const ETAG_HEADER_NAME: &str = "ETag";
const SSE_TYPE_HEADER_NAME: &str = "x-amz-server-side-encryption";
const SSE_KEY_ID_HEADER_NAME: &str = "x-amz-server-side-encryption-aws-kms-key-id";

Expand Down Expand Up @@ -45,14 +48,8 @@ impl S3CrtClient {
let review_callback = ReviewCallbackBox::default();
let callback = review_callback.clone();

// Variable `response_headers` will be accessed from different threads: from CRT thread which executes `on_headers` callback
// and from our thread which executes `review_and_complete`. Callback `on_headers` is guaranteed to finish before this
// variable is accessed in `review_and_complete` (see `S3HttpRequest::poll` implementation).
let response_headers: Arc<Mutex<Option<Headers>>> = Default::default();
let response_headers_writer = response_headers.clone();
let on_headers = move |headers: &Headers, _: i32| {
*response_headers_writer.lock().unwrap() = Some(headers.clone());
};
let (on_headers, response_headers) = response_headers_handler();

let mut options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObject);
options.send_using_async_writes(true);
options.on_upload_review(move |review| callback.invoke(review));
Expand Down Expand Up @@ -107,9 +104,7 @@ impl S3CrtClient {
let span = request_span!(self.inner, "put_object_single", bucket, key);
let start_time = Instant::now();

// `response_headers` will be populated in the `on_headers` callback (on CRT event loop) and accessed in `extract_result` executing
// on a different thread after request completion.
let response_headers: Arc<Mutex<Option<Headers>>> = Default::default();
let (on_headers, response_headers) = response_headers_handler();
let slice = contents.as_ref();
let content_length = slice.len();
let body = {
Expand All @@ -134,10 +129,6 @@ impl S3CrtClient {
message.set_body_stream(Some(body_input_stream));

let options = S3CrtClientInner::new_meta_request_options(message, S3Operation::PutObjectSingle);
let response_headers_writer = response_headers.clone();
let on_headers = move |headers: &Headers, _: i32| {
*response_headers_writer.lock().unwrap() = Some(headers.clone());
};
self.inner
.make_simple_http_request_from_options(options, span, |_| {}, |_| None, on_headers)?
};
Expand All @@ -147,7 +138,9 @@ impl S3CrtClient {
let elapsed = start_time.elapsed();
emit_throughput_metric(content_length as u64, elapsed, "put_object_single");

Ok(extract_result(&response_headers))
Ok(extract_result(response_headers.await.expect(
"headers should be available since the request completed successfully",
))?)
}

fn new_put_request(
Expand Down Expand Up @@ -232,8 +225,9 @@ pub struct S3PutObjectRequest {
review_callback: ReviewCallbackBox,
start_time: Instant,
total_bytes: u64,
/// Headers of the CompleteMultipartUpload response, available after the request was finished
response_headers: Arc<Mutex<Option<Headers>>>,
/// Future for the headers of the CompleteMultipartUpload response.
/// Guaranteed to be available after the request finishes successfully.
response_headers: Receiver<Headers>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer naming this `response_headers_receiver` and updating the comment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing back a little on this. A Receiver is a future and that's how it is used here. But happy to clarify in the comment.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just not clear from the naming that we have to call `.await` to actually get the headers, and that doing so does not block.

state: S3PutObjectRequestState,
}

Expand All @@ -254,16 +248,51 @@ fn try_get_header_value(headers: &Headers, key: &str) -> Option<String> {
headers.get(key).ok()?.value().clone().into_string().ok()
}

fn extract_result(response_headers: &Mutex<Option<Headers>>) -> PutObjectResult {
let response_headers = response_headers
.lock()
.expect("must be able to acquire headers lock")
.take()
.expect("PUT response headers must be available at this point");
PutObjectResult {
/// Reads the `ETag` header from `response_headers` and converts it into an [`ETag`].
///
/// # Errors
///
/// Returns [`ParseError::Header`] if the header lookup fails, and [`ParseError::Invalid`]
/// (carrying the raw value) if the header value cannot be converted into a `String`.
fn get_etag(response_headers: &Headers) -> Result<ETag, ParseError> {
    let header = response_headers.get(ETAG_HEADER_NAME)?;
    let raw_value = header.value().clone();
    let etag_string = raw_value.into_string().map_err(ParseError::Invalid)?;
    Ok(etag_string.into())
}

/// Errors that can occur while reading a value out of the response headers.
#[derive(Error, Debug)]
#[non_exhaustive]
pub enum ParseError {
/// Looking up the header failed with a [`HeadersError`].
#[error("Header response error: {0}")]
Header(#[from] HeadersError),

/// The header value could not be converted into a `String`; carries the original [`OsString`].
#[error("Header string was not valid: {0:?}")]
Invalid(OsString),
}

/// Builds a [`PutObjectResult`] from the headers of a PUT response.
///
/// The ETag is required: if `get_etag` fails, the error is boxed and returned as
/// [`S3RequestError::InternalError`]. The SSE type and SSE KMS key ID are read with
/// `try_get_header_value`, which yields an `Option`, so they are left as `None` when it
/// returns no value.
fn extract_result(response_headers: Headers) -> Result<PutObjectResult, S3RequestError> {
let etag = get_etag(&response_headers).map_err(|e| S3RequestError::InternalError(Box::new(e)))?;

Ok(PutObjectResult {
etag,
sse_type: try_get_header_value(&response_headers, SSE_TYPE_HEADER_NAME),
sse_kms_key_id: try_get_header_value(&response_headers, SSE_KEY_ID_HEADER_NAME),
}
})
}

/// Builds an `on_headers` callback paired with the `Receiver` that it feeds.
///
/// The first time the callback runs, it sends a clone of the response headers through a
/// oneshot channel; the returned `Receiver` is the other end of that channel.
fn response_headers_handler() -> (impl FnMut(&Headers, i32), Receiver<Headers>) {
    let (sender, receiver) = oneshot::channel();

    // The callback has to be `FnMut`, so in principle it may run more than once, yet for
    // PUT requests it is only invoked a single time (on CompleteMultipartUpload or on
    // PutObject). Sending consumes the `oneshot::Sender`, so it is parked in an `Option`
    // and taken out on that first (and only!) invocation.
    let mut pending_sender = Some(sender);

    let on_headers = move |headers: &Headers, _: i32| {
        if let Some(tx) = pending_sender.take() {
            // The send result is deliberately discarded.
            let _ = tx.send(headers.clone());
        }
    };

    (on_headers, receiver)
}

#[cfg_attr(not(docsrs), async_trait)]
Expand Down Expand Up @@ -332,7 +361,9 @@ impl PutObjectRequest for S3PutObjectRequest {
let elapsed = self.start_time.elapsed();
emit_throughput_metric(self.total_bytes, elapsed, "put_object");

Ok(extract_result(&self.response_headers))
Ok(extract_result(self.response_headers.await.expect(
"headers should be available since the request completed successfully",
))?)
}
}

Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn test_put_object(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object should succeed");
check_get_result(result, None, &contents[..]).await;
Expand All @@ -65,7 +65,7 @@ async fn test_put_object_empty(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object should succeed");
check_get_result(result, None, &[]).await;
Expand Down Expand Up @@ -100,7 +100,7 @@ async fn test_put_object_multi_part(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object failed");
check_get_result(result, None, &contents[..]).await;
Expand Down Expand Up @@ -137,7 +137,7 @@ async fn test_put_object_large(
let put_object_result = request.complete().await.unwrap();

let result = client
.get_object(bucket, key, None, None)
.get_object(bucket, key, None, Some(put_object_result.etag.clone()))
.await
.expect("get_object failed");
check_get_result(result, None, &contents[..]).await;
Expand Down
Loading
Loading