Skip to content

Commit

Permalink
Address review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
benesch committed Nov 6, 2024
1 parent 3206e8c commit 9cf76bc
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/object_store.yml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ jobs:
- name: Run object_store tests (AWS native conditional put)
run: cargo test --features=aws
env:
AWS_CONDITIONAL_PUT: etag-create-only
AWS_CONDITIONAL_PUT: etag-put-if-not-exists
AWS_COPY_IF_NOT_EXISTS: multipart

- name: GCS Output
Expand Down
12 changes: 11 additions & 1 deletion object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ impl S3Client {
PutPartPayload::Part(payload) => request.with_payload(payload),
PutPartPayload::Copy(path) => request.header(
"x-amz-copy-source",
&format!("{}/{}", self.config.bucket, path),
&format!("{}/{}", self.config.bucket, encode_path(path)),
),
};

Expand Down Expand Up @@ -671,6 +671,16 @@ impl S3Client {
Ok(PartId { content_id })
}

pub(crate) async fn abort_multipart(&self, location: &Path, upload_id: &str) -> Result<()> {
self.request(Method::DELETE, location)
.query(&[("uploadId", upload_id)])
.with_encryption_headers()
.send()
.await?;

Ok(())
}

pub(crate) async fn complete_multipart(
&self,
location: &Path,
Expand Down
61 changes: 37 additions & 24 deletions object_store/src/aws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl ObjectStore for AmazonS3 {
(PutMode::Create | PutMode::Update(_), None) => Err(Error::NotImplemented),
(
PutMode::Create,
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagCreateOnly),
Some(S3ConditionalPut::ETagMatch | S3ConditionalPut::ETagPutIfNotExists),
) => {
match request.header(&IF_NONE_MATCH, "*").do_put().await {
// Technically If-None-Match should return NotModified but some stores,
Expand All @@ -196,7 +196,7 @@ impl ObjectStore for AmazonS3 {
source: "ETag required for conditional put".to_string().into(),
})?;
match put {
S3ConditionalPut::ETagCreateOnly => Err(Error::NotImplemented),
S3ConditionalPut::ETagPutIfNotExists => Err(Error::NotImplemented),
S3ConditionalPut::ETagMatch => {
request.header(&IF_MATCH, etag.as_str()).do_put().await
}
Expand Down Expand Up @@ -302,27 +302,40 @@ impl ObjectStore for AmazonS3 {
.client
.create_multipart(to, PutMultipartOpts::default())
.await?;
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
let res = match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e.into()),
};

let res = async {
let part_id = self
.client
.put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
.await?;
match self
.client
.complete_multipart(
to,
&upload_id,
vec![part_id],
CompleteMultipartMode::Create,
)
.await
{
Err(e @ Error::Precondition { .. }) => Err(Error::AlreadyExists {
path: to.to_string(),
source: Box::new(e),
}),
Ok(_) => Ok(()),
Err(e) => Err(e),
}
}
.await;

// If the multipart upload failed, make a best effort attempt to
// clean it up. It's the caller's responsibility to add a
// lifecycle rule if guaranteed cleanup is required, as we
// cannot protect against an ill-timed process crash.
if res.is_err() {
let _ = self.client.abort_multipart(to, &upload_id).await;
}

return res;
}
Some(S3CopyIfNotExists::Dynamo(lock)) => {
Expand Down Expand Up @@ -504,7 +517,7 @@ mod tests {
copy_if_not_exists(&integration).await;
}
if let Some(conditional_put) = &config.conditional_put {
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagCreateOnly);
let supports_update = !matches!(conditional_put, S3ConditionalPut::ETagPutIfNotExists);
put_opts(&integration, supports_update).await;
}

Expand Down
27 changes: 16 additions & 11 deletions object_store/src/aws/precondition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,17 @@ pub enum S3CopyIfNotExists {
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>` ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
/// Native Amazon S3 supports copy if not exists through a multipart upload
/// where the upload copies an existing object and is completed only if
/// the new object does not already exist.
/// where the upload copies an existing object and is completed only if the
/// new object does not already exist.
///
/// WARNING: When using this mode, `copy_if_not_exists` does not copy
/// tags or attributes from the source object.
/// WARNING: When using this mode, `copy_if_not_exists` does not copy tags
/// or attributes from the source object.
///
/// WARNING: When using this mode, `copy_if_not_exists` makes only a best
/// effort attempt to clean up the multipart upload if the copy operation
/// fails. Consider using a lifecycle rule to automatically clean up
/// abandoned multipart uploads. See [the module
/// docs](super#multipart-uploads) for details.
///
/// Encoded as `multipart` ignoring whitespace.
Multipart,
Expand Down Expand Up @@ -81,9 +87,8 @@ impl std::fmt::Display for S3CopyIfNotExists {

impl S3CopyIfNotExists {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"multipart" => return Some(Self::Multipart),
_ => (),
if s.trim() == "multipart" {
return Some(Self::Multipart);
};

let (variant, value) = s.split_once(':')?;
Expand Down Expand Up @@ -139,10 +144,10 @@ pub enum S3ConditionalPut {
/// This is the limited form of conditional put supported by Amazon S3
/// as of August 2024 ([announcement]).
///
/// Encoded as `etag-create-only` ignoring whitespace.
/// Encoded as `etag-put-if-not-exists` ignoring whitespace.
///
/// [announcement]: https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
ETagCreateOnly,
ETagPutIfNotExists,

/// The name of a DynamoDB table to use for coordination
///
Expand All @@ -159,7 +164,7 @@ impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
Self::ETagCreateOnly => write!(f, "etag-create-only"),
Self::ETagPutIfNotExists => write!(f, "etag-put-if-not-exists"),
Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
Expand All @@ -169,7 +174,7 @@ impl S3ConditionalPut {
fn from_str(s: &str) -> Option<Self> {
match s.trim() {
"etag" => Some(Self::ETagMatch),
"etag-create-only" => Some(Self::ETagCreateOnly),
"etag-put-if-not-exists" => Some(Self::ETagPutIfNotExists),
trimmed => match trimmed.split_once(':')? {
("dynamo", s) => Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
_ => None,
Expand Down

0 comments on commit 9cf76bc

Please sign in to comment.