Skip to content

Commit

Permalink
Add support for writing object metadata with PutObject
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Beal <simobeal@amazon.com>
  • Loading branch information
muddyfish committed Oct 15, 2024
1 parent 5954f53 commit 3f8a280
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 3 deletions.
21 changes: 19 additions & 2 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ pub struct MockObject {
last_modified: OffsetDateTime,
etag: ETag,
parts: Option<MockObjectParts>,
object_metadata: HashMap<String, String>,
}

impl MockObject {
Expand All @@ -391,6 +392,7 @@ impl MockObject {
last_modified: OffsetDateTime::now_utc(),
etag,
parts: None,
object_metadata: HashMap::new(),
}
}

Expand All @@ -403,6 +405,7 @@ impl MockObject {
last_modified: OffsetDateTime::now_utc(),
etag,
parts: None,
object_metadata: HashMap::new(),
}
}

Expand All @@ -425,6 +428,7 @@ impl MockObject {
last_modified: OffsetDateTime::now_utc(),
etag,
parts: None,
object_metadata: HashMap::new(),
}
}

Expand All @@ -436,6 +440,10 @@ impl MockObject {
self.storage_class = storage_class;
}

pub fn set_object_metadata(&mut self, object_metadata: HashMap<String, String>) {
self.object_metadata = object_metadata;
}

pub fn set_restored(&mut self, restore_status: Option<RestoreStatus>) {
self.restore_status = restore_status;
}
Expand Down Expand Up @@ -731,6 +739,7 @@ impl ObjectClient for MockClient {

let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
object.set_object_metadata(params.object_metadata.clone());
add_object(&self.objects, key, object);
Ok(PutObjectResult {
sse_type: None,
Expand Down Expand Up @@ -868,6 +877,7 @@ impl MockPutObjectRequest {
let buffer = std::mem::take(&mut self.buffer);
let mut object: MockObject = buffer.into();
object.set_storage_class(self.params.storage_class.clone());
object.set_object_metadata(self.params.object_metadata.clone());
// For S3 Standard, part attributes are only available when additional checksums are used
if self.params.trailing_checksums == PutObjectTrailingChecksums::Enabled {
object.parts = Some(MockObjectParts::Parts(parts));
Expand Down Expand Up @@ -950,6 +960,7 @@ mod tests {
use futures::{pin_mut, StreamExt};
use rand::{Rng, RngCore, SeedableRng};
use rand_chacha::ChaChaRng;
use std::default::Default;
use test_case::test_case;

use super::*;
Expand Down Expand Up @@ -1501,8 +1512,10 @@ mod tests {
..Default::default()
});

let object_metadata = HashMap::from([("foo".to_string(), "bar".to_string())]);
let put_object_params = PutObjectParams::new().object_metadata(object_metadata.clone());
let mut put_request = client
.put_object("test_bucket", "key1", &Default::default())
.put_object("test_bucket", "key1", &put_object_params)
.await
.expect("put_object failed");

Expand Down Expand Up @@ -1530,6 +1543,7 @@ mod tests {
next_offset += body.len() as u64;
assert_eq!(body, obj.read(offset, body.len()));
}
assert_eq!(object_metadata, get_request.object.object_metadata);
}

#[tokio::test]
Expand All @@ -1542,8 +1556,10 @@ mod tests {
});

let content = vec![42u8; 512];
let object_metadata = HashMap::from([("foo".to_string(), "bar".to_string())]);
let put_object_params = PutObjectSingleParams::new().object_metadata(object_metadata.clone());
let _put_result = client
.put_object_single("test_bucket", "key1", &Default::default(), &content)
.put_object_single("test_bucket", "key1", &put_object_params, &content)
.await
.expect("put_object failed");

Expand All @@ -1552,6 +1568,7 @@ mod tests {
.await
.expect("get_object failed");

assert_eq!(object_metadata, get_request.object.object_metadata);
// Check that the result of get_object is correct.
let actual = get_request.collect().await.expect("failed to collect body");
assert_eq!(&content, &*actual);
Expand Down
18 changes: 18 additions & 0 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use async_trait::async_trait;
use auto_impl::auto_impl;
use futures::Stream;
use mountpoint_s3_crt::s3::client::BufferPoolUsageStats;
use std::collections::HashMap;
use std::hash::Hash;
use std::pin::Pin;
use std::str::FromStr;
use std::time::SystemTime;
Expand Down Expand Up @@ -318,6 +320,8 @@ pub struct PutObjectParams {
pub ssekms_key_id: Option<String>,
/// Custom headers to add to the request
pub custom_headers: Vec<(String, String)>,
/// Object metadata to be uploaded with objects.
pub object_metadata: HashMap<String, String>,
}

impl PutObjectParams {
Expand Down Expand Up @@ -355,6 +359,12 @@ impl PutObjectParams {
self.custom_headers.push((name, value));
self
}

/// Set user defined object metadata.
pub fn object_metadata(mut self, value: HashMap<String, String>) -> Self {
self.object_metadata = value;
self
}
}

/// How CRC32c checksums are used for parts of a multi-part PutObject request
Expand Down Expand Up @@ -393,6 +403,8 @@ pub struct PutObjectSingleParams {
pub ssekms_key_id: Option<String>,
/// Custom headers to add to the request
pub custom_headers: Vec<(String, String)>,
/// Object metadata to be uploaded with objects.
pub object_metadata: HashMap<String, String>,
}

impl PutObjectSingleParams {
Expand Down Expand Up @@ -430,6 +442,12 @@ impl PutObjectSingleParams {
self.custom_headers.push((name, value));
self
}

/// Set user defined object metadata.
pub fn object_metadata(mut self, value: HashMap<String, String>) -> Self {
self.object_metadata = value;
self
}
}

/// A checksum used by the object client for integrity checks on uploads.
Expand Down
10 changes: 10 additions & 0 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ impl S3CrtClient {
};
message.set_checksum_config(checksum_config);

for (name, value) in &params.object_metadata {
message
.set_header(&Header::new(format!("x-amz-meta-{}", name), value))
.map_err(S3RequestError::construction_failure)?
}
for (name, value) in &params.custom_headers {
message
.inner
Expand Down Expand Up @@ -135,6 +140,11 @@ impl S3CrtClient {
.set_checksum_header(checksum)
.map_err(S3RequestError::construction_failure)?;
}
for (name, value) in &params.object_metadata {
message
.set_header(&Header::new(format!("x-amz-meta-{}", name), value))
.map_err(S3RequestError::construction_failure)?
}
for (name, value) in &params.custom_headers {
message
.inner
Expand Down
63 changes: 62 additions & 1 deletion mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use mountpoint_s3_client::types::{
use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::checksums::crc32c;
use rand::Rng;
use std::collections::HashMap;
use test_case::test_case;

// Simple test for PUT object. Puts a single, small object as a single part and checks that the
Expand Down Expand Up @@ -375,6 +376,66 @@ async fn test_put_checksums(trailing_checksums: PutObjectTrailingChecksums) {
}
}

#[test_case(HashMap::new(); "Empty")]
#[test_case(HashMap::from([("foo".to_string(), "bar".to_string()), ("a".to_string(), "b".to_string())]); "ASCII")]
#[tokio::test]
async fn test_put_user_object_metadata_happy(object_metadata: HashMap<String, String>) {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_user_object_metadata_happy");
let client_config = S3ClientConfig::new()
.part_size(PART_SIZE)
.endpoint_config(EndpointConfig::new(&get_test_region()));
let client = S3CrtClient::new(client_config).expect("could not create test client");
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectParams::new().object_metadata(object_metadata.clone());

let mut request = client
.put_object(&bucket, &key, &params)
.await
.expect("put_object should succeed");

request.write(&contents).await.unwrap();
request.complete().await.unwrap();

let sdk_client = get_test_sdk_client().await;
let output = sdk_client.head_object().bucket(&bucket).key(key).send().await.unwrap();

match output.metadata() {
Some(returned_object_metadata) => {
assert_eq!(&object_metadata, returned_object_metadata);
}
None => {
assert!(object_metadata.is_empty());
}
}
}

#[test_case(HashMap::from([("£".to_string(), "£".to_string())]); "UTF-8")]
#[tokio::test]
async fn test_put_user_object_metadata_bad_header(object_metadata: HashMap<String, String>) {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_user_object_metadata_bad_header");
let client_config = S3ClientConfig::new()
.part_size(PART_SIZE)
.endpoint_config(EndpointConfig::new(&get_test_region()));
let client = S3CrtClient::new(client_config).expect("could not create test client");
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectParams::new().object_metadata(object_metadata.clone());

let mut request = client.put_object(&bucket, &key, &params).await.unwrap();
request.write(&contents).await.expect_err("header parsing should fail");
}

#[test_case(true; "pass review")]
#[test_case(false; "fail review")]
#[tokio::test]
Expand Down Expand Up @@ -450,7 +511,7 @@ async fn check_get_object<Client: ObjectClient>(
// S3 Express One Zone is a distinct storage class and can't be overridden
#[cfg(not(feature = "s3express_tests"))]
async fn test_put_object_storage_class(storage_class: &str) {
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_abort");
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_object_storage_class");
let client = get_test_client();
let key = format!("{prefix}hello");

Expand Down
58 changes: 58 additions & 0 deletions mountpoint-s3-client/tests/put_object_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::{ChecksumAlgorithm, PutObjectResult, PutObjectSingleParams, UploadChecksum};
use mountpoint_s3_client::{ObjectClient, S3CrtClient};
use rand::Rng;
use std::collections::HashMap;
use test_case::test_case;

// Simple test for PUT object. Puts a single, small object as a single part and checks that the
Expand Down Expand Up @@ -117,6 +118,63 @@ async fn test_put_checksums(checksum_algorithm: Option<ChecksumAlgorithm>) {
}
}

#[test_case(HashMap::new(); "Empty")]
#[test_case(HashMap::from([("foo".to_string(), "bar".to_string()), ("a".to_string(), "b".to_string())]); "ASCII")]
#[tokio::test]
async fn test_put_user_object_metadata_happy(object_metadata: HashMap<String, String>) {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_user_object_metadata_happy");
let client_config = S3ClientConfig::new()
.part_size(PART_SIZE)
.endpoint_config(EndpointConfig::new(&get_test_region()));
let client = S3CrtClient::new(client_config).expect("could not create test client");
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectSingleParams::new().object_metadata(object_metadata.clone());
client
.put_object_single(&bucket, &key, &params, &contents)
.await
.expect("put_object should succeed");

let sdk_client = get_test_sdk_client().await;
let output = sdk_client.head_object().bucket(&bucket).key(key).send().await.unwrap();

match output.metadata() {
Some(returned_object_metadata) => {
assert_eq!(&object_metadata, returned_object_metadata);
}
None => {
assert!(object_metadata.is_empty());
}
}
}

#[test_case(HashMap::from([("£".to_string(), "£".to_string())]); "UTF-8")]
#[tokio::test]
async fn test_put_user_object_metadata_bad_header(object_metadata: HashMap<String, String>) {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_user_object_metadata_bad_header");
let client_config = S3ClientConfig::new()
.part_size(PART_SIZE)
.endpoint_config(EndpointConfig::new(&get_test_region()));
let client = S3CrtClient::new(client_config).expect("could not create test client");
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectSingleParams::new().object_metadata(object_metadata.clone());
client
.put_object_single(&bucket, &key, &params, &contents)
.await
.expect_err("header parsing should fail");
}

#[test_case("INTELLIGENT_TIERING")]
#[test_case("GLACIER")]
#[tokio::test]
Expand Down

0 comments on commit 3f8a280

Please sign in to comment.